ラズパイコンテスト出品作品について(2)
●S/Wについて
開発は python2.7 で行いました。
pythonが一番ライブラリが揃っているように思えるのが採用理由です。
システムは5つのプログラムからできています。
1.GPIO-4 監視部
焦電センサーの入力を1秒おきに監視し、信号がアサートされたら
"detected" という文字列を標準出力に投げます。
焦電センサーの出力を見ると、結構細かく揺れ動いているため、
単純に焦電センサーの報告をうのみにすると人がいない状況
でも鳴り続けてしまいます。
そこで、5秒間センサー出力が継続している場合だけ標準出力への
出力を行うようにしました。また、出力はワンショットで、
0 レベル検出でリセットされます。
import RPi.GPIO as GPIO import time, sys IO_NO = 4 count = 0 GPIO.setmode(GPIO.BCM) GPIO.setup(IO_NO, GPIO.IN) try: while True: if GPIO.input(IO_NO) == 1: count = count + 1 else: count = 0 if count == 4: print "detected" sys.stdout.flush() time.sleep(1) except KeyboardInterrupt: print "bye" GPIO.cleanup() print "program exit"
2.マッシュアップ部
標準入力から "detected" という文字列が入力されたら、あらかじめ登録されている
インターネットサイトにアクセスし、必要な情報を取得します。
YOLP と livedoor は専用の API を使っていますが、運行情報だけは無料で使える
適当な API がなかったため、Yahoo! Japan の情報をスクレイピングして使っています。
全てのサイトにアクセスしたら、音声合成エンジンに渡す文字列を構成し、
音声合成エンジンと音声出力プログラムを起動します。
#!/usr/bin/python # coding: utf-8 from HTMLParser import HTMLParser import json, datetime, urllib, time, subprocess, sys myurls = { "weather": "http://weather.livedoor.com/forecast/webservice/json/v1?city=xxxxxx", "pinpoint": "http://weather.olp.yahooapis.jp/v1/place?appid=zzzzzz&coordinates=0.0,0.0&output=json", "train": "http://transit.yahoo.co.jp/traininfo/area/y/" } myTrain = ("高崎線") talker = "/usr/local/aquestalkpi/AquesTalkPi -f - | aplay -D plughw:0" myWarningProcess = ["/root/GPIO_output.py"] class MyHTMLParser(HTMLParser): def __init__(self): HTMLParser.__init__(self) self.status = 0 self.count = 0 self.capture_data = False self.line = "" self.line_status = "" self.line_title = "" self.result = [] def handle_starttag(self, tag, attrs): for a1 in attrs: if a1[0] == "id" and a1[1] == "mdStatusTroubleLine": self.status = 1 if self.status == 1 and tag == "td": self.count = 0 self.status = 2 if self.status == 2 and self.count == 0 and tag == "a": self.capture_data = True if self.status == 2 and self.count == 1: for a2 in attrs: if a2[0] == "class" and a2[1] == "colTrouble": self.capture_data = True if self.status == 2 and self.count == 2 and tag == "td": self.capture_data = True def handle_data(self, data): if self.capture_data: if self.count == 0: self.line = data elif self.count == 1: self.line_status = data elif self.count == 2: self.line_title = data if self.line_status != "": self.result.append([self.line, self.line_status, self.line_title]) def handle_endtag(self, tag): if tag == "td" and self.status == 2: self.count = self.count + 1 if self.count >= 3: self.count = 0 self.line = "" self.line_status = "" self.line_title = "" self.capture_data = False if tag == "table": self.status = 0 def getWeatherLivedoor(): url = urllib.URLopener() #fh = url.open("http://weather.livedoor.com/forecast/webservice/json/v1?city=xxxxxx") fh = url.open(myurls["weather"]) data = fh.read() url.close() return json.loads(data) def getWeatherData(): url = urllib.URLopener() fh = url.open(myurls["pinpoint"]) data = fh.read() url.close() return json.loads(data) def getWeatherList(jf): f = jf[u'Feature'][0] p = f[u'Property'] wl = p[u'WeatherList'] ww = wl[u'Weather'] return ww def mashup(): outputString = u"" # 現在時刻の表示 now = datetime.datetime.now() current = now.strftime("%Y年%m月%d日 %H時%M分%S秒").decode('utf-8') outputString = outputString + u"現在の時刻は " + current + u" です。" # 今日の予報を表示 data = getWeatherLivedoor() tenkiString = u"" if data != 0: for fc in data[u'forecasts']: if u"今日" == fc[u'dateLabel']: tenkiString = tenkiString + u"今日の天気は " + fc[u'telop'] + u" の予報です。" if u"明日" == fc[u'dateLabel']: tenkiString = tenkiString + u"明日の天気は " + fc[u'telop'] + u" の予報です。" # 現在と1時間後の降水量情報を表示 data = getWeatherData() rainfallString = u"" if data != 0: i = 0 weather = getWeatherList(data) wlen = len(weather) while i < wlen: w = weather[i] if w[u'Type'] == u'observation': rainfallString = rainfallString + u"現在の降水量は " + str(w[u'Rainfall']) + u"ミリです。" elif i == 6: rainfallString = rainfallString + u"1時間後の予想降水量は " + str(w[u'Rainfall']) + u"ミリです。" if w[u'Rainfall'] != 0.0: myWarningProcess.append("G") myWarningProcess.append("0.5") i = i + 1 # 交通機関情報を取得・表示 html = MyHTMLParser() url = urllib.URLopener() fh = url.open(myurls["train"]) html.feed(fh.read()) url.close() tetsuString = u"" if len(html.result) == 0: tetsuString = tetsuString + u"列車遅延はありません" for r in html.result: tetsuString = tetsuString + u" " + r[0].decode('utf-8') + u":" + r[1].decode('utf-8') + u":" + r[2].decode('utf-8') for tr in myTrain: if r[0] == tr: myWarningProcess.append("R") myWarningProcess.append("0.5") return outputString + tenkiString + rainfallString + tetsuString # メインループ ph = 0 ptalk = 0 try: while True: notify = raw_input() if notify == "detected": outputString = mashup() if len(myWarningProcess) < 5: myWarningProcess.append("N") myWarningProcess.append("0.5") if len(myWarningProcess) == 5: ph = subprocess.Popen(myWarningProcess) ptalk = subprocess.Popen(talker, shell=True, stdin=subprocess.PIPE) outputString = outputString + u"\n" print outputString ptalk.communicate(outputString.encode('utf-8')) except KeyboardInterrupt: print "bye"
3.LED制御
マッシュアップ部から起動されるプログラムで、収集したデータがLED発行条件に
合っていると起動されます。20秒間 LED を点滅させます。
二重起動防止の仕組みがあり、すでに LED 制御されていた場合は即座に終了します
(先着優先)。
#!/usr/bin/python2 # coding: utf-8 import RPi.GPIO as GPIO import time, os, signal, sys # LED 点灯時間 duration = 20 argc = len(sys.argv) pat = [] if argc < 3: pat = [ ('R', 0.5), ('N', 0.5) ] else: count = 1 while count < argc and count + 1 < argc: pat.append((sys.argv[count], float(sys.argv[count + 1]))) count = count + 2 now = time.time() endtime = now + duration IO_NO1 = 5 IO_NO2 = 6 def term_handler(signum, frame): GPIO.cleanup() os.remove("/run/GPIO_output.pid") try: fh = open("/run/GPIO_output.pid", "r") if fh: pid = int(fh.read()) try: os.kill(pid, signal.SIGTERM) term_handler(0, 0) time.sleep(0.5) except OSError: time.sleep(0.1) fh.close() except IOError: fh = 0 fh = open("/run/GPIO_output.pid", "w") if fh: fh.write(str(os.getpid())) fh.close() signal.signal(signal.SIGTERM, term_handler) # use GPIO pin number GPIO.setmode(GPIO.BCM) # use BOARD pin number # GPIO.setmode(GPIO.BOARD) GPIO.setup(IO_NO1, GPIO.OUT) GPIO.setup(IO_NO2, GPIO.OUT) current = 0 try: while now < endtime: if pat[current][0] == "Y": GPIO.output(IO_NO1, False) GPIO.output(IO_NO2. False) elif pat[current][0] == "G": GPIO.output(IO_NO1, True) GPIO.output(IO_NO2, False) elif pat[current][0] == "R": GPIO.output(IO_NO1, False) GPIO.output(IO_NO2, True) else: GPIO.output(IO_NO1, True) GPIO.output(IO_NO2, True) time.sleep(pat[current][1]) current = current + 1 if current == len(pat): current = 0 now = time.time() except KeyboardInterrupt: print "bye" term_handler(0, 0)
4.音声合成エンジン
AquesTalkPi を使いました。
5.音声デバイス(USBスピーカー)への出力
aplay を使いました。なお、USB スピーカーが音声デバイスリストの先頭に来るように
若干起動パラメータを変更しています。