sey323’s blog

にわかシステムエンジニア兼にわかデータサイエンティストです。

RaspberryPi+CO2センサー(MH-Z14B)でCO2データをPythonで収集し可視化する

はじめに

以下の記事で、家の温度や湿度を取得していたが、ついでに二酸化炭素濃度も取得したいと思いラズパイとCO2センサを利用し収集する環境を構築した。

sey323log.hatenablog.com

Amazonで販売されているCO2センサーを見ると、コロナ需要で様々なメーカから出品されており、精度、APIの使いやすさ、メンテナンス等を考慮すると、どれ購入すれば良いかわからない。「よくわからない精度のものので曖昧なデータを取るくらいだったら自作してしまおう」と思い、今回はCO2を取得するセンサーをラズパイで自作した。結果、10,000円程度の出費でCO2センサーを作成し、そのCO2濃度を操作できる環境を構築できた。

実装

二酸化炭素濃度の収集の全体構成は以下の通り。まず、二酸化炭素濃度を収集しMongoDBに保存する。その後、保存したデータをOSSのBIツールのMetabaseで可視化する。

CO2収集環境の全体像

Cloud Pub/Subからデータを取得しMongoDBに保存する処理(図中②)はこちらの記事で作成したサブスクライブワーカーをそのまま利用する。今回はCO2センサーの作成と取得したセンサーデータをCloud Pub/Subに送信するパブリッシュワーカー(図中①)の作成を行う。

1. 前準備

可視化環境(MongoDB+Metabase)の構築

収集したデータを保存して可視化する環境としてMongoDBとMetabaseを用意する。この環境は下記の記事で作成したdocker-composeを利用して構築する。

sey323log.hatenablog.com

MongoDBとMetabaseの起動のコマンドを、以下に示す。

docker-compose up -d

起動が完了後それぞれ以下のポートでアクセスが可能となる。

またMongoDBのユーザ名(mongodb_user)とパスワード(mongodb_pass)は後のサブスクライブワーカーで利用する。

利用するdocker-compose.ymlのソースコード

services:
    mongodb:
      image: mongo
      container_name: mongodb
      restart: always
      environment:
        MONGO_INITDB_ROOT_USERNAME: mongodb_user
        MONGO_INITDB_ROOT_PASSWORD: mongodb_pass
      ports:
        - 27017:27017
      volumes:
        - ./mongo/db:/data/db
        - ./mongo/configdb:/data/configdb

    metabase:
        image: metabase/metabase:v0.33.5
        container_name: metabase
        ports:
            - 3333:3000
        volumes:
            - ./metabase/data:/mnt/data

2. RaspberryPi+CO2センサ(MH-Z14B)の接続

ラズパイの初期設定などは他のサイトを参照。

利用した機材

  • Raspberry Pi 3 model B+
  • MH-Z14B(CO2センサ)
  • ジャンパワイヤ(オス-メス) 
  • ピンソケット(メス、2.54 mm)

CO2センサは以下のものを利用した。

Amanzonのページでは MH-Z14Aと記載があったが、購入して届いたものの型番は MH-Z14Bであった。秋月電子のページで検索してもMH-Z14Aの型番のものがヒットしないので、そちらは販売が終了したのかもしれない。

ラズパイのUART通信の有効化

ラズパイで配線を行いシリアル通信を行うためには、ラズパイのUARTを有効化する必要がある。UARTの有効化は以下のリンクのいずれかを参照。

qiita.com

qiita.com

最終的にラズパイ上で以下のコマンドを実行し、それぞれが以下のように割り当てられていればOK。

ls -l /dev/serial*
lrwxrwxrwx 1 root root 7  4月 24 13:17 /dev/serial0 -> ttyAMA0
lrwxrwxrwx 1 root root 5  4月 24 13:17 /dev/serial1 -> ttyS0

CO2センサー(MH-Z14B)の配線

次にCO2センサー(MH-Z14B)をラズバイに配線して接続する。MH-Z14Bの公式のデータシートが以下のリンクにある。

MH-Z14Bのデータシートより抜粋

利用するのが2、13、14、15ピンなので、これをラズパイのピンに接続していく。ラズパイ側のピンとCO2センサー側のピンの対応関係を以下に示す。

ラズパイ側 CO2センサー側 用途
2 15 Vin
6 2 GND
8 14 ラズパイからCO2センサーへのデータの送信
10 13 CO2センサーからラズパイへのデータの送信

配線後はこんな感じ。ハンダゴテがなかったので家にある物置台に固定した。

配線して設置した例

3. CO2データを収集するクライアントの作成

次に、先程配線処理したラズパイのCO2センサーから二酸化炭素濃度を取得し、そのデータをCloud Pub/Subに送信するクライアントを作成する。

二酸化炭素濃度を収集するクライアントのソースコード

${プロジェクトID}${トピック名}の箇所には、それぞれCloud Pub/Subで作成したプロジェクト名と、トピック名を入力する。

import argparse
import datetime as dt
import time
from logging import getLogger

import serial
from google.cloud import pubsub_v1
from pytz import timezone


class PublisherFacade(object):
    def __init__(self, project_id: str, topic_id: str):
        """GCPのPub/Subクライアントに接続しPublishするクラス

        Args:
            project_id ([str]): トピックが存在するプロジェクトのプロジェクトID
            topic_id ([str]): データを送信する対象のトピックのトピックID
        """
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(project_id, topic_id)

    def publish(self, data: list) -> list:
        """データをPubSubに送信する処理

        Args:
            data (list): 送信するデータの配列。

        Returns:
            list: [description]
        """
        response_ary = []

        if type(data) != list:
            # 入力値がlist出ない場合は処理を終了
            print("data most be list. ")
            exit()

        for d in data:
            # Data must be a bytestring
            d = str(d).encode("utf-8")
            response_ary.append(self.publisher.publish(self.topic_path, d).result())

        return response_ary


def setup(target_tty="/dev/ttyAMA0", timeout=10.0, sleep_time: int = 30):
    """指定したデバイスとのシリアル接続を開始し、シリアルオブジェクトを返す。

    Args:
        target_tty (str, optional): 取得するセンサーデバイスのtty. Defaults to "/dev/ttyAMA0".
        timeout (float, optional): 接続の際に何秒でタイムアウトとするか. Defaults to 10.0.
        sleep_time (int, optional): デバイスの起動の際に何秒待つか。短すぎると起動に失敗する。. Defaults to 30.
    """
    s = serial.Serial(
        target_tty,
        baudrate=9600,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        timeout=timeout,
    )
    print(f"起動処理のため{sleep_time}秒待機します。")
    time.sleep(sleep_time)
    return s


def get_co2data_from_sensor(
    target_tty="/dev/ttyAMA0",
) -> int:
    """センサーとシリアル通信を行い、二酸化炭素濃度を取得する。

    Args:
        target_tty (str, optional): 取得するセンサーデバイスのtty. Defaults to "/dev/ttyAMA0".

    Returns:
        int: 取得した二酸化炭素濃度。
    """
    # シリアル接続の開始
    s = setup(target_tty)
    b = bytearray([0xFF, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79])
    s.write(b)

    result = s.read(9)

    # 返り値が正常かどうかを判定
    try:
        checksum = (
            0xFF
            - (
                (
                    (result[1])
                    + (result[2])
                    + (result[3])
                    + (result[4])
                    + (result[5])
                    + (result[6])
                    + (result[7])
                )
                % 256
            )
        ) + 0x01
        if checksum == (result[8]):  # チェックサムを確認して異なっていたら、不正なデータとみなす。
            co2_value: int = str(((result[2]) * 256) + (result[3]))
            print(f"co2濃度: {co2_value}")
            return int(co2_value)
        else:
            raise Exception("センサーから取得したチェックサムに誤りがありました。")
    except IndexError as ie:
        print("センサーからデータを取得できません。接続を確認してください。", ie)
    except Exception as e:
        print(e)


def collect_to_topic(
    target_tty: str = "/dev/ttyAMA0", room_name: str = "home", mock_mode: bool = False
):
    """Co2センサーからデータを取得する。

    Args:
        target_tty (str, optional): 取得するセンサーデバイスのtty.. Defaults to "/dev/ttyAMA0".
        room_name (str, optional): デバイスが設置されているルーム名。送信するデータに不要する。. Defaults to "home".
        mock_mode (bool, optional): デバイスと接続はせずにそれ以外の動作だけ確認する。. Defaults to False.
    """

    try:
        # mockモードの時は試験用に600を返す。
        co2_response_data = (
            get_co2data_from_sensor(target_tty) if not mock_mode else 600
        )
        if co2_response_data is None:
            print("センサーからデータの取得に失敗しました。", co2_response_data)
            exit()
        room_co2_info = {
            "name": room_name,
            "co2_value": co2_response_data,
            "request_at": (
                timezone("Asia/Tokyo")
                .localize(dt.datetime.now())
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            ),
        }
        print(f"Cloud Pub/Subに送信するデータ: {room_co2_info}")
        # Cloud Pub/Subに送信
        if mock_mode:
            print("Mockモードなので、データの送信を行いません。")
        else:
            publisher.publish([room_co2_info])
    except Exception as e:
        print("データの更新処理に失敗しました。", e)
        exit()


if __name__ == "__main__":
    logger = getLogger(__name__)

    # 引数の設定
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--target_tty",
        help="端末に接続されているセンサーのパス。",
        default="/dev/ttyAMA0",
    )
    parser.add_argument(
        "--room_name",
        help="センサーが設置してある場所。データに付与する。",
        default="living",
    )
    parser.add_argument(
        "--mock_mode",
        help="Mockを利用する場合このオプションを付与することで、データがCloudPubSubに送信されなくする。",
        action="store_true",
    )
    args = parser.parse_args()

    # Publisher Clientの初期化
    publisher = PublisherFacade("gotg-324412", "home_carbondioxide")
    collect_to_topic(
        target_tty=args.target_tty, room_name=args.room_name, mock_mode=args.mock_mode
    )

実行する際は、Cloud Pub/Subからダウンロードしたシークレット情報pubsub-credential.jsonのパスを、以下のように環境変数に設定する。

export GOOGLE_APPLICATION_CREDENTIALS=pubsub-credential.json

その後以下のコマンドを実行することでCO2のセンサーデータがCloud Pub/Subに送信される。

$ python publish.py
co2濃度: '556'
Cloud Pub/Subに送信するデータ: {'co2_value': 556, 'name': 'living', 'request_at': '2022-05-07T16:20:36Z'}

配線が正しく行われている場合、実行すると上記のようにCO2センサーで二酸化炭素濃度を取得できる。これをcronなどで定期実行するようにする事で、定期的に部屋の二酸化炭素濃度を取得できる。

4. 収集したデータをMongoDBに保存するクライアントの作成

次に、Cloud Pub/Subからデータを取得しMongoDBに保存するサブスクライブワーカーの作成を行う。これは過去の記事で作成したものと全く同じなので詳細は割愛する。

サブスクライブワーカーのソースコード

まず、MongoDBと接続する為のライブラリをインストールする。

pip install pymongo

その後、subscribe.pyを作成し、以下のコードを記載する。${プロジェクトID}${サブスクリプション名}の箇所にはそれぞれCloud Pub/Subで作成したプロジェクトIDと、サブスクリプション名を入力する。${保存するデータベース名}${保存するコレクション名}はMongoDBの保存する先の情報を記載する。

import argparse
import json

from google.api_core import retry
from google.cloud import pubsub_v1
from pymongo import MongoClient


class MongoRepository(object):
    def __init__(
        self, db_name, collection_name, username, password, host="localhost", port=27017
    ):
        _client = MongoClient(
            "mongodb://%s:%s@%s:%d" % (username, password, host, port)
        )
        _db = _client[db_name]
        # 認証情報を付与
        _db.add_user(
            username,
            password,
            roles=[
                {
                    "role": "dbAdmin",
                    "db": db_name,
                },
            ],
        )
 
        self.collection = _db[collection_name]
 
    def insert_list(self, insert_list):
        return self.collection.insert_many(insert_list)
 
 
class SubscriberFacade(object):
    def __init__(self, project_id: str, debug_mode: bool = False):
        """GCPのPub/Subクライアントに接続し、データをPullするクラス
 
        Args:
            project_id ([str]): トピックが存在するプロジェクトIDのリスト
            debug_mode()
        """
        self.subscriber = pubsub_v1.SubscriberClient()
        self.project_id = project_id
        self.subscriptions_list = []
        self.ack_ids = []
        self.subscription_path = None
        self.debug_mode = debug_mode
 
    def _init_subscriber(self):
        self.subscriber = pubsub_v1.SubscriberClient()
 
    def list_subscriptions_in_project(self) -> list:
        """project_idに所属するサブスクリプションの一覧を取得する。
        参考URL
        https://github.com/googleapis/python-pubsub/blob/main/samples/snippets/subscriber.py
        """
        project_path = f"projects/{self.project_id}"
 
        self.subscriptions_list = [
            sl.name
            for sl in self.subscriber.list_subscriptions(
                request={"project": project_path}
            )
        ]
        return self.subscriptions_list
 
    def pull(self, subscription_id: str, max_messages: int = 10) -> list:
        """指定したSubscribeからデータをPullする
 
        Args:
            subscription_id (str): 対象のサブスクリプションID
            max_messages (int): pullする最大のメッセージ数
 
        Returns:
            list: [description]
        """
        self._init_subscriber()
        self.ack_ids = []
 
        self.subscription_path = self.subscriber.subscription_path(
            self.project_id, subscription_id
        )
 
        with self.subscriber:
            response = self.subscriber.pull(
                request={
                    "subscription": self.subscription_path,
                    "max_messages": max_messages,
                },
                retry=retry.Retry(deadline=300),
            )
 
            for received_message in response.received_messages:
                self.ack_ids.append(received_message.ack_id)
 
            print(
                f"Received and acknowledged {len(response.received_messages)} messages from {self.subscription_path}."
            )
 
        return response.received_messages
 
    def send_ack(self):
        """直前のPull or Subscribe処理に関してAckを返す
        該当の処理が実行されていない場合は何もせず終了する。
        """
        if not self.subscription_path:
            print("直前のSubscribeが見つかりません。")
            return
        self._init_subscriber()
        if not self.debug_mode and not self.ack_ids == []:
            print("Ackを返します。 ack_ids: {}".format(self.ack_ids))
            with self.subscriber:
                self.subscriber.acknowledge(
                    request={
                        "subscription": self.subscription_path,
                        "ack_ids": self.ack_ids,
                    }
                )
 
 
def bytes_to_json(input_bytes: bytes) -> dict:
    """bytes形式のJsonをJsonの辞書配列に変換する
 """
    converted_byte = (
        input_bytes.decode("utf8")
        .replace("None", '"None"')
        .replace("True", "true")
        .replace("False", "false")
        .replace("'", '"')
    )
    print(converted_byte)
    return json.loads(converted_byte)
 
 
def main(args) -> None:
    """PubSubから同期的にメッセージを取得する    """
 
    # Subscriberの初期化
    subscriber = SubscriberFacade(project_id=" ${プロジェクトID}", debug_mode=args.debug_mode)
 
    subscribe_payloads_list,subscribe_messages = [], []
    try:
        # Messageの取得と整形
        subscribe_messages = subscriber.pull(
            " ${サブスクリプションID}", 10
        )
        subscribe_payloads_list = [
            bytes_to_json(x.message.data) for x in subscribe_messages
        ]
        print(subscribe_payloads_list)
    except Exception as e:
        print(e)

    # Mongo Clientの初期化
    mongo_client = MongoRepository(
        db_name="${保存するデータベース名}",
        collection_name="${保存するコレクション名}",
        username=args.mongo_username,
        password=args.mongo_password,
        host=args.mongo_host,
        port=args.mongo_port,
    )

    # データの挿入
    try:
        if subscribe_payloads_list == []:
            print("SubscriptionIdに新規メッセージは存在しませんでした。")
        else:
            mongo_client.insert_list(subscribe_payloads_list)
            subscriber.send_ack()
    except Exception as e:
        print(e)
        print(
            "保存に失敗しました。ACKを返しません。"
        )
 
 
if __name__ == "__main__":
    # 引数の設定
    parser = argparse.ArgumentParser()
 
    parser.add_argument("mongo_username", help="ユーザ名")
    parser.add_argument("mongo_password", help="パスワード")
    parser.add_argument("--mongo_host", default="localhost", help="対象のMongoDBのホスト名")
    parser.add_argument("--mongo_port", default=27017, help="対象のMongoDBのポート番号")
    parser.add_argument("--debug_mode", default=False, help="debugモードの場合はAckを返さない")
 
    args = parser.parse_args()
 
    main(args)

実行の際は、パブリッシュワーカーと同様にpubsub-credential.jsonを以下のように環境変数に設定し、コマンドを実行する。

export GOOGLE_APPLICATION_CREDENTIALS=pubsub-credential.json
python subscribe.py ${MongoDBのユーザ名} ${MongoDBのパスワード}

上記を実行する事でCloud Pub/Subからデータを取得しMongoDBに保存することができる。

5. データの可視化

上記のパブリッシュワーカーとサブスクライブワーカーを、cronで5分ごとに定期実行するようにし設定は完了。

Metabaseでグラフを作成して、5分おきの二酸化炭素濃度を取得できていることが確認できる。

5分おきの二酸化炭素濃度の推移

ここら辺の資料から、一般的に室内の二酸化炭素濃度は1000ppm以下であることが好ましいよう。なので基準値を超えたら分かるように「異常値」の波線もグラフに表示しておいた。

二酸化炭素は、少量であれば人体に影響は見られないが、濃度が高くなると、倦怠感、頭痛、耳鳴り等の症状を訴える者が多くなること、また、室内の二酸化炭素濃度は全般的な室内空気の汚染度や換気の状況を評価する1つの指標としても用いられており、二酸化炭素濃度の基準値は 1000ppm 以下と定められている。 https://www.mhlw.go.jp/content/11130500/000771215.pdf

おわりに

二酸化炭素濃度が高いとパフォーマンスに悪影響を及ぼすということで、それが可視化できて満足。次は二酸化炭素濃度が1000ppmを超えそうになると、家にあるAlexaとかにアラートを発砲できるようにしたいなあと画策中。

今回初めてラズパイの電子工作的なことを行ったが、ハンダゴテ持っていななくて、暫定で固定したかんじなのでいつかしっかり半田付けしたい。

参考

RaspberryPiとセンサーの接続に関して、以下の方の情報を参考にさせていただきました。