徒然なる日々を送るソフトウェアデベロッパーの記録(2)

技術上思ったことや感じたことを気ままに記録していくブログです。さくらから移設しました。

ラズパイコンテスト出品作品について(2)

●S/Wについて
開発は python2.7 で行いました。
pythonが一番ライブラリが揃っているように思えるのが採用理由です。
システムは5つのプログラムからできています。
f:id:minosys:20150830082924p:plain

1.GPIO-4 監視部
焦電センサーの入力を1秒おきに監視し、信号がアサートされたら
"detected" という文字列を標準出力に投げます。

焦電センサーの出力を見ると、結構細かく揺れ動いているため、
単純に焦電センサーの報告をうのみにすると人がいない状況
でも鳴り続けてしまいます。

そこで、5秒間センサー出力が継続している場合だけ標準出力への
出力を行うようにしました。また、出力はワンショットで、
0 レベル検出でリセットされます。

import RPi.GPIO as GPIO
import time, sys

IO_NO = 4
count = 0
GPIO.setmode(GPIO.BCM)
GPIO.setup(IO_NO, GPIO.IN)

try:
        while True:
                if GPIO.input(IO_NO) == 1:
                        count = count + 1
                else:
                        count = 0
                if count == 4:
                        print "detected"
                        sys.stdout.flush()
                time.sleep(1)
except KeyboardInterrupt:
        print "bye"

GPIO.cleanup()
print "program exit"

2.マッシュアップ
標準入力から "detected" という文字列が入力されたら、あらかじめ登録されている
インターネットサイトにアクセスし、必要な情報を取得します。
YOLP と livedoor は専用の API を使っていますが、運行情報だけは無料で使える
適当な API がなかったため、Yahoo! Japan の情報をスクレイピングして使っています。

全てのサイトにアクセスしたら、音声合成エンジンに渡す文字列を構成し、
音声合成エンジンと音声出力プログラムを起動します。

#!/usr/bin/python
# coding: utf-8
from HTMLParser import HTMLParser
import json, datetime, urllib, time, subprocess, sys

myurls = { "weather": "http://weather.livedoor.com/forecast/webservice/json/v1?city=xxxxxx",
"pinpoint": "http://weather.olp.yahooapis.jp/v1/place?appid=zzzzzz&coordinates=0.0,0.0&output=json",
"train": "http://transit.yahoo.co.jp/traininfo/area/y/" }
myTrain = ("高崎線")
talker = "/usr/local/aquestalkpi/AquesTalkPi -f - | aplay -D plughw:0"
myWarningProcess = ["/root/GPIO_output.py"]

class MyHTMLParser(HTMLParser):
        def __init__(self):
                HTMLParser.__init__(self)
                self.status = 0
                self.count = 0
                self.capture_data = False
                self.line = ""
                self.line_status = ""
                self.line_title = ""
                self.result = []

        def handle_starttag(self, tag, attrs):
                for a1 in attrs:
                        if a1[0] == "id" and a1[1] == "mdStatusTroubleLine":
                                self.status = 1
                if self.status == 1 and tag == "td":
                        self.count = 0
                        self.status = 2
                if self.status == 2 and self.count == 0 and tag == "a":
                        self.capture_data = True
                if self.status == 2 and self.count == 1:
                        for a2 in attrs:
                                if a2[0] == "class" and a2[1] == "colTrouble":
                                        self.capture_data = True
                if self.status == 2 and self.count == 2 and tag == "td":
                        self.capture_data = True

        def handle_data(self, data):
                if self.capture_data:
                        if self.count == 0:
                                self.line = data
                        elif self.count == 1:
                                self.line_status = data
                        elif self.count == 2:
                                self.line_title = data
                                if self.line_status != "":
                                        self.result.append([self.line, self.line_status, self.line_title])

        def handle_endtag(self, tag):
                if tag == "td" and self.status == 2:
                        self.count = self.count + 1
                        if self.count >= 3:
                                self.count = 0
                                self.line = ""
                                self.line_status = ""
                                self.line_title = ""
                        self.capture_data = False
                if tag == "table":
                        self.status = 0

def getWeatherLivedoor():
        url = urllib.URLopener()
        #fh = url.open("http://weather.livedoor.com/forecast/webservice/json/v1?city=xxxxxx")
        fh = url.open(myurls["weather"])
        data = fh.read()
        url.close()
        return json.loads(data)

def getWeatherData():
        url = urllib.URLopener()
        fh = url.open(myurls["pinpoint"])
        data = fh.read()
        url.close()
        return json.loads(data)

def getWeatherList(jf):
        f = jf[u'Feature'][0]
        p = f[u'Property']
        wl = p[u'WeatherList']
        ww = wl[u'Weather']
        return ww

def mashup():
        outputString = u""
        # 現在時刻の表示
        now = datetime.datetime.now()
        current = now.strftime("%Y年%m月%d日 %H時%M分%S秒").decode('utf-8')
        outputString = outputString + u"現在の時刻は " +  current + u" です。"

        # 今日の予報を表示
        data = getWeatherLivedoor()
        tenkiString = u""
        if data != 0:
                for fc in data[u'forecasts']:
                        if u"今日" == fc[u'dateLabel']:
                                tenkiString = tenkiString + u"今日の天気は " +  fc[u'telop'] + u" の予報です。"
                        if u"明日" == fc[u'dateLabel']:
                                tenkiString = tenkiString + u"明日の天気は " +  fc[u'telop'] + u" の予報です。"

        # 現在と1時間後の降水量情報を表示
        data = getWeatherData()
        rainfallString = u""
        if data != 0:
                i = 0
                weather = getWeatherList(data)
                wlen = len(weather)
                while i < wlen:
                        w = weather[i]
                        if w[u'Type'] == u'observation':
                                rainfallString = rainfallString + u"現在の降水量は " + str(w[u'Rainfall']) + u"ミリです。"
                        elif i == 6:
                                rainfallString = rainfallString + u"1時間後の予想降水量は " + str(w[u'Rainfall']) + u"ミリです。"
                                if w[u'Rainfall'] != 0.0:
                                        myWarningProcess.append("G")
                                        myWarningProcess.append("0.5")
                        i = i + 1
        # 交通機関情報を取得・表示
        html = MyHTMLParser()
        url = urllib.URLopener()
        fh = url.open(myurls["train"])
        html.feed(fh.read())
        url.close()
        tetsuString = u""
        if len(html.result) == 0:
                tetsuString = tetsuString + u"列車遅延はありません"
        for r in html.result:
                tetsuString = tetsuString + u" " + r[0].decode('utf-8') +  u":" + r[1].decode('utf-8') +  u":" +  r[2].decode('utf-8')
                for tr in myTrain:
                        if r[0] == tr:
                                myWarningProcess.append("R")
                                myWarningProcess.append("0.5")
        return outputString + tenkiString + rainfallString + tetsuString

# メインループ
ph = 0
ptalk = 0
try:
        while True:
                notify = raw_input()
                if notify == "detected":
                        outputString = mashup()
                        if len(myWarningProcess) < 5:
                                myWarningProcess.append("N")
                                myWarningProcess.append("0.5")
                        if len(myWarningProcess) == 5:
                                ph = subprocess.Popen(myWarningProcess)
                        ptalk = subprocess.Popen(talker, shell=True, stdin=subprocess.PIPE)
                        outputString = outputString + u"\n"
                        print outputString
                        ptalk.communicate(outputString.encode('utf-8'))
except KeyboardInterrupt:
        print "bye"

3.LED制御
マッシュアップ部から起動されるプログラムで、収集したデータがLED発行条件に
合っていると起動されます。20秒間 LED を点滅させます。
二重起動防止の仕組みがあり、すでに LED 制御されていた場合は即座に終了します
(先着優先)。

#!/usr/bin/python2
# coding: utf-8

import RPi.GPIO as GPIO
import time, os, signal, sys

# LED 点灯時間
duration = 20

argc = len(sys.argv)

pat = []
if argc < 3:
        pat = [ ('R', 0.5), ('N', 0.5) ]
else:
        count = 1
        while count < argc and count + 1 < argc:
                pat.append((sys.argv[count], float(sys.argv[count + 1])))
                count = count + 2

now = time.time()
endtime = now + duration

IO_NO1 = 5
IO_NO2 = 6

def term_handler(signum, frame):
        GPIO.cleanup()
        os.remove("/run/GPIO_output.pid")

try:
        fh = open("/run/GPIO_output.pid", "r")
        if fh:
                pid = int(fh.read())
                try:
                        os.kill(pid, signal.SIGTERM)
                        term_handler(0, 0)
                        time.sleep(0.5)
                except OSError:
                        time.sleep(0.1)
                fh.close()
except IOError:
        fh = 0

fh = open("/run/GPIO_output.pid", "w")
if fh:
        fh.write(str(os.getpid()))
        fh.close()

signal.signal(signal.SIGTERM, term_handler)

# use GPIO pin number
GPIO.setmode(GPIO.BCM)
# use BOARD pin number
# GPIO.setmode(GPIO.BOARD)

GPIO.setup(IO_NO1, GPIO.OUT)
GPIO.setup(IO_NO2, GPIO.OUT)

current = 0
try:
        while now < endtime:
                if pat[current][0] == "Y":
                        GPIO.output(IO_NO1, False)
                        GPIO.output(IO_NO2. False)
                elif pat[current][0] == "G":
                        GPIO.output(IO_NO1, True)
                        GPIO.output(IO_NO2, False)
                elif pat[current][0] == "R":
                        GPIO.output(IO_NO1, False)
                        GPIO.output(IO_NO2, True)
                else:
                        GPIO.output(IO_NO1, True)
                        GPIO.output(IO_NO2, True)
                time.sleep(pat[current][1])
                current = current + 1
                if current == len(pat):
                        current = 0
                now = time.time()
except KeyboardInterrupt:
        print "bye"

term_handler(0, 0)

4.音声合成エンジン
AquesTalkPi を使いました。

5.音声デバイス(USBスピーカー)への出力
aplay を使いました。なお、USB スピーカーが音声デバイスリストの先頭に来るように
若干起動パラメータを変更しています。