I’m not sure the best way to do this, and is not working for my as it will lock the program on when its asleep not sure how to make it works right…
Im trying to monitor when a door open and closes with a raspberry Pi if there door is open for more than x time send some sort of alert (like and email), I’ve the problem that when the door closes before the countdown finished it wont stop the countdown, which causes the thread to halt, also havent implemented the alert side of thing but if the code is as at the moment it will trigger the alert even if the door closes before countdown.
At the moment I’m using a push button instead of a door sensor for testing, , also eventually i will log the opening and closing of the door, but for now id like to know if there is a nicer way of doing this, i got the code im using from this post
my code is as follows
#!/usr/bin/python
import threading, subprocess, sys, time, syslog
import RPi.GPIO as GPIO
lim = 2 # seconds until warning
# thread for countdown (should be interruptable)
class CountdownTask:
global dooropen
global countdone
def __init__(self):
#print("thread in")
self._running = True
def start(self):
print("thread in")
self._running = True
def terminate(self):
print("thread killed")
self._running = False
def run(self, n):
while True:
global countdone
while self._running and dooropen == False and countdone:
pass
while self._running and dooropen == False and countdone == False:
pass
while self._running and dooropen and countdone:
pass
while self._running and dooropen and countdone == False:
print("start timer")
time.sleep(5)
if dooropen:
## action when timer isup
print("timer ended, send notify")
countdone = True
c = CountdownTask()
t = threading.Thread(target=c.run, args=(lim,))
t.daemon = True
REED = 23 # data pin of reed sensor (in)
# GPIO setup
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(REED, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
dooropen = False # assuming door's closed when starting
countdone = True
def edge(channel):
global dooropen
global countdone
if GPIO.input(REED): # * no longer reached
if dooropen == False: # catch fridge compressor spike
print("Detect open")
countdone = False
dooropen = True
else:
print("Door closed")
dooropen = False
def main():
GPIO.add_event_detect(REED, GPIO.RISING, callback=edge, bouncetime=300)
t.start()
while True:
pass
#------------------------------------------------------------
if __name__ == "__main__": main()
Update:
Looks like I need to be using threading.Event and wait could someone advise how to implement this on my code?
Advertisement
Answer
I think i got a working script
#!/usr/bin/python
import threading
import time
import logging
import RPi.GPIO as GPIO
import smtplib
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
logging.basicConfig(level=logging.DEBUG,format='(%(threadName)-9s) %(message)s',)
# GPIO setup
Input = 23 # data pin of Input sensor (in)
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(Input, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
global button
button = False
global count
count = 1
#global t
t = 1
countdown = 5
def sendmail():
fromaddr = "email"
toaddr = "tomail"
msg = MIMEMultipart()
msg['From'] = fromaddr
msg['To'] = toaddr
msg['Subject'] = "DoorAlarm"
body = "This is a test mail generated with python on a RPi"
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login("username", "password")
text = msg.as_string()
server.sendmail(fromaddr, toaddr, text)
server.quit()
def timeout(e, t):
global count
global button
while True:
while button:
while not e.isSet() and count <= countdown:
if count == 1: logging.debug('starting counter')
event_is_set = e.wait(t)
if event_is_set:
count = 1
logging.debug('Door closed before countdown')
else:
count += 1
if count == countdown:
logging.debug('countdown completed - notify')
#sendmail()
def edge(channel):
global button
global count
if button == False: # catch fridge compressor spike
button = True
e.clear()
print("log door open")
if count != 1:
count = 1
else:
button = False
e.set()
print("log door closed")
if __name__ == '__main__':
e = threading.Event()
t = threading.Thread(name='non-blocking',target=timeout,args=(e, t))
t.start()
logging.debug('Waiting before calling Event.set()')
GPIO.add_event_detect(Input, GPIO.RISING, callback=edge, bouncetime=300)