sey323’s blog

すーぱーえんじにあ

Cloud Pub/Subを利用したワークキューシステムの構築

はじめに

過去、センサーデータをMongoDBに保存する仕組みを作成し、日々家の温度や湿度を計測して、快適な住空間の構築に努めている。

sey323log.hatenablog.com

データを取得して保存する流れ

しかしこの仕組みでは、MongoDBサーバが落ちている時はセンサーデータを保存する処理が失敗するため、MongoDBが落ちている間のセンサーデータをロストしてしまう事象が発生した。MongoDBは自宅のサーバに構築しているため、予想外の障害(ブレーカーが落ちる、LANケーブルが切れている)が発生しやすく、落ちてしまうことが多々あった。

エラー発生時

対応策

やりたいこと

今回実現したい内容としては以下の2つ。

  1. データをロストしないこと
  2. 拡張しやすい構成であること

1は絶対として、2に関しては、現在、二酸化炭素濃度やGoogleFitのデータも同様に収集したいと考えており、それらを構築する際にMongoDBに書き込む処理を何回も実装するのは効率的でないので、共通化できるようにしたいと思っている。また今後、MongoDBにデータを保存するのみでなく、例えば温度の値がある閾値以上であればSlackに通知がいくようにするとか、データを保存だけではなく活用できるようにもしたいと考えている。

そういったときに、拡張が簡単にできるようなシステムの構成を検討したい、と言うのが「拡張しやすい構成」の目的となる。

ワークキューシステムの採用

これらの対応策として、ワークキューシステムを採用する。ワークキューシステムとは、以下のような構成を持つシステムである。

ワークキューシステムには、処理されるワークアイテム(タスク)のかたまり(バッチ)があります。それぞれのワークアイテムは他のワークアイテムからは独立していて、処理する際に相互のやり取りはありません。ワークキューシステムのゴールは通常、一定時間内にそれぞれのワークアイテムを実行することです。ワーカは、ワークアイテムが処理されるよう、スケールアップしたりスケールダウンしたりします。10章 ワークキューシステム - 分散システムデザインパターン ―コンテナを使ったスケーラブルなサービスの設計 [Book]

今回、ワークアイテムをセンサーデータ、ワークアイテムをキューに送信するシステムをパブリッシュワーカー、ワークアイテムをキューからMongoDBに保存するシステムをサブスクライブワーカーとし、以下の構成を構築する。

ワークキューシステムを採用した改善案

この構成をとることによるメリットは、以下の3つ。

  • MongoDBが落ちていてもデータはキューサーバに蓄積されるため、データロストの可能性がない
  • NatureRemoのデータを他の用途に利用したくなった場合に、拡張が容易
  • 他のセンサーデータを収集したくなった場合、MongoDBに保存する処理は存在しているので同じ処理を複数回作成する必要がない

この構成の場合、キューシステムが常に稼働している前提となるので、稼働率が高いキューサーバーを利用する必要があるが、稼働率が高いキューサーバーを自前で構築することは非常に難しい。そこでキューサーバーを、クラウドサービスのGCPが提供するCloud Pub/Subを採用することとした。ほかのキューサーバーと比較したわけではないが、一ヶ月10GBまでの利用が無料なのと、今までGCPを利用したことがなかったため、チャレンジの意も込めて採用することした。

実装

キューサーバーの構築

キューサーバーは前述したとおり、GCPのCloud Pub/Subを利用する。Cloud Pub/Subは、トピックとサブスクリプションを作成するだけで、キューサーバーを利用可能である。トピックとサブスクリプションの作成に関する説明は、他の方の記事や公式ドキュメントに譲る。

cloud.google.com

トピックとサブスクリプションの作成後、クレデンシャル情報をダウンロードし、pubsub-credential.jsonとして保存する。

Cloud Pub/SubをAPIから利用するために、以下のライブラリをインストールする。

pip install google-cloud-pubsub==2.7.1

ライブラリの利用方法はGoogleが公式に公開している、以下のリポジトリのサンプルコードが非常に分かりやすい。追加で他の機能を実装したい場合は、こちらを参考にすると良い。

github.com

パブリッシュワーカー

はじめに、NatureRemoからCloud Pub/Subにデータを送信するパブリッシュワーカーを作成する。処理の流れは、NatureRemoからデータを取得し、取得したデータを整形後、Cloud Pub/Subのトピックに送信するというものである。

パブリッシュワーカーの実装

データを取得した時間を記録するために利用するpytzをインストールする。

pip install pytz

その後、publish.pyを作成し、以下のコードを記載する。${プロジェクトID}${トピック名}の箇所には、それぞれCloud Pub/Subで作成したプロジェクト名と、トピック名を入力する。

import argparse
import datetime as dt

import requests
from google.cloud import pubsub_v1
from pytz import timezone


class PublisherFacade(object):
    def __init__(self, project_id: str, topic_id: str):
        """GCPのPub/Subクライアントに接続しPublishするクラス
 
        Args:
            project_id ([str]): トピックが存在するプロジェクトのプロジェクトID
            topic_id ([str]): データを送信する対象のトピックのトピックID
        """
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(project_id, topic_id)
 
    def publish(self, data: list) -> list:
        """データをPubSubに送信する処理
 
        Args:
            data (list): 送信するデータの配列。
 
        Returns:
            list: [description]
        """
        response_ary = []
 
        if type(data) != list:
            # 入力値がlist出ない場合は処理を終了
            print("data most be list. ")
            exit()
 
        for d in data:
            # Data must be a bytestring
            d = str(d).encode("utf-8")
            response_ary.append(self.publisher.publish(self.topic_path, d).result())
 
        return response_ary

def get_room_info(api_token: str) -> {}:
    """部屋の温度の情報を取得する。
    Args:
        api_key (str): NatureRemoから取得したAPIToken
    """
    headers = {
        "accept": "application/json",
        "Authorization": "Bearer " + api_token,
    }
    response = requests.get("https://api.nature.global/1/devices", headers=headers)
    return response.json()
 
 
def collect_to_topic(api_token: str, publisher):
    """NatureRemoからデータを取得し、GooglePubSubに送信する。
    Args:
        api_token (str): NatureRemoのHPで取得したAPIToken
    """
 
    room_info_list = []
    for room_info in get_room_info(api_token):
        request_at_date = (
            timezone("Asia/Tokyo")
            .localize(dt.datetime.now())
            .strftime("%Y-%m-%dT%H:%M:%SZ")
        )
        tmp_room_info = {
            "name": room_info["name"],
            "newest_events": room_info["newest_events"],
            "updated_at": room_info["updated_at"],
            "request_at": request_at_date,
        }
        room_info_list.append(tmp_room_info)
    print(room_info_list)
 
    # GooglePubSubに送信
    publisher.publish(room_info_list)
 
 
if __name__ == "__main__":
    # 引数の設定
    parser = argparse.ArgumentParser()
 
    parser.add_argument("api_token", help="NatureRemoから取得したAPIキー")
 
    args = parser.parse_args()
 
    # Publisher Clientの初期化
    publisher = PublisherFacade("${プロジェクトID}", "${トピック名}")
    collect_to_topic(api_token=args.api_token, publisher=publisher)

実行する際は、先ほどCloud Pub/Subからダウンロードしたpubsub-credential.jsonを、以下のように環境変数に設定する。

export GOOGLE_APPLICATION_CREDENTIALS=pubsub-credential.json

その後以下のコマンドを実行しすることで、NatureRemoのセンサーデータをCloud Pub/Subに送信する。

python publish.py ${NatureRemoから取得したAPIキー}

サブスクライブワーカー

次に、Cloud Pub/Subからデータを取得しMongoDBに保存する、サブスクライブワーカーの作成を行う。

サブスクライブワーカーの実装

まず、MongoDBと接続する為のライブラリをインストールする。

pip install pymongo

その後、subscribe.pyを作成し、以下のコードを記載する。${プロジェクトID}${サブスクリプション名}の箇所にはそれぞれCloud Pub/Subで作成したプロジェクトIDと、サブスクリプション名を入力する。${保存するデータベース名}${保存するコレクション名}はMongoDBの保存する先の情報を記載する。

import argparse
import json

from google.api_core import retry
from google.cloud import pubsub_v1
from pymongo import MongoClient


class MongoRepository(object):
    def __init__(
        self, db_name, collection_name, username, password, host="localhost", port=27017
    ):
        _client = MongoClient(
            "mongodb://%s:%s@%s:%d" % (username, password, host, port)
        )
        _db = _client[db_name]
        # 認証情報を付与
        _db.add_user(
            username,
            password,
            roles=[
                {
                    "role": "dbAdmin",
                    "db": db_name,
                },
            ],
        )
 
        self.collection = _db[collection_name]
 
    def insert_list(self, insert_list):
        return self.collection.insert_many(insert_list)
 
 
class SubscriberFacade(object):
    def __init__(self, project_id: str, debug_mode: bool = False):
        """GCPのPub/Subクライアントに接続し、データをPullするクラス
 
        Args:
            project_id ([str]): トピックが存在するプロジェクトIDのリスト
            debug_mode()
        """
        self.subscriber = pubsub_v1.SubscriberClient()
        self.project_id = project_id
        self.subscriptions_list = []
        self.ack_ids = []
        self.subscription_path = None
        self.debug_mode = debug_mode
 
    def _init_subscriber(self):
        self.subscriber = pubsub_v1.SubscriberClient()
 
    def pull(self, subscription_id: str, max_messages: int = 10) -> list:
        """指定したSubscribeからデータをPullする
 
        Args:
            subscription_id (str): 対象のサブスクリプションID
            max_messages (int): pullする最大のメッセージ数
 
        Returns:
            list: [description]
        """
        self._init_subscriber()
        self.ack_ids = []
 
        self.subscription_path = self.subscriber.subscription_path(
            self.project_id, subscription_id
        )
 
        with self.subscriber:
            response = self.subscriber.pull(
                request={
                    "subscription": self.subscription_path,
                    "max_messages": max_messages,
                },
                retry=retry.Retry(deadline=300),
            )
 
            for received_message in response.received_messages:
                self.ack_ids.append(received_message.ack_id)
 
            print(
                f"Received and acknowledged {len(response.received_messages)} messages from {self.subscription_path}."
            )
 
        return response.received_messages
 
    def send_ack(self):
        """直前のPull or Subscribe処理に関してAckを返す
        該当の処理が実行されていない場合は何もせず終了する。
        """
        if not self.subscription_path:
            print("直前のSubscribeが見つかりません。")
            return
        self._init_subscriber()
        if not self.debug_mode and not self.ack_ids == []:
            print("Ackを返します。 ack_ids: {}".format(self.ack_ids))
            with self.subscriber:
                self.subscriber.acknowledge(
                    request={
                        "subscription": self.subscription_path,
                        "ack_ids": self.ack_ids,
                    }
                )
 
 
def bytes_to_json(input_bytes: bytes) -> dict:
    """bytes形式のJsonをJsonの辞書配列に変換する
 """
    converted_byte = (
        input_bytes.decode("utf8")
        .replace("None", '"None"')
        .replace("True", "true")
        .replace("False", "false")
        .replace("'", '"')
    )
    print(converted_byte)
    return json.loads(converted_byte)
 
 
def main(args) -> None:
    """PubSubから同期的にメッセージを取得する    """
 
    # Subscriberの初期化
    subscriber = SubscriberFacade(project_id=" ${プロジェクトID}", debug_mode=args.debug_mode)
 
    subscribe_payloads_list,subscribe_messages = [], []
    try:
        # Messageの取得と整形
        subscribe_messages = subscriber.pull(
            " ${サブスクリプションID}", 10
        )
        subscribe_payloads_list = [
            bytes_to_json(x.message.data) for x in subscribe_messages
        ]
        print(subscribe_payloads_list)
    except Exception as e:
        print(e)

    # Mongo Clientの初期化
    mongo_client = MongoRepository(
        db_name="${保存するデータベース名}",
        collection_name="${保存するコレクション名}",
        username=args.mongo_username,
        password=args.mongo_password,
        host=args.mongo_host,
        port=args.mongo_port,
    )

    # データの挿入
    try:
        if subscribe_payloads_list == []:
            print("SubscriptionIdに新規メッセージは存在しませんでした。")
        else:
            mongo_client.insert_list(subscribe_payloads_list)
            subscriber.send_ack()
    except Exception as e:
        print(e)
        print(
            "保存に失敗しました。ACKを返しません。"
        )
 
 
if __name__ == "__main__":
    # 引数の設定
    parser = argparse.ArgumentParser()
 
    parser.add_argument("mongo_username", help="ユーザ名")
    parser.add_argument("mongo_password", help="パスワード")
    parser.add_argument("--mongo_host", default="localhost", help="対象のMongoDBのホスト名")
    parser.add_argument("--mongo_port", default=27017, help="対象のMongoDBのポート番号")
    parser.add_argument("--debug_mode", default=False, help="debugモードの場合はAckを返さない")
 
    args = parser.parse_args()
 
    main(args)

処理の流れは、データをキューサーバからサブスクライブし、そのデータをMongoDBに保存するだけである。しかし、181~193行目の以下の箇所にデータをロストしない工夫をしてある。

        try:
            if subscribe_payloads_list == []:
                print("SubscriptionIdに新規メッセージは存在しませんでした。")
                continue
            else:
                mongo_client.insert_list(subscribe_payloads_list)
                subscriber.send_ack()
        except Exception as e:
            print(e)
            print(
                "保存に失敗しました。ACKを返しません。"
            )

上記のコードでは、Cloud Pub/Subからデータをサブスクライブし、MongoDBへデータの保存が完了した場合にAckを返し、保存処理に失敗した場合はAckを返さないようにしている。

Ackとは、受け取り確認のことで、Cloud Pub/Subは、Ackが返されたデータは配信済みのデータとみなし、キュー上から削除する。そのため、例えば、Cloud Pub/Subからデータを受け取ったタイミングでAckを返した後、何らかのエラーが発生し、その後の処理が失敗したとする。その後、次に処理をする際に、エラーが発生した時刻のデータは、Ackが返されてすでにキューサーバ上に存在しないため、その時刻のデータをロストするという現象が発生する。

そこで、MongoDBにデータの保存が完了した場合にのみAckを返すことで、MongoDBに保存できない場合は、Ackが返されず、キューサーバにデータが残り続けるため、データがロストする心配がなくなるということである。

実行の際は、パブリッシュワーカーと同様にpubsub-credential.jsonを以下のように環境変数に設定し、コマンドを実行する。

export GOOGLE_APPLICATION_CREDENTIALS=pubsub-credential.json
python subscribe.py ${MongoDBのユーザ名} ${MongoDBのパスワード}

定期実行の設定と可視化

上記のパブリッシュワーカーとサブスクライブワーカーを、cronで定期実行するようにし、設定は完了。

前回構築したMetabaseでグラフを作成したところ直近のデータが表示されたので、データを問題なく収集できていることが確認できた。

保存されたセンサーデータをMetabaseで可視化

おわりに

Cloud Pub/Subを利用したワークキューシステムを構築できた。これによりMongoDBが落ちていてもデータはキューに保存されるし、データを他のアプリケーションで利用したくなった時も、キューからデータを取得できるので簡単に拡張ができそう。マイクロサービスいいですね!

参考にした書籍

本記事は、下記の「分散システムデザインパターン」の「第Ⅲ部 バッチ処理パターン」を参考に実装した。この本では、マイクロサービスに関連する基本的な用語から、構築パターンのサンプルコードまで掲載されており、非常に理解がしやすい。マイクロサービスに興味がある方は、是非手にとって確認いただきたい。

温度や湿度を取得するために利用しているセンサーは、NatureRemoを利用している。NatureRemoは、温度や湿度の計測以外にも、家電の操作ができ、更にAlexaなどの音声スピーカーと連携することで、音声で家電を操作できる。