sey323’s blog

にわかシステムエンジニア兼にわかデータサイエンティストです。

RaspberryPi+CO2センサー(MH-Z14B)でCO2データをPythonで収集し可視化する

はじめに

以下の記事で、家の温度や湿度を取得していたが、ついでに二酸化炭素濃度も取得したいと思いラズパイとCO2センサを利用し収集する環境を構築した。

sey323log.hatenablog.com

Amazonで販売されているCO2センサーを見ると、コロナ需要で様々なメーカから出品されており、精度、APIの使いやすさ、メンテナンス等を考慮すると、どれ購入すれば良いかわからない。「よくわからない精度のものので曖昧なデータを取るくらいだったら自作してしまおう」と思い、今回はCO2を取得するセンサーをラズパイで自作した。結果、10,000円程度の出費でCO2センサーを作成し、そのCO2濃度を操作できる環境を構築できた。

実装

二酸化炭素濃度の収集の全体構成は以下の通り。まず、二酸化炭素濃度を収集しMongoDBに保存する。その後、保存したデータをOSSのBIツールのMetabaseで可視化する。

CO2収集環境の全体像

Cloud Pub/Subからデータを取得しMongoDBに保存する処理(図中②)はこちらの記事で作成したサブスクライブワーカーをそのまま利用する。今回はCO2センサーの作成と取得したセンサーデータをCloud Pub/Subに送信するパブリッシュワーカー(図中①)の作成を行う。

1. 前準備

可視化環境(MongoDB+Metabase)の構築

収集したデータを保存して可視化する環境としてMongoDBとMetabaseを用意する。この環境は下記の記事で作成したdocker-composeを利用して構築する。

sey323log.hatenablog.com

MongoDBとMetabaseの起動のコマンドを、以下に示す。

docker-compose up -d

起動が完了後それぞれ以下のポートでアクセスが可能となる。

またMongoDBのユーザ名(mongodb_user)とパスワード(mongodb_pass)は後のサブスクライブワーカーで利用する。

利用するdocker-compose.ymlのソースコード

services:
    mongodb:
      image: mongo
      container_name: mongodb
      restart: always
      environment:
        MONGO_INITDB_ROOT_USERNAME: mongodb_user
        MONGO_INITDB_ROOT_PASSWORD: mongodb_pass
      ports:
        - 27017:27017
      volumes:
        - ./mongo/db:/data/db
        - ./mongo/configdb:/data/configdb

    metabase:
        image: metabase/metabase:v0.33.5
        container_name: metabase
        ports:
            - 3333:3000
        volumes:
            - ./metabase/data:/mnt/data

2. RaspberryPi+CO2センサ(MH-Z14B)の接続

ラズパイの初期設定などは他のサイトを参照。

利用した機材

  • Raspberry Pi 3 model B+
  • MH-Z14B(CO2センサ)
  • ジャンパワイヤ(オス-メス) 
  • ピンソケット(メス、2.54 mm)

CO2センサは以下のものを利用した。

Amanzonのページでは MH-Z14Aと記載があったが、購入して届いたものの型番は MH-Z14Bであった。秋月電子のページで検索してもMH-Z14Aの型番のものがヒットしないので、そちらは販売が終了したのかもしれない。

ラズパイのUART通信の有効化

ラズパイで配線を行いシリアル通信を行うためには、ラズパイのUARTを有効化する必要がある。UARTの有効化は以下のリンクのいずれかを参照。

qiita.com

qiita.com

最終的にラズパイ上で以下のコマンドを実行し、それぞれが以下のように割り当てられていればOK。

ls -l /dev/serial*
lrwxrwxrwx 1 root root 7  4月 24 13:17 /dev/serial0 -> ttyAMA0
lrwxrwxrwx 1 root root 5  4月 24 13:17 /dev/serial1 -> ttyS0

CO2センサー(MH-Z14B)の配線

次にCO2センサー(MH-Z14B)をラズバイに配線して接続する。MH-Z14Bの公式のデータシートが以下のリンクにある。

MH-Z14Bのデータシートより抜粋

利用するのが2、13、14、15ピンなので、これをラズパイのピンに接続していく。ラズパイ側のピンとCO2センサー側のピンの対応関係を以下に示す。

ラズパイ側 CO2センサー側 用途
2 15 Vin
6 2 GND
8 14 ラズパイからCO2センサーへのデータの送信
10 13 CO2センサーからラズパイへのデータの送信

配線後はこんな感じ。ハンダゴテがなかったので家にある物置台に固定した。

配線して設置した例

3. CO2データを収集するクライアントの作成

次に、先程配線処理したラズパイのCO2センサーから二酸化炭素濃度を取得し、そのデータをCloud Pub/Subに送信するクライアントを作成する。

二酸化炭素濃度を収集するクライアントのソースコード

${プロジェクトID}${トピック名}の箇所には、それぞれCloud Pub/Subで作成したプロジェクト名と、トピック名を入力する。

import argparse
import datetime as dt
import time
from logging import getLogger

import serial
from google.cloud import pubsub_v1
from pytz import timezone


class PublisherFacade(object):
    def __init__(self, project_id: str, topic_id: str):
        """GCPのPub/Subクライアントに接続しPublishするクラス

        Args:
            project_id ([str]): トピックが存在するプロジェクトのプロジェクトID
            topic_id ([str]): データを送信する対象のトピックのトピックID
        """
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(project_id, topic_id)

    def publish(self, data: list) -> list:
        """データをPubSubに送信する処理

        Args:
            data (list): 送信するデータの配列。

        Returns:
            list: [description]
        """
        response_ary = []

        if type(data) != list:
            # 入力値がlist出ない場合は処理を終了
            print("data most be list. ")
            exit()

        for d in data:
            # Data must be a bytestring
            d = str(d).encode("utf-8")
            response_ary.append(self.publisher.publish(self.topic_path, d).result())

        return response_ary


def setup(target_tty="/dev/ttyAMA0", timeout=10.0, sleep_time: int = 30):
    """指定したデバイスとのシリアル接続を開始し、シリアルオブジェクトを返す。

    Args:
        target_tty (str, optional): 取得するセンサーデバイスのtty. Defaults to "/dev/ttyAMA0".
        timeout (float, optional): 接続の際に何秒でタイムアウトとするか. Defaults to 10.0.
        sleep_time (int, optional): デバイスの起動の際に何秒待つか。短すぎると起動に失敗する。. Defaults to 30.
    """
    s = serial.Serial(
        target_tty,
        baudrate=9600,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        timeout=timeout,
    )
    print(f"起動処理のため{sleep_time}秒待機します。")
    time.sleep(sleep_time)
    return s


def get_co2data_from_sensor(
    target_tty="/dev/ttyAMA0",
) -> int:
    """センサーとシリアル通信を行い、二酸化炭素濃度を取得する。

    Args:
        target_tty (str, optional): 取得するセンサーデバイスのtty. Defaults to "/dev/ttyAMA0".

    Returns:
        int: 取得した二酸化炭素濃度。
    """
    # シリアル接続の開始
    s = setup(target_tty)
    b = bytearray([0xFF, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79])
    s.write(b)

    result = s.read(9)

    # 返り値が正常かどうかを判定
    try:
        checksum = (
            0xFF
            - (
                (
                    (result[1])
                    + (result[2])
                    + (result[3])
                    + (result[4])
                    + (result[5])
                    + (result[6])
                    + (result[7])
                )
                % 256
            )
        ) + 0x01
        if checksum == (result[8]):  # チェックサムを確認して異なっていたら、不正なデータとみなす。
            co2_value: int = str(((result[2]) * 256) + (result[3]))
            print(f"co2濃度: {co2_value}")
            return int(co2_value)
        else:
            raise Exception("センサーから取得したチェックサムに誤りがありました。")
    except IndexError as ie:
        print("センサーからデータを取得できません。接続を確認してください。", ie)
    except Exception as e:
        print(e)


def collect_to_topic(
    target_tty: str = "/dev/ttyAMA0", room_name: str = "home", mock_mode: bool = False
):
    """Co2センサーからデータを取得する。

    Args:
        target_tty (str, optional): 取得するセンサーデバイスのtty.. Defaults to "/dev/ttyAMA0".
        room_name (str, optional): デバイスが設置されているルーム名。送信するデータに不要する。. Defaults to "home".
        mock_mode (bool, optional): デバイスと接続はせずにそれ以外の動作だけ確認する。. Defaults to False.
    """

    try:
        # mockモードの時は試験用に600を返す。
        co2_response_data = (
            get_co2data_from_sensor(target_tty) if not mock_mode else 600
        )
        if co2_response_data is None:
            print("センサーからデータの取得に失敗しました。", co2_response_data)
            exit()
        room_co2_info = {
            "name": room_name,
            "co2_value": co2_response_data,
            "request_at": (
                timezone("Asia/Tokyo")
                .localize(dt.datetime.now())
                .strftime("%Y-%m-%dT%H:%M:%SZ")
            ),
        }
        print(f"Cloud Pub/Subに送信するデータ: {room_co2_info}")
        # Cloud Pub/Subに送信
        if mock_mode:
            print("Mockモードなので、データの送信を行いません。")
        else:
            publisher.publish([room_co2_info])
    except Exception as e:
        print("データの更新処理に失敗しました。", e)
        exit()


if __name__ == "__main__":
    logger = getLogger(__name__)

    # 引数の設定
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--target_tty",
        help="端末に接続されているセンサーのパス。",
        default="/dev/ttyAMA0",
    )
    parser.add_argument(
        "--room_name",
        help="センサーが設置してある場所。データに付与する。",
        default="living",
    )
    parser.add_argument(
        "--mock_mode",
        help="Mockを利用する場合このオプションを付与することで、データがCloudPubSubに送信されなくする。",
        action="store_true",
    )
    args = parser.parse_args()

    # Publisher Clientの初期化
    publisher = PublisherFacade("gotg-324412", "home_carbondioxide")
    collect_to_topic(
        target_tty=args.target_tty, room_name=args.room_name, mock_mode=args.mock_mode
    )

実行する際は、Cloud Pub/Subからダウンロードしたシークレット情報pubsub-credential.jsonのパスを、以下のように環境変数に設定する。

export GOOGLE_APPLICATION_CREDENTIALS=pubsub-credential.json

その後以下のコマンドを実行することでCO2のセンサーデータがCloud Pub/Subに送信される。

$ python publish.py
co2濃度: '556'
Cloud Pub/Subに送信するデータ: {'co2_value': 556, 'name': 'living', 'request_at': '2022-05-07T16:20:36Z'}

配線が正しく行われている場合、実行すると上記のようにCO2センサーで二酸化炭素濃度を取得できる。これをcronなどで定期実行するようにする事で、定期的に部屋の二酸化炭素濃度を取得できる。

4. 収集したデータをMongoDBに保存するクライアントの作成

次に、Cloud Pub/Subからデータを取得しMongoDBに保存するサブスクライブワーカーの作成を行う。これは過去の記事で作成したものと全く同じなので詳細は割愛する。

サブスクライブワーカーのソースコード

まず、MongoDBと接続する為のライブラリをインストールする。

pip install pymongo

その後、subscribe.pyを作成し、以下のコードを記載する。${プロジェクトID}${サブスクリプション名}の箇所にはそれぞれCloud Pub/Subで作成したプロジェクトIDと、サブスクリプション名を入力する。${保存するデータベース名}${保存するコレクション名}はMongoDBの保存する先の情報を記載する。

import argparse
import json

from google.api_core import retry
from google.cloud import pubsub_v1
from pymongo import MongoClient


class MongoRepository(object):
    def __init__(
        self, db_name, collection_name, username, password, host="localhost", port=27017
    ):
        _client = MongoClient(
            "mongodb://%s:%s@%s:%d" % (username, password, host, port)
        )
        _db = _client[db_name]
        # 認証情報を付与
        _db.add_user(
            username,
            password,
            roles=[
                {
                    "role": "dbAdmin",
                    "db": db_name,
                },
            ],
        )
 
        self.collection = _db[collection_name]
 
    def insert_list(self, insert_list):
        return self.collection.insert_many(insert_list)
 
 
class SubscriberFacade(object):
    def __init__(self, project_id: str, debug_mode: bool = False):
        """GCPのPub/Subクライアントに接続し、データをPullするクラス
 
        Args:
            project_id ([str]): トピックが存在するプロジェクトIDのリスト
            debug_mode()
        """
        self.subscriber = pubsub_v1.SubscriberClient()
        self.project_id = project_id
        self.subscriptions_list = []
        self.ack_ids = []
        self.subscription_path = None
        self.debug_mode = debug_mode
 
    def _init_subscriber(self):
        self.subscriber = pubsub_v1.SubscriberClient()
 
    def list_subscriptions_in_project(self) -> list:
        """project_idに所属するサブスクリプションの一覧を取得する。
        参考URL
        https://github.com/googleapis/python-pubsub/blob/main/samples/snippets/subscriber.py
        """
        project_path = f"projects/{self.project_id}"
 
        self.subscriptions_list = [
            sl.name
            for sl in self.subscriber.list_subscriptions(
                request={"project": project_path}
            )
        ]
        return self.subscriptions_list
 
    def pull(self, subscription_id: str, max_messages: int = 10) -> list:
        """指定したSubscribeからデータをPullする
 
        Args:
            subscription_id (str): 対象のサブスクリプションID
            max_messages (int): pullする最大のメッセージ数
 
        Returns:
            list: [description]
        """
        self._init_subscriber()
        self.ack_ids = []
 
        self.subscription_path = self.subscriber.subscription_path(
            self.project_id, subscription_id
        )
 
        with self.subscriber:
            response = self.subscriber.pull(
                request={
                    "subscription": self.subscription_path,
                    "max_messages": max_messages,
                },
                retry=retry.Retry(deadline=300),
            )
 
            for received_message in response.received_messages:
                self.ack_ids.append(received_message.ack_id)
 
            print(
                f"Received and acknowledged {len(response.received_messages)} messages from {self.subscription_path}."
            )
 
        return response.received_messages
 
    def send_ack(self):
        """直前のPull or Subscribe処理に関してAckを返す
        該当の処理が実行されていない場合は何もせず終了する。
        """
        if not self.subscription_path:
            print("直前のSubscribeが見つかりません。")
            return
        self._init_subscriber()
        if not self.debug_mode and not self.ack_ids == []:
            print("Ackを返します。 ack_ids: {}".format(self.ack_ids))
            with self.subscriber:
                self.subscriber.acknowledge(
                    request={
                        "subscription": self.subscription_path,
                        "ack_ids": self.ack_ids,
                    }
                )
 
 
def bytes_to_json(input_bytes: bytes) -> dict:
    """bytes形式のJsonをJsonの辞書配列に変換する
 """
    converted_byte = (
        input_bytes.decode("utf8")
        .replace("None", '"None"')
        .replace("True", "true")
        .replace("False", "false")
        .replace("'", '"')
    )
    print(converted_byte)
    return json.loads(converted_byte)
 
 
def main(args) -> None:
    """PubSubから同期的にメッセージを取得する    """
 
    # Subscriberの初期化
    subscriber = SubscriberFacade(project_id=" ${プロジェクトID}", debug_mode=args.debug_mode)
 
    subscribe_payloads_list,subscribe_messages = [], []
    try:
        # Messageの取得と整形
        subscribe_messages = subscriber.pull(
            " ${サブスクリプションID}", 10
        )
        subscribe_payloads_list = [
            bytes_to_json(x.message.data) for x in subscribe_messages
        ]
        print(subscribe_payloads_list)
    except Exception as e:
        print(e)

    # Mongo Clientの初期化
    mongo_client = MongoRepository(
        db_name="${保存するデータベース名}",
        collection_name="${保存するコレクション名}",
        username=args.mongo_username,
        password=args.mongo_password,
        host=args.mongo_host,
        port=args.mongo_port,
    )

    # データの挿入
    try:
        if subscribe_payloads_list == []:
            print("SubscriptionIdに新規メッセージは存在しませんでした。")
        else:
            mongo_client.insert_list(subscribe_payloads_list)
            subscriber.send_ack()
    except Exception as e:
        print(e)
        print(
            "保存に失敗しました。ACKを返しません。"
        )
 
 
if __name__ == "__main__":
    # 引数の設定
    parser = argparse.ArgumentParser()
 
    parser.add_argument("mongo_username", help="ユーザ名")
    parser.add_argument("mongo_password", help="パスワード")
    parser.add_argument("--mongo_host", default="localhost", help="対象のMongoDBのホスト名")
    parser.add_argument("--mongo_port", default=27017, help="対象のMongoDBのポート番号")
    parser.add_argument("--debug_mode", default=False, help="debugモードの場合はAckを返さない")
 
    args = parser.parse_args()
 
    main(args)

実行の際は、パブリッシュワーカーと同様にpubsub-credential.jsonを以下のように環境変数に設定し、コマンドを実行する。

export GOOGLE_APPLICATION_CREDENTIALS=pubsub-credential.json
python subscribe.py ${MongoDBのユーザ名} ${MongoDBのパスワード}

上記を実行する事でCloud Pub/Subからデータを取得しMongoDBに保存することができる。

5. データの可視化

上記のパブリッシュワーカーとサブスクライブワーカーを、cronで5分ごとに定期実行するようにし設定は完了。

Metabaseでグラフを作成して、5分おきの二酸化炭素濃度を取得できていることが確認できる。

5分おきの二酸化炭素濃度の推移

ここら辺の資料から、一般的に室内の二酸化炭素濃度は1000ppm以下であることが好ましいよう。なので基準値を超えたら分かるように「異常値」の波線もグラフに表示しておいた。

二酸化炭素は、少量であれば人体に影響は見られないが、濃度が高くなると、倦怠感、頭痛、耳鳴り等の症状を訴える者が多くなること、また、室内の二酸化炭素濃度は全般的な室内空気の汚染度や換気の状況を評価する1つの指標としても用いられており、二酸化炭素濃度の基準値は 1000ppm 以下と定められている。 https://www.mhlw.go.jp/content/11130500/000771215.pdf

おわりに

二酸化炭素濃度が高いとパフォーマンスに悪影響を及ぼすということで、それが可視化できて満足。次は二酸化炭素濃度が1000ppmを超えそうになると、家にあるAlexaとかにアラートを発砲できるようにしたいなあと画策中。

今回初めてラズパイの電子工作的なことを行ったが、ハンダゴテ持っていななくて、暫定で固定したかんじなのでいつかしっかり半田付けしたい。

参考

RaspberryPiとセンサーの接続に関して、以下の方の情報を参考にさせていただきました。

マンションの無料のWiFiが安定しない時に対応したこと

はじめに

最近、家のネットワーク回線が安定しなくなった。我が家ではプロバイダを契約しておらず、マンションの居住者が無料で利用できるセイワBBを利用しているのだが、入居したての時は問題なく利用できたにもかかわらず、ここ最近になって何故か安定しなくなった。

無料な物に対して大きい声で文句は言えないが、今までは安定していたものなのでなんとかできないかと思い、原因を調査し改善を試みた。

現状

ネットワークの状態としては、良い時はYoutubeを複数端末で視聴しても問題なく動作するが、悪い時は全く接続できず、阿部寛のHPすら閲覧できないレベルである。回線速度は、早い時は100Mbpsは出るが、遅い時は0.01Mbpsも出ない、という非常に波のある状態だった。

ルータは、3年前に購入したTP-Link AC2600を利用している。

結論

結論だけ先に書くと、ルータの設定をルータモードからブリッジモードに変更することで解決した。マンションの無料WiFiが安定しない人は、ルータの設定がルータモードになっていないか、ルータモードになっていたら、ブリッジモードに変更することを試みて欲しい。

f:id:sey323:20220227212812p:plain
ブリッジモードを選択する

ルータをブリッジモードにすることで、いくつかの機能は利用できなくなる。

上記の3つは一般的な利用用途には全く影響がないが、我が家ではサーバを5台運用しており、それらのサーバをローカルDNSで名前解決していたり、QoSで優先度をコントロールしていたので、この機能が使えなくなったのは運用上めんどくさい。

名前解決できない問題に関しては、サーバ側の設定によるIPの固定化はできたので、IPを固定し、全ての端末の/etc/hostsにIPとホスト名のマッピングを記載することで無理やり解決した。

原因

ルータモードが安定しない原因は、マンションのルータと我が家のルータが2台のルータが共存しており、2重ルータ構成となっていたためであった。2重ルータの場合、通信経路が一つ増えたり、NATが2重で行われたりするため、ネットワークが不安定になりやすいとのこと。我が家のネットワークはセイワBBを利用させてもらっているのだが、HPをよく見たら 必ずルーター機能をOFF※に設定のうえ、ご利用ください。との記載が…。すみません、セイワBBさん。。

kariage.co.jp

そういえば、以下の記事でFreeIPAでDNSを構築した時に、ルータのDCHPの機能を使ってDNSの設定を各端末に自動で割り振るために、ルータの設定をブリッジモード からルータモードに変更したことを思い出した。おそらくそれが発端となり、ネットワークが不安定になっていた模様。

sey323log.hatenablog.com

2重ルータになっているかどうかは、以下のコマンドで確認できる。家のルータのIPは192.168.8.1で、マンションのルータのIPが192.168.2.254のようである。このようにtracerouteコマンドで出力される経路に192.168.*.*を持つIPが2つ出力されている場合、2重ルータ構成となっている可能性が高い。

$ traceroute 8.8.8.8
traceroute to 8.8.8.8 (8.8.8.8), 64 hops max, 52 byte packets
 1  192.168.8.1 (192.168.8.1)  4.182 ms  1.202 ms  1.176 ms
 2 192.168.2.254 (192.168.2.254)  4.182 ms  1.202 ms  1.176 ms
~~~省略~~~

2重ルータのまま利用する方法

2重ルータ構成は絶対ダメというわけではなく、2重ルータ状態でも安定した通信を実現する方法がある。しかし、親のルータ(上記の例だと192.168.2.254)を操作する権限が必要で、マンションの場合、一般の居住者に親のルータの操作権限が与えられていることはほぼない。なのでマンションタイプの場合は、下記の2重ルータの設定は、ほぼ利用できないと思って良いだろう。

www.tp-link.com

その他2重ルータのまま利用するために実施した対応策

家でどうしてもDNSを利用したく、ルータモードのままでも安定させようと以下の対策を実施したが、効果は全くなかった。

  • QoSによる優先度設定
  • IPv6の停止
  • 我が家のルータの接続方式を静的IPに変更し、動的IPの時に割り振られた情報を入力する
  • 我が家のルータのセグメントを親ルータ(192.168.2.255)と同じにする

おわりに

ブリッジモードにしてから、ネットワークが非常に安定する。無料WiFiよくない!と言われるが、我が物件に関してはそんなことはなさそう。回線を個別に契約することも考えていたため、無駄なお金を節約できて非常に満足。DNSは使えなくなったが、月3~5000円近くかけてまでDNSを実現するのはアホらしいので、これでOKとした。

公式ドキュメントはしっかり読みましょう(自戒)

参考記事

Cloud Pub/Subを利用したワークキューシステムの構築

はじめに

過去、センサーデータをMongoDBに保存する仕組みを作成し、日々家の温度や湿度を計測して、快適な住空間の構築に努めている。

sey323log.hatenablog.com

データを取得して保存する流れ

しかしこの仕組みでは、MongoDBサーバが落ちている時はセンサーデータを保存する処理が失敗するため、MongoDBが落ちている間のセンサーデータをロストしてしまう事象が発生した。MongoDBは自宅のサーバに構築しているため、予想外の障害(ブレーカーが落ちる、LANケーブルが切れている)が発生しやすく、落ちてしまうことが多々あった。

エラー発生時

対応策

やりたいこと

今回実現したい内容としては以下の2つ。

  1. データをロストしないこと
  2. 拡張しやすい構成であること

1は絶対として、2に関しては、現在、二酸化炭素濃度やGoogleFitのデータも同様に収集したいと考えており、それらを構築する際にMongoDBに書き込む処理を何回も実装するのは効率的でないので、共通化できるようにしたいと思っている。また今後、MongoDBにデータを保存するのみでなく、例えば温度の値がある閾値以上であればSlackに通知がいくようにするとか、データを保存だけではなく活用できるようにもしたいと考えている。

そういったときに、拡張が簡単にできるようなシステムの構成を検討したい、と言うのが「拡張しやすい構成」の目的となる。

ワークキューシステムの採用

これらの対応策として、ワークキューシステムを採用する。ワークキューシステムとは、以下のような構成を持つシステムである。

ワークキューシステムには、処理されるワークアイテム(タスク)のかたまり(バッチ)があります。それぞれのワークアイテムは他のワークアイテムからは独立していて、処理する際に相互のやり取りはありません。ワークキューシステムのゴールは通常、一定時間内にそれぞれのワークアイテムを実行することです。ワーカは、ワークアイテムが処理されるよう、スケールアップしたりスケールダウンしたりします。10章 ワークキューシステム - 分散システムデザインパターン ―コンテナを使ったスケーラブルなサービスの設計 [Book]

今回、ワークアイテムをセンサーデータ、ワークアイテムをキューに送信するシステムをパブリッシュワーカー、ワークアイテムをキューからMongoDBに保存するシステムをサブスクライブワーカーとし、以下の構成を構築する。

ワークキューシステムを採用した改善案

この構成をとることによるメリットは、以下の3つ。

  • MongoDBが落ちていてもデータはキューサーバに蓄積されるため、データロストの可能性がない
  • NatureRemoのデータを他の用途に利用したくなった場合に、拡張が容易
  • 他のセンサーデータを収集したくなった場合、MongoDBに保存する処理は存在しているので同じ処理を複数回作成する必要がない

この構成の場合、キューシステムが常に稼働している前提となるので、稼働率が高いキューサーバーを利用する必要があるが、稼働率が高いキューサーバーを自前で構築することは非常に難しい。そこでキューサーバーを、クラウドサービスのGCPが提供するCloud Pub/Subを採用することとした。ほかのキューサーバーと比較したわけではないが、一ヶ月10GBまでの利用が無料なのと、今までGCPを利用したことがなかったため、チャレンジの意も込めて採用することした。

実装

キューサーバーの構築

キューサーバーは前述したとおり、GCPのCloud Pub/Subを利用する。Cloud Pub/Subは、トピックとサブスクリプションを作成するだけで、キューサーバーを利用可能である。トピックとサブスクリプションの作成に関する説明は、他の方の記事や公式ドキュメントに譲る。

cloud.google.com

トピックとサブスクリプションの作成後、クレデンシャル情報をダウンロードし、pubsub-credential.jsonとして保存する。

Cloud Pub/SubをAPIから利用するために、以下のライブラリをインストールする。

pip install google-cloud-pubsub==2.7.1

ライブラリの利用方法はGoogleが公式に公開している、以下のリポジトリのサンプルコードが非常に分かりやすい。追加で他の機能を実装したい場合は、こちらを参考にすると良い。

github.com

パブリッシュワーカー

はじめに、NatureRemoからCloud Pub/Subにデータを送信するパブリッシュワーカーを作成する。処理の流れは、NatureRemoからデータを取得し、取得したデータを整形後、Cloud Pub/Subのトピックに送信するというものである。

パブリッシュワーカーの実装

データを取得した時間を記録するために利用するpytzをインストールする。

pip install pytz

その後、publish.pyを作成し、以下のコードを記載する。${プロジェクトID}${トピック名}の箇所には、それぞれCloud Pub/Subで作成したプロジェクト名と、トピック名を入力する。

import argparse
import datetime as dt

import requests
from google.cloud import pubsub_v1
from pytz import timezone


class PublisherFacade(object):
    def __init__(self, project_id: str, topic_id: str):
        """GCPのPub/Subクライアントに接続しPublishするクラス
 
        Args:
            project_id ([str]): トピックが存在するプロジェクトのプロジェクトID
            topic_id ([str]): データを送信する対象のトピックのトピックID
        """
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(project_id, topic_id)
 
    def publish(self, data: list) -> list:
        """データをPubSubに送信する処理
 
        Args:
            data (list): 送信するデータの配列。
 
        Returns:
            list: [description]
        """
        response_ary = []
 
        if type(data) != list:
            # 入力値がlist出ない場合は処理を終了
            print("data most be list. ")
            exit()
 
        for d in data:
            # Data must be a bytestring
            d = str(d).encode("utf-8")
            response_ary.append(self.publisher.publish(self.topic_path, d).result())
 
        return response_ary

def get_room_info(api_token: str) -> {}:
    """部屋の温度の情報を取得する。
    Args:
        api_key (str): NatureRemoから取得したAPIToken
    """
    headers = {
        "accept": "application/json",
        "Authorization": "Bearer " + api_token,
    }
    response = requests.get("https://api.nature.global/1/devices", headers=headers)
    return response.json()
 
 
def collect_to_topic(api_token: str, publisher):
    """NatureRemoからデータを取得し、GooglePubSubに送信する。
    Args:
        api_token (str): NatureRemoのHPで取得したAPIToken
    """
 
    room_info_list = []
    for room_info in get_room_info(api_token):
        request_at_date = (
            timezone("Asia/Tokyo")
            .localize(dt.datetime.now())
            .strftime("%Y-%m-%dT%H:%M:%SZ")
        )
        tmp_room_info = {
            "name": room_info["name"],
            "newest_events": room_info["newest_events"],
            "updated_at": room_info["updated_at"],
            "request_at": request_at_date,
        }
        room_info_list.append(tmp_room_info)
    print(room_info_list)
 
    # GooglePubSubに送信
    publisher.publish(room_info_list)
 
 
if __name__ == "__main__":
    # 引数の設定
    parser = argparse.ArgumentParser()
 
    parser.add_argument("api_token", help="NatureRemoから取得したAPIキー")
 
    args = parser.parse_args()
 
    # Publisher Clientの初期化
    publisher = PublisherFacade("${プロジェクトID}", "${トピック名}")
    collect_to_topic(api_token=args.api_token, publisher=publisher)

実行する際は、先ほどCloud Pub/Subからダウンロードしたpubsub-credential.jsonを、以下のように環境変数に設定する。

export GOOGLE_APPLICATION_CREDENTIALS=pubsub-credential.json

その後以下のコマンドを実行しすることで、NatureRemoのセンサーデータをCloud Pub/Subに送信する。

python publish.py ${NatureRemoから取得したAPIキー}

サブスクライブワーカー

次に、Cloud Pub/Subからデータを取得しMongoDBに保存する、サブスクライブワーカーの作成を行う。

サブスクライブワーカーの実装

まず、MongoDBと接続する為のライブラリをインストールする。

pip install pymongo

その後、subscribe.pyを作成し、以下のコードを記載する。${プロジェクトID}${サブスクリプション名}の箇所にはそれぞれCloud Pub/Subで作成したプロジェクトIDと、サブスクリプション名を入力する。${保存するデータベース名}${保存するコレクション名}はMongoDBの保存する先の情報を記載する。

import argparse
import json

from google.api_core import retry
from google.cloud import pubsub_v1
from pymongo import MongoClient


class MongoRepository(object):
    def __init__(
        self, db_name, collection_name, username, password, host="localhost", port=27017
    ):
        _client = MongoClient(
            "mongodb://%s:%s@%s:%d" % (username, password, host, port)
        )
        _db = _client[db_name]
        # 認証情報を付与
        _db.add_user(
            username,
            password,
            roles=[
                {
                    "role": "dbAdmin",
                    "db": db_name,
                },
            ],
        )
 
        self.collection = _db[collection_name]
 
    def insert_list(self, insert_list):
        return self.collection.insert_many(insert_list)
 
 
class SubscriberFacade(object):
    def __init__(self, project_id: str, debug_mode: bool = False):
        """GCPのPub/Subクライアントに接続し、データをPullするクラス
 
        Args:
            project_id ([str]): トピックが存在するプロジェクトIDのリスト
            debug_mode()
        """
        self.subscriber = pubsub_v1.SubscriberClient()
        self.project_id = project_id
        self.subscriptions_list = []
        self.ack_ids = []
        self.subscription_path = None
        self.debug_mode = debug_mode
 
    def _init_subscriber(self):
        self.subscriber = pubsub_v1.SubscriberClient()
 
    def list_subscriptions_in_project(self) -> list:
        """project_idに所属するサブスクリプションの一覧を取得する。
        参考URL
        https://github.com/googleapis/python-pubsub/blob/main/samples/snippets/subscriber.py
        """
        project_path = f"projects/{self.project_id}"
 
        self.subscriptions_list = [
            sl.name
            for sl in self.subscriber.list_subscriptions(
                request={"project": project_path}
            )
        ]
        return self.subscriptions_list
 
    def pull(self, subscription_id: str, max_messages: int = 10) -> list:
        """指定したSubscribeからデータをPullする
 
        Args:
            subscription_id (str): 対象のサブスクリプションID
            max_messages (int): pullする最大のメッセージ数
 
        Returns:
            list: [description]
        """
        self._init_subscriber()
        self.ack_ids = []
 
        self.subscription_path = self.subscriber.subscription_path(
            self.project_id, subscription_id
        )
 
        with self.subscriber:
            response = self.subscriber.pull(
                request={
                    "subscription": self.subscription_path,
                    "max_messages": max_messages,
                },
                retry=retry.Retry(deadline=300),
            )
 
            for received_message in response.received_messages:
                self.ack_ids.append(received_message.ack_id)
 
            print(
                f"Received and acknowledged {len(response.received_messages)} messages from {self.subscription_path}."
            )
 
        return response.received_messages
 
    def send_ack(self):
        """直前のPull or Subscribe処理に関してAckを返す
        該当の処理が実行されていない場合は何もせず終了する。
        """
        if not self.subscription_path:
            print("直前のSubscribeが見つかりません。")
            return
        self._init_subscriber()
        if not self.debug_mode and not self.ack_ids == []:
            print("Ackを返します。 ack_ids: {}".format(self.ack_ids))
            with self.subscriber:
                self.subscriber.acknowledge(
                    request={
                        "subscription": self.subscription_path,
                        "ack_ids": self.ack_ids,
                    }
                )
 
 
def bytes_to_json(input_bytes: bytes) -> dict:
    """bytes形式のJsonをJsonの辞書配列に変換する
 """
    converted_byte = (
        input_bytes.decode("utf8")
        .replace("None", '"None"')
        .replace("True", "true")
        .replace("False", "false")
        .replace("'", '"')
    )
    print(converted_byte)
    return json.loads(converted_byte)
 
 
def main(args) -> None:
    """PubSubから同期的にメッセージを取得する    """
 
    # Subscriberの初期化
    subscriber = SubscriberFacade(project_id=" ${プロジェクトID}", debug_mode=args.debug_mode)
 
    subscribe_payloads_list,subscribe_messages = [], []
    try:
        # Messageの取得と整形
        subscribe_messages = subscriber.pull(
            " ${サブスクリプションID}", 10
        )
        subscribe_payloads_list = [
            bytes_to_json(x.message.data) for x in subscribe_messages
        ]
        print(subscribe_payloads_list)
    except Exception as e:
        print(e)

    # Mongo Clientの初期化
    mongo_client = MongoRepository(
        db_name="${保存するデータベース名}",
        collection_name="${保存するコレクション名}",
        username=args.mongo_username,
        password=args.mongo_password,
        host=args.mongo_host,
        port=args.mongo_port,
    )

    # データの挿入
    try:
        if subscribe_payloads_list == []:
            print("SubscriptionIdに新規メッセージは存在しませんでした。")
        else:
            mongo_client.insert_list(subscribe_payloads_list)
            subscriber.send_ack()
    except Exception as e:
        print(e)
        print(
            "保存に失敗しました。ACKを返しません。"
        )
 
 
if __name__ == "__main__":
    # 引数の設定
    parser = argparse.ArgumentParser()
 
    parser.add_argument("mongo_username", help="ユーザ名")
    parser.add_argument("mongo_password", help="パスワード")
    parser.add_argument("--mongo_host", default="localhost", help="対象のMongoDBのホスト名")
    parser.add_argument("--mongo_port", default=27017, help="対象のMongoDBのポート番号")
    parser.add_argument("--debug_mode", default=False, help="debugモードの場合はAckを返さない")
 
    args = parser.parse_args()
 
    main(args)

処理の流れは、データをキューサーバからサブスクライブし、そのデータをMongoDBに保存するだけである。しかし、181~193行目の以下の箇所にデータをロストしない工夫をしてある。

        try:
            if subscribe_payloads_list == []:
                print("SubscriptionIdに新規メッセージは存在しませんでした。")
                continue
            else:
                mongo_client.insert_list(subscribe_payloads_list)
                subscriber.send_ack()
        except Exception as e:
            print(e)
            print(
                "保存に失敗しました。ACKを返しません。"
            )

上記のコードでは、Cloud Pub/Subからデータをサブスクライブし、MongoDBへデータの保存が完了した場合にAckを返し、保存処理に失敗した場合はAckを返さないようにしている。

Ackとは、受け取り確認のことで、Cloud Pub/Subは、Ackが返されたデータは配信済みのデータとみなし、キュー上から削除する。そのため、例えば、Cloud Pub/Subからデータを受け取ったタイミングでAckを返した後、何らかのエラーが発生し、その後の処理が失敗したとする。その後、次に処理をする際に、エラーが発生した時刻のデータは、Ackが返されてすでにキューサーバ上に存在しないため、その時刻のデータをロストするという現象が発生する。

そこで、MongoDBにデータの保存が完了した場合にのみAckを返すことで、MongoDBに保存できない場合は、Ackが返されず、キューサーバにデータが残り続けるため、データがロストする心配がなくなるということである。

実行の際は、パブリッシュワーカーと同様にpubsub-credential.jsonを以下のように環境変数に設定し、コマンドを実行する。

export GOOGLE_APPLICATION_CREDENTIALS=pubsub-credential.json
python subscribe.py ${MongoDBのユーザ名} ${MongoDBのパスワード}

定期実行の設定と可視化

上記のパブリッシュワーカーとサブスクライブワーカーを、cronで定期実行するようにし、設定は完了。

前回構築したMetabaseでグラフを作成したところ直近のデータが表示されたので、データを問題なく収集できていることが確認できた。

保存されたセンサーデータをMetabaseで可視化

おわりに

Cloud Pub/Subを利用したワークキューシステムを構築できた。これによりMongoDBが落ちていてもデータはキューに保存されるし、データを他のアプリケーションで利用したくなった時も、キューからデータを取得できるので簡単に拡張ができそう。マイクロサービスいいですね!

参考にした書籍

本記事は、下記の「分散システムデザインパターン」の「第Ⅲ部 バッチ処理パターン」を参考に実装した。この本では、マイクロサービスに関連する基本的な用語から、構築パターンのサンプルコードまで掲載されており、非常に理解がしやすい。マイクロサービスに興味がある方は、是非手にとって確認いただきたい。

温度や湿度を取得するために利用しているセンサーは、NatureRemoを利用している。NatureRemoは、温度や湿度の計測以外にも、家電の操作ができ、更にAlexaなどの音声スピーカーと連携することで、音声で家電を操作できる。

【データコンペ】Signate 医学論文の自動仕分けチャレンジ

はじめに

昨年Signateで開催された医学論文の自動仕分けチャレンジに挑戦したので、その時に実施した内容を備忘録として記載します。

signate.jp

コンペの内容は、医学論文のタイトルと概要から、その論文がシステマティックレビューの対象となる論文かどうかを判別するというもの。英語の自然言語処理と、不均衡データ(システマティックレビューの対象は全体のうち非常に少ない)へのアプローチが必要になる。

終結果は、60%ラインをギリギリ超える346位で、初めての分野の挑戦にもかかわらずランキングの半分以上に食い込めたので個人的には満足。

2022/02/01時点での順位

実行環境

実施

データの前処理から学習までの流れは以下の図の流れで実施した。一度モデル作成をした後に学習したモデルを用いた特徴量選択と、学習データのアンダーサンプリングを行い、再度学習したモデルを最終的なモデルとした。

f:id:sey323:20220219191638p:plain
学習の全体像

前処理

前処理には、こちらの記事に記載した前処理のうち、いくつかの手法を利用している。

sey323log.hatenablog.com

クリーニング

英文のクリーニング処理として、TextHeroを利用したクリーニング処理を行った。上記の記事でも取り上げたが、TextHeroは英文のクリーニングを簡単に実施してくれるため非常に便利。

特徴量作成と次元削減

特徴量作成では論文のタイトルと論文の概要に対して、それぞれ以下の表の手法を適応した。

特徴抽出手法 次元削減 次元数
GoogleNewsで全ての単語をベクトル化し平均をとる Umap 10次元
FastTextで全ての単語をベクトル化し平均をとる Umap 10次元
TFIDF(sklearnのTfidfVectorizerを利用) 主成分分析 64次元
単語の出現頻度のカウント(sklearnのCountVectorizerを利用) 主成分分析 64次元

文章のベクトル化に関して、上位の解法ではBERTを利用しているものが多かったが、私の端末で実行したら2日かかっても処理が終わらなかったためBERTは利用していない。(性能の良いPCほしい)

特徴量を作成後、作成した特徴量に対してUmapや主成分分析を用いた次元削減を行った。次元削減手法にUmapか主成分分析のどちらを利用するかは特に根拠はなく、複数組み合わせて最も性能が良い手法を適応した。

特徴量作成と次元削減により、合計296個の特徴量が作成された。

モデル構築

今回はlightGBMでモデルを構築する。

初期モデル構築

作成した特徴量を全て利用しlightGBMで学習する。交差検証は、StratifiedKFoldを利用し学習データを3分割にしている。StratifiedKFoldは、目的変数のデータに不均衡がある場合に用いる分割手法で、目的変数の分布を維持したまま分割をするという手法である。そのため今回のタスクのような不均衡なデータでは、StratifiedKFoldを用いるのが適切らしい。

qiita.com

特徴量選択

学習が終了後、作成されたモデルの特徴量の重要度を可視化し、重要度が高い特徴量を確認する。lightGBMで学習した際のそれぞれの特徴量の重要度をグラフにしたものを以下に示す。

LightGBMで学習した際のそれぞれの特徴量の重要度

ほとんどの特徴量がベクトル化と次元削減された特徴量のため各カラムの意味は不明だが、これを基準に重要度が低い特徴量を削減していく。今回は298個の特徴量のうち、重要度の上位100個の特徴量を学習に利用する。

アンダーサンプリング

今回のデータは、正例の数が極めて少ない不均衡データが対象である。不均衡データの場合、データが偏っている方(今回であれば負例)に偏った出力をしてしまいがちである。そこで正例と負例を同じデータ数にすることで、データの偏りをなくすアンダーサンプリングを実施する。アンダーサンプリングはimblearnライブラリで以下のように実装できる。

imbalanced-learn.org

pip install -U imbalanced-learn

train_xが説明変数、train_yを目的変数とすると、以下のように実装する。train_xtrain_yのデータ数は同じである必要がある。

# アンダーサンプリング
from imblearn.under_sampling import RandomUnderSampler

X_resampled, y_resampled = RandomUnderSampler(random_state=123).fit_resample(train_x, train_y)

アンダーサンプリングを実施することで、元の偏った分布を同じ割合にすることができた。

f:id:sey323:20220219185914p:plain
目的変数の元の分布

アンダーサンプリング後のデータの分布

再学習

アンダーサンプリングと特徴量選択を行ったデータで、再度lightGBMで学習を行う。1回目の学習と異なりデータを4分割して学習を行い、評価の際はそれぞれのモデルの出力の平均値をモデル全体の予測値とする。

学習したモデルの検証データに対するFbetaスコアは0.8449であり、大体85%くらいの精度を確認できた。学習したモデルによる混合行列を見ても、検証用データに関してはかなり高い精度で分割ができたことが確認できる。

f:id:sey323:20220219190618p:plain
再学習したモデルで混合行列

こちらで学習したモデルで提出したところ、最初に添付した順位の暫定評価0.806、最終評価0.841を得られた。

終わりに

自然言語処理には初めて触れたため結果は散々だったが、自然言語処理という分野について多くを学べる機会となった。これを機に今後は自然言語処理系のコンペにも挑戦していきたい。

FreeIPAで複数端末間のSSOを実現する

概要

家にはミニPC2台、ラズパイ3台が稼働しており、現在はそれぞれの端末にそれぞれのログインユーザを作成している。しかし、偶にこのPCのユーザ名やパスワードなんだっけとか思い出せない時があり、今後端末が増えていく前にアカウントを統合管理をしてみたいなと思ったので、Linuxの統合認証ソフトウェアのFreeIPAを導入してみた。

最終的に、端末Aで作成したユーザで端末Bにもログインできること(SSO)ができることまで確認する

FreeIPAとは?

FreeIPAはディレクトリサーバ、Kerberos、DNSサーバなどから構成されるセキュリティ管理のための統合化ソフトウェアである。一言で言うとLinux版のActive Directoryである。

FreeIPA

IPA基板を構築することで、主に以下の機能を実現できる。

  • Kerberos認証によるシングルサインオン(SSO)
  • DogTagを利用したクライアント認証
  • certmanagerによる証明書の管理/更新
  • DNS機能の提供
  • sudoルールの管理
  • etc

各端末のアカウントを統合的に管理したい要件は満たしており、free-ipaが提供するツールセットにより、LDAPより導入がかなり楽ということを聞いたので、こちらを採用することにした。

環境

  • alpmalinux (main)
  • centos8 (replica-1)

IPAサーバの構築

ipa-serverをalpmalinux(以下main)にインストールをする。

前準備

ホスト名を更新

FreeIPAサーバを構築する前に/etc/hostname/etc/hostsを更新する。注意点がここで/etc/hostname/etc/hostsで指定する端末のホスト名は同じにする必要がある。

main$ sudo hostnamectl set-hostname main.example.home 
main$ echo "192.168.0.110 main.example.home main" | sudo tee -a /etc/hosts

hostnameコマンドで/etc/hostnameを指定し更新を実施する。

main$ sudo hostname -f /etc/hostname

ホスト名が更新されていることを確認する.

main$ hostname
main.example.home

必要ライブラリのインストール

実行前にパッケージ管理ツールのdnfの更新を行う。私の環境では必要だったが、必須ではないかもしれない。

main$ sudo dnf update --allowerasing --skip-broken --nobest
main$ sudo dnf install @idm:DL1

ipa-serverをインストール

main$ sudo dnf install ipa-server

FreeIPAでdnsを利用する場合は、追加でコンポーネントをインストールする。今回は家のDNSサーバを、FreeIPAで提供されるDNSに置き換えるためインストールを実施した。

main$ sudo dnf install ipa-server-dns bind-dyndb-ldap

main端末にIPAサーバの構築

以下のコマンドを実行し、FreeIPAサーバを構築する。

main$ sudo ipa-server-install

The log file for this installation can be found in /var/log/ipaserver-install.log
==============================================================================
This program will set up the IPA Server.
Version 4.9.2

This includes:
  * Configure a stand-alone CA (dogtag) for certificate management
  * Configure the NTP client (chronyd)
  * Create and configure an instance of Directory Server
  * Create and configure a Kerberos Key Distribution Center (KDC)
  * Configure Apache (httpd)
  * Configure the KDC to enable PKINIT

To accept the default shown in brackets, press the Enter key.

Do you want to configure integrated DNS (BIND)? [no]: yes

Enter the fully qualified domain name of the computer
on which you're setting up server software. Using the form
<hostname>.<domainname>
Example: master.example.home.


Server host name [main.example.home]:

Warning: skipping DNS resolution of host main.example.home
The domain name has been determined based on the host name.

Please confirm the domain name [example.home]:

The kerberos protocol requires a Realm name to be defined.
This is typically the domain name converted to uppercase.

Please provide a realm name [EXAMPLE.HOME]:
Certain directory server operations require an administrative user.
This user is referred to as the Directory Manager and has full access
to the Directory for system management tasks and will be added to the
instance of directory server created for IPA.
The password must be at least 8 characters long.

# LDAPの管理者ユーザ(Manager)のパスワード
Directory Manager password:
Password (confirm):

The IPA server requires an administrative user, named 'admin'.
This user is a regular system account used for IPA server administration.

# IPAの管理者ユーザ(admin)のパスワード
IPA admin password:
Password (confirm):

Checking DNS domain example.home., please wait ...
Do you want to configure DNS forwarders? [yes]: yes # A Recordに存在しないドメインを他のDNSに問い合わせるかどうか。
Following DNS servers are configured in /etc/resolv.conf: 192.168.8.12, 2001:4860:4860::8888, 2001:4860:4860::8844
Do you want to configure these servers as DNS forwarders? [yes]: no # /etc/resolv.confからDNSを選択されて、このDNSで問題ないかを尋ねられる。192.168.8.12が我が家のDNSで今回はこれを利用せずGoogleのDNSを利用するのでnoを入力
Enter an IP address for a DNS forwarder, or press Enter to skip: 8.8.8.8 # GoogleのDNSを選択
DNS forwarder 8.8.8.8 added. You may add another.
Enter an IP address for a DNS forwarder, or press Enter to skip:
DNS forwarders: 8.8.8.8
Checking DNS forwarders, please wait ...
Do you want to search for missing reverse zones? [yes]: no
Do you want to configure chrony with NTP server or pool address? [no]: no

The IPA Master Server will be configured with:
Hostname:       main.example.home
IP address(es): 192.168.0.110
Domain name:    example.home
Realm name:     EXAMPLE.HOME

The CA will be configured with:
Subject DN:   CN=Certificate Authority,O=EXAMPLE.HOME
Subject base: O=EXAMPLE.HOME
Chaining:     self-signed

BIND DNS server will be configured to serve IPA domain with:
Forwarders:       8.8.8.8
Forward policy:   only
Reverse zone(s):  No reverse zone

Continue to configure the system with these values? [no]: yes

以下実行ログ

The following operations may take some minutes to complete.
Please wait until the prompt is returned.

Disabled p11-kit-proxy
Synchronizing time
No SRV records of NTP servers found and no NTP server or pool address was provided.
Using default chrony configuration.
Attempting to sync time with chronyc.
Time synchronization was successful.
Configuring directory server (dirsrv). Estimated time: 30 seconds
  [1/41]: creating directory server instance
  [2/41]: tune ldbm plugin
  [3/41]: adding default schema
  [4/41]: enabling memberof plugin
  [5/41]: enabling winsync plugin
  [6/41]: configure password logging
  [7/41]: configuring replication version plugin
  [8/41]: enabling IPA enrollment plugin
  [9/41]: configuring uniqueness plugin
  [10/41]: configuring uuid plugin
  [11/41]: configuring modrdn plugin
  [12/41]: configuring DNS plugin
  [13/41]: enabling entryUSN plugin
  [14/41]: configuring lockout plugin
  [15/41]: configuring topology plugin
  [16/41]: creating indices
  [17/41]: enabling referential integrity plugin
  [18/41]: configuring certmap.conf
  [19/41]: configure new location for managed entries
  [20/41]: configure dirsrv ccache and keytab
  [21/41]: enabling SASL mapping fallback
  [22/41]: restarting directory server
  [23/41]: adding sasl mappings to the directory
  [24/41]: adding default layout
  [25/41]: adding delegation layout
  [26/41]: creating container for managed entries
  [27/41]: configuring user private groups
  [28/41]: configuring netgroups from hostgroups
  [29/41]: creating default Sudo bind user
  [30/41]: creating default Auto Member layout
  [31/41]: adding range check plugin
  [32/41]: creating default HBAC rule allow_all
  [33/41]: adding entries for topology management
  [34/41]: initializing group membership
  [35/41]: adding master entry
  [36/41]: initializing domain level
  [37/41]: configuring Posix uid/gid generation
  [38/41]: adding replication acis
  [39/41]: activating sidgen plugin
  [40/41]: activating extdom plugin
  [41/41]: configuring directory to start on boot
Done configuring directory server (dirsrv).
Configuring Kerberos KDC (krb5kdc)
  [1/10]: adding kerberos container to the directory
  [2/10]: configuring KDC
  [3/10]: initialize kerberos container
  [4/10]: adding default ACIs
  [5/10]: creating a keytab for the directory
  [6/10]: creating a keytab for the machine
  [7/10]: adding the password extension to the directory
  [8/10]: creating anonymous principal
  [9/10]: starting the KDC
  [10/10]: configuring KDC to start on boot
Done configuring Kerberos KDC (krb5kdc).
Configuring kadmin
  [1/2]: starting kadmin
  [2/2]: configuring kadmin to start on boot
Done configuring kadmin.
Configuring ipa-custodia
  [1/5]: Making sure custodia container exists
  [2/5]: Generating ipa-custodia config file
  [3/5]: Generating ipa-custodia keys
  [4/5]: starting ipa-custodia
  [5/5]: configuring ipa-custodia to start on boot
Done configuring ipa-custodia.
Configuring certificate server (pki-tomcatd). Estimated time: 3 minutes
  [1/28]: configuring certificate server instance
  [2/28]: stopping certificate server instance to update CS.cfg
  [3/28]: backing up CS.cfg
  [4/28]: Add ipa-pki-wait-running
  [5/28]: secure AJP connector
  [6/28]: reindex attributes
  [7/28]: exporting Dogtag certificate store pin
  [8/28]: disabling nonces
  [9/28]: set up CRL publishing
  [10/28]: enable PKIX certificate path discovery and validation
  [11/28]: authorizing RA to modify profiles
  [12/28]: authorizing RA to manage lightweight CAs
  [13/28]: Ensure lightweight CAs container exists
  [14/28]: starting certificate server instance
  [15/28]: configure certmonger for renewals
  [16/28]: requesting RA certificate from CA
  [17/28]: publishing the CA certificate
  [18/28]: adding RA agent as a trusted user
  [19/28]: configure certificate renewals
  [20/28]: Configure HTTP to proxy connections
  [21/28]: updating IPA configuration
  [22/28]: enabling CA instance
  [23/28]: migrating certificate profiles to LDAP
  [24/28]: importing IPA certificate profiles
  [25/28]: adding default CA ACL
  [26/28]: adding 'ipa' CA entry
  [27/28]: configuring certmonger renewal for lightweight CAs
  [28/28]: deploying ACME service
Done configuring certificate server (pki-tomcatd).
Configuring directory server (dirsrv)
  [1/3]: configuring TLS for DS instance
  [2/3]: adding CA certificate entry
  [3/3]: restarting directory server
Done configuring directory server (dirsrv).
Configuring ipa-otpd
  [1/2]: starting ipa-otpd
  [2/2]: configuring ipa-otpd to start on boot
Done configuring ipa-otpd.
Configuring the web interface (httpd)
  [1/21]: stopping httpd
  [2/21]: backing up ssl.conf
  [3/21]: disabling nss.conf
  [4/21]: configuring mod_ssl certificate paths
  [5/21]: setting mod_ssl protocol list
  [6/21]: configuring mod_ssl log directory
  [7/21]: disabling mod_ssl OCSP
  [8/21]: adding URL rewriting rules
  [9/21]: configuring httpd
Nothing to do for configure_httpd_wsgi_conf
  [10/21]: setting up httpd keytab
  [11/21]: configuring Gssproxy
  [12/21]: setting up ssl
  [13/21]: configure certmonger for renewals
  [14/21]: publish CA cert
  [15/21]: clean up any existing httpd ccaches
  [16/21]: configuring SELinux for httpd
  [17/21]: create KDC proxy config
  [18/21]: enable KDC proxy
  [19/21]: starting httpd
  [20/21]: configuring httpd to start on boot
  [21/21]: enabling oddjobd
Done configuring the web interface (httpd).
Configuring Kerberos KDC (krb5kdc)
  [1/1]: installing X509 Certificate for PKINIT
Done configuring Kerberos KDC (krb5kdc).
Applying LDAP updates
Upgrading IPA:. Estimated time: 1 minute 30 seconds
  [1/10]: stopping directory server
  [2/10]: saving configuration
  [3/10]: disabling listeners
  [4/10]: enabling DS global lock
  [5/10]: disabling Schema Compat
  [6/10]: starting directory server
  [7/10]: upgrading server
  [8/10]: stopping directory server
  [9/10]: restoring configuration
  [10/10]: starting directory server
Done.
Restarting the KDC
dnssec-validation yes
Configuring DNS (named)
  [1/11]: generating rndc key file
  [2/11]: adding DNS container
  [3/11]: setting up our zone
  [4/11]: setting up our own record
  [5/11]: setting up records for other masters
  [6/11]: adding NS record to the zones
  [7/11]: setting up kerberos principal
  [8/11]: setting up named.conf
created new /etc/named.conf
created named user config '/etc/named/ipa-ext.conf'
created named user config '/etc/named/ipa-options-ext.conf'
  [9/11]: setting up server configuration
  [10/11]: configuring named to start on boot
  [11/11]: changing resolv.conf to point to ourselves
Done configuring DNS (named).
Restarting the web server to pick up resolv.conf changes
Configuring DNS key synchronization service (ipa-dnskeysyncd)
  [1/7]: checking status
  [2/7]: setting up bind-dyndb-ldap working directory
  [3/7]: setting up kerberos principal
  [4/7]: setting up SoftHSM
  [5/7]: adding DNSSEC containers
  [6/7]: creating replica keys
  [7/7]: configuring ipa-dnskeysyncd to start on boot
Done configuring DNS key synchronization service (ipa-dnskeysyncd).
Restarting ipa-dnskeysyncd
Restarting named
Updating DNS system records
Configuring client side components
This program will set up IPA client.
Version 4.9.2

Using existing certificate '/etc/ipa/ca.crt'.
Sudo version 1.8.29
Configure options: --build=x86_64-redhat-linux-gnu --host=x86_64-redhat-linux-gnu --program-prefix= --disable-dependency-tracking --prefix=/usr --exec-prefix=/usr --bindir=/usr/bin --sbindir=/usr/sbin --sysconfdir=/etc --datadir=/usr/share --includedir=/usr/include --libdir=/usr/lib64 --libexecdir=/usr/libexec --localstatedir=/var --sharedstatedir=/var/lib --mandir=/usr/share/man --infodir=/usr/share/info --prefix=/usr --sbindir=/usr/sbin --libdir=/usr/lib64 --docdir=/usr/share/doc/sudo --disable-root-mailer --with-logging=syslog --with-logfac=authpriv --with-pam --with-pam-login --with-editor=/bin/vi --with-env-editor --with-ignore-dot --with-tty-tickets --with-ldap --with-ldap-conf-file=/etc/sudo-ldap.conf --with-selinux --with-passprompt=[sudo] password for %p:  --with-linux-audit --with-sssd
Sudoers policy plugin version 1.8.29
Sudoers file grammar version 46

Sudoers path: /etc/sudoers
nsswitch path: /etc/nsswitch.conf
ldap.conf path: /etc/sudo-ldap.conf
ldap.secret path: /etc/ldap.secret
Authentication methods: 'pam'
Syslog facility if syslog is being used for logging: authpriv
Syslog priority to use when user authenticates successfully: notice
Syslog priority to use when user authenticates unsuccessfully: alert
Ignore '.' in $PATH
Send mail if the user is not in sudoers
Lecture user the first time they run sudo
Require users to authenticate by default
Root may run sudo
Always set $HOME to the target user's home directory
Allow some information gathering to give useful error messages
Visudo will honor the EDITOR environment variable
Set the LOGNAME and USER environment variables
Length at which to wrap log file lines (0 for no wrap): 80
Authentication timestamp timeout: 5.0 minutes
Password prompt timeout: 5.0 minutes
Number of tries to enter a password: 3
Umask to use or 0777 to use user's: 022
Path to mail program: /usr/sbin/sendmail
Flags for mail program: -t
Address to send mail to: root
Subject line for mail messages: *** SECURITY information for %h ***
Incorrect password message: Sorry, try again.
Path to lecture status dir: /var/db/sudo/lectured
Path to authentication timestamp dir: /run/sudo/ts
Default password prompt: [sudo] password for %p:
Default user to run commands as: root
Value to override user's $PATH with: /sbin:/bin:/usr/sbin:/usr/bin
Path to the editor for use by visudo: /bin/vi
When to require a password for 'list' pseudocommand: any
When to require a password for 'verify' pseudocommand: all
File descriptors >= 3 will be closed before executing a command
Reset the environment to a default set of variables
Environment variables to check for sanity:
    TZ
    TERM
    LINGUAS
    LC_*
    LANGUAGE
    LANG
    COLORTERM
Environment variables to remove:
    *=()*
    RUBYOPT
    RUBYLIB
    PYTHONUSERBASE
    PYTHONINSPECT
    PYTHONPATH
    PYTHONHOME
    TMPPREFIX
    ZDOTDIR
    READNULLCMD
    NULLCMD
    FPATH
    PERL5DB
    PERL5OPT
    PERL5LIB
    PERLLIB
    PERLIO_DEBUG
    JAVA_TOOL_OPTIONS
    SHELLOPTS
    BASHOPTS
    GLOBIGNORE
    PS4
    BASH_ENV
    ENV
    TERMCAP
    TERMPATH
    TERMINFO_DIRS
    TERMINFO
    _RLD*
    LD_*
    PATH_LOCALE
    NLSPATH
    HOSTALIASES
    RES_OPTIONS
    LOCALDOMAIN
    CDPATH
    IFS
Environment variables to preserve:
    XAUTHORITY
    _XKB_CHARSET
    LINGUAS
    LANGUAGE
    LC_ALL
    LC_TIME
    LC_TELEPHONE
    LC_PAPER
    LC_NUMERIC
    LC_NAME
    LC_MONETARY
    LC_MESSAGES
    LC_MEASUREMENT
    LC_IDENTIFICATION
    LC_COLLATE
    LC_CTYPE
    LC_ADDRESS
    LANG
    USERNAME
    QTDIR
    PS2
    PS1
    MAIL
    LS_COLORS
    KDEDIR
    HISTSIZE
    HOSTNAME
    DISPLAY
    COLORS
Locale to use while parsing sudoers: C
Compress I/O logs using zlib
Directory in which to store input/output logs: /var/log/sudo-io
File in which to store the input/output log: %{seq}
Add an entry to the utmp/utmpx file when allocating a pty
PAM service name to use: sudo
PAM service name to use for login shells: sudo-i
Attempt to establish PAM credentials for the target user
Create a new PAM session for the command to run in
Perform PAM account validation management
Maximum I/O log sequence number: 0
Enable sudoers netgroup support
Check parent directories for writability when editing files with sudoedit
Query the group plugin for unknown system groups
Allow commands to be run even if sudo cannot write to the audit log
Allow commands to be run even if sudo cannot write to the log file
Resolve groups in sudoers and match on the group ID, not the name
Log entries larger than this value will be split into multiple syslog messages: 960
File mode to use for the I/O log files: 0600
Execute commands by file descriptor instead of by path: digest_only
Type of authentication timestamp record: tty
Ignore case when matching user names
Ignore case when matching group names
Log when a command is allowed by sudoers
Log when a command is denied by sudoers
Don't pre-resolve all group names

Local IP address and netmask pairs:
    192.168.0.110/255.255.255.0
    172.18.0.1/255.255.0.0
    172.17.0.1/255.255.0.0
    fe80::6a1d:efff:fe25:72ad/ffff:ffff:ffff:ffff::
    fe80::42:d4ff:fe89:ce42/ffff:ffff:ffff:ffff::
    fe80::42:8cff:fe45:7e23/ffff:ffff:ffff:ffff::

Sudoers I/O plugin version 1.8.29
Client hostname: main.example.home
Realm: EXAMPLE.HOME
DNS Domain: example.home
IPA Server: main.example.home
BaseDN: dc=example,dc=home

Configured sudoers in /etc/authselect/user-nsswitch.conf
Configured /etc/sssd/sssd.conf
Systemwide CA database updated.
Adding SSH public key from /etc/ssh/ssh_host_ecdsa_key.pub
Adding SSH public key from /etc/ssh/ssh_host_ed25519_key.pub
Adding SSH public key from /etc/ssh/ssh_host_rsa_key.pub
SSSD enabled
Configured /etc/openldap/ldap.conf
Configured /etc/ssh/ssh_config
Configured /etc/ssh/sshd_config
Configuring example.home as NIS domain.
Client configuration complete.
The ipa-client-install command was successful

==============================================================================
Setup complete

Next steps:
    1. You must make sure these network ports are open:
        TCP Ports:
          * 80, 443: HTTP/HTTPS
          * 389, 636: LDAP/LDAPS
          * 88, 464: kerberos
          * 53: bind
        UDP Ports:
          * 88, 464: kerberos
          * 53: bind
          * 123: ntp

    2. You can now obtain a kerberos ticket using the command: 'kinit admin'
       This ticket will allow you to use the IPA tools (e.g., ipa user-add)
       and the web user interface.

Be sure to back up the CA certificates stored in /root/cacert.p12
These files are required to create replicas. The password for these
files is the Directory Manager password
The ipa-server-install command was successful

こちらで構築が完了。

動作確認

kerberos認証を用いてipaのコマンド実行権限を取得する。パスワードが求められるので、ipa-server構築時に入力したパスワードを入力する。

main$ kinit admin
Password for admin@EXAMPLE.HOME:
server$

インストールに問題ないかを確認する。

main$ ipa env
  api_version: 2.237
  basedn: dc=example,dc=home
  bin: /usr/bin
  ca_agent_install_port: None
  ca_agent_port: 443
  ca_ee_install_port: None
  ca_ee_port: 443
  ca_host: replica-1.example.home
  ca_install_port: None
  ca_port: 80
  conf: /etc/ipa/cli.conf
  conf_default: /etc/ipa/default.conf
  confdir: /etc/ipa
  config_loaded: True
=== 省略 ===

上記のようにIPAコマンドの実行でエラーが出なければ問題ない。

エラーが出たりした場合は、以下の点を確認するか、一度アンインストールして再度インストールし直す。

  • /etc/hosts/etc/hostnameで指定したホスト名が同じか
  • hostnameコマンド実行時に表示される名前が/etc/hostnameの値と同じか
  • 各ポートが空いているか
main$ ipa-server-install --uninstall

補足: FreeIPAの管理画面

FreeIPAは管理画面を提供しており、以下のURLにアクセスすると管理画面が表示され、インストール時に入力したadminアカウントでログインができる。今回はコマンドで実行するが、ユーザ追加やA Recordの追加といった一般的なFreeIPAの機能はこちらの管理画面からも実行可能である。

https://main.example.home/

f:id:sey323:20211123214731p:plain
free-ipaの管理画面

クライアント側

前準備

DNSの設定

mainの端末上でipaクライアントとなるreplica-1IPアドレスを名前解決できるように、IPAサーバのDNSに登録する。

main$ ipa dnsrecord-add example.home replica-1 --a-rec 192.168.0.111
  Record name: replica-1
  A record: 192.168.0.111

名前解決ができていれば動作はOK

main$ dig replica-1.example.home

; <<>> DiG 9.11.4-P2-RedHat-9.11.4-26.P2.el7_9.7 <<>> replica-1.example.home
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 16268
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 1, ADDITIONAL: 2

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1232
;; QUESTION SECTION:
;replica-1.example.home.      IN  A

;; ANSWER SECTION:
replica-1.example.home.   1200    IN  A   192.168.0.111 # 名前解決できていること

;; AUTHORITY SECTION:
example.home.     86400   IN  NS  main.example.home.

;; ADDITIONAL SECTION:
main.example.home.    1200    IN  A   192.168.0.110

;; Query time: 1 msec
;; SERVER: 192.168.0.110#53(192.168.0.110) # ipa-serverのIPアドレスであることを確認
;; WHEN: 月 11月 22 15:28:21 JST 2021
;; MSG SIZE  rcvd: 99

replica-1での作業

replica-1に移動して/etc/resolv.confを編集し,先ほど設定したドメイン名でサーバに接続できるように設定する.

replica-1$ vi /etc/resolv.conf
# Generated by NetworkManager
search example.home
nameserver 192.168.0.110

その後clientをインストールする。

replica-1$ sudo yum install ipa-client

replica-1端末にIPAクライアントの構築

前準備が終わったら、ipa-client-installipa-clientをインストールする。

replica-1$ ipa-client-install
Discovery was successful!
Client hostname: replica-1.example.home
Realm: EXAMPLE.HOME
DNS Domain: example.home
IPA Server: main.example.home
BaseDN: dc=example,dc=home

Continue to configure the system with these values? [no]: yes

Synchronizing time with KDC...
Attempting to sync time using ntpd.  Will timeout after 15 seconds

User authorized to enroll computers: admin
Password for admin@EXAMPLE.HOME:
Successfully retrieved CA cert
    Subject:     CN=Certificate Authority,O=EXAMPLE.HOME
    Issuer:      CN=Certificate Authority,O=EXAMPLE.HOME
    Valid From:  2018-08-08 05:44:32
    Valid Until: 2038-08-08 05:44:32

Enrolled in IPA realm EXAMPLE.HOME
Created /etc/ipa/default.conf
New SSSD config will be created
Configured sudoers in /etc/nsswitch.conf
Configured /etc/sssd/sssd.conf
Configured /etc/krb5.conf for IPA realm EXAMPLE.HOME
trying https://main.example.home/ipa/json
[try 1]: Forwarding 'schema' to json server 'https://main.example.home/ipa/json'
trying https://main.example.home/ipa/session/json
[try 1]: Forwarding 'ping' to json server 'https://main.example.home/ipa/session/json'
[try 1]: Forwarding 'ca_is_enabled' to json server 'https://main.example.home/ipa/session/json'
Systemwide CA database updated.
Adding SSH public key from /etc/ssh/ssh_host_rsa_key.pub
Adding SSH public key from /etc/ssh/ssh_host_ecdsa_key.pub
Adding SSH public key from /etc/ssh/ssh_host_ed25519_key.pub
[try 1]: Forwarding 'host_mod' to json server 'https://main.example.home/ipa/session/json'
Could not update DNS SSHFP records.
SSSD enabled
Configured /etc/openldap/ldap.conf
NTP enabled
Configured /etc/ssh/ssh_config
Configured /etc/ssh/sshd_config
Configuring example.home as NIS domain.
Client configuration complete.
The ipa-client-install command was successful

インストール完了後、ユーザの初回ログイン時に自動的にホームディレクトリを作成する設定を行う。

replica-1$ authconfig --enablemkhomedir --update

動作確認

replica-1側でadminでkerberos認証をし、freeipaのコマンドを実行してみる。

clinet$ kinit admin
Password for admin@EXAMPLE.HOME:
replica-1$ ipa host-find
ipa host-find
---------------
2 hosts matched
---------------
  Host name: main.example.home
  Principal name: host/main.example.home@EXAMPLE.HOME
  Principal alias: host/main.example.home@EXAMPLE.HOME
  SSH public key fingerprint: SHA256:FgvynoevabTgjUMXVLFkUX+p7FjU675RYnF+ECR1OcA (ecdsa-sha2-nistp256), SHA256:KqiWgKHNRkg6cxzq/5gSZrlLswRUUV3LgTclOn4jXJs (ssh-ed25519),
                              SHA256:6lWEOhQ4jgDVIU0iOTTjMTVtnIkahv3BSLBDxawVU2c (ssh-rsa)

  Host name: replica-1.example.home
  Platform: x86_64
  Operating system: 3.10.0-957.12.2.el7.x86_64
  Principal name: host/replica-1.example.home@EXAMPLE.HOME
  Principal alias: host/replica-1.example.home@EXAMPLE.HOME
  SSH public key fingerprint: SHA256:fqHQMBplDossLc/MjZ7eZc8GHSm4EYIQyGnLM/fknxw (ssh-rsa), SHA256:mZAYwch21Sg095u8O8cmZqsJuSj8FILD4MdZj59s6tw (ecdsa-sha2-nistp256), SHA256:PKr/P33cBMqWV8zljAvtQQR1iDQWmOaBodRbJuyxFZo
                              (ssh-ed25519)
----------------------------
Number of entries returned 2
----------------------------

ipa-serverとipa-clientに属するのホストを表示することができ、問題なくインストールできていることとmainreplica-1ipaの管理下におかれていることが確認できる。

ユーザのSSOの確認

replica-1でユーザを作成し、そのユーザでmainにログインできるか確認してみる。ユーザ追加は以下のコマンドで実施する。

replica-1$ kinit admin # 権限を取得する。
Password for admin@EXAMPLE.HOME:
replica-1$ ipa user-add testuser01 --first=test --last=user --password
Password:
確認のため再び Password を入力してください:
-----------------------
Added user "testuser01"
-----------------------
  User login: testuser01
  First name: test
  Last name: user
  Full name: test user
  Display name: teest user
  Initials: tu
  Home directory: /home/testuser01
  GECOS: teest user
  Login shell: /bin/bash
  Principal name: testuser01@EXAMPLE.HOME
  Principal alias: testuser01@EXAMPLE.HOME
  User password expiration: 20211122233204Z
  Email address: testuser01@example.home
  UID: 1800400009
  GID: 1800400009
  Password: True
  Member of groups: ipausers
  Kerberos keys available: True

ユーザが作成されたことを確認できる。

作成したtestuser01でサーバ側にログインしてみる。

replica-1$ ssh testuser01@main.example.home
Password:
Password expired. Change your password now.
Current Password:
New password:
Retype new password:
Web console: https://main.example.home:9090/ or https://192.168.0.110:9090/

Last failed login: Tue Nov 23 08:33:38 JST 2021 from 192.168.8.112 on ssh:notty
There was 1 failed login attempt since the last successful login.
[testuser01@main ~]$

クライアント側で作成したユーザでサーバ側にログインできることが確認できた。

【資格試験】LPIC Level3 303 Version2

過去にLPIC Leve2を取得していたのですが、その試験の有効性の期限が切そうになっており、有効性を伸ばすためにLPICのLevel3に挑戦してみました。

かなり短期決戦で取り組んだのですが、無事1ヶ月の勉強で取得できたのでその体験記です。

どんなもの?

LPICは、LPIが提供しているLinuxの試験。以下にあるとおり、Level3はLinuxのプロフェッショナル該当するレベルの試験となるらしい。

LPIC-3認定は、Linux Professional Institute(LPI)のマルチレベルプロフェッショナル認定プログラムの頂点です。 LPIC-3は、エンタープライズレベルのLinuxプロフェッショナル向けに設計されており、業界で最高レベルのプロフェッショナルなディストリビューションニュートラLinux認定を表しています。

www.lpi.org

Level3には301(混在環境)、303(セキュリティ)、304(仮想化と高可用性)の3つがあり、今回はそのうちの303(セキュリティ)を受験したものとなる。

301は、業務ではLinuxをメインで扱っている為、混在環境の知識の必要性を感じなかった。また304は、将来的に305(仮想化とコンテナ化)、306(高可用性システムとストレージ)に分割される予定のようで、どうせ更新されるなら305を取得したいなと思いった。 そういった理由から、消去法で303(セキュリティ)を取得することにした。

304は1つのセクションとしてまとめられていたDockerやVagrantが、305ではそれぞれ1つの章として取り扱われるようになったり、Terraformが試験項目になったりと、かなり最新の傾向を取り入れている印象がある。

wiki.lpi.org

試験対策

利用したもの

利用したものは、ping-tとナレッジデザイン社が出版している「 Linuxセキュリティ - LinuC/LPIC3 303 Version 2.0 対応」の教科書。
よくLPIC Level3の学習に黒本が良いと言われているが、社内の合格者から伺ったところ、体系的に知識を学ぶのであればナレッジデザインの書籍を利用すると良いとのアドバイスを頂いたので、私はこちらも購入して勉強した。ナレッジデザインの書籍は、LPI-JAPANにより唯一認定を受けている書籍のようなので、信頼性も高い。

Amazon.co.jp: Linuxセキュリティ - LinuC/LPIC3 303 Version 2.0 対応 - eBook : 有限会社 ナレッジデザイン: 本

他にはLPICの勉強でお馴染みのping-tも契約した。ping-tは1ヶ月だけで契約した。契約期限以内であれば、延長にかかる追加費用が2ヶ月で契約した際の割引が適応されるので、最初から長期間で契約する必要はなさそう。

ping-t.com

学習の仕方

今回は時間もなかったため1ヶ月で取り組んだ。土日も勉強に費やしたため、かなり無理したスケジュールだったと思う。

1ヶ月前

Linuxセキュリティ - LinuC/LPIC3 303 Version 2.0 対応」の書籍を確認しながら、必要に応じて自宅に同じ環境を構築してコマンドの動作の確認を行った。家に空いていたMiniPCがあったので、そこにCentOSを焼き直し、その環境で随時コマンドを実行したりした。

全て実行したわけでなく、各章の章末に「まとめ」の項目があるので、その項目に出てくるコマンドを重点的に実施した。こちらも社内の合格者より「まとめ」の内容が試験に出やすいとのことだったので、その内容にターゲットを絞って実施した。

2週間前

2週間前は復習期間として、各章ごとにping-tの問題を解いた。わからない問題は書籍に戻って復習したり、再度CentOSの環境でコマンドを実行してみたりした。コマンド打ち込む作業はかなり時間が取られるが、実際の動作を体験することができ、記憶の定着に最も役立ったと思う。各章毎にヒット(ping-tで1回正解した状態)が70%を超えたら次の章の学習に取り組むようにした。

入浴中や通勤時間に少しづつ進めて、残った部分は土日にまとめて実施するなどして進めた。

1週間前

1週間前からはひたすら

1)模擬問題の60問を解く 2)解説を熟読 3)疑問に思った場合は書籍で確認

のサイクルを時間の許す限り実行した。解説の熟読はかなり時間がかかるので、コンボ(ping-tで1回正解した状態)以上の問題は解説を読まず飛ばす、ヒットだけ熟読するなどで効率的に回して行った。

こちらも隙間時間で進めていき、1週間で15回は模擬をやった気がする。

最終的なping-tのスコアはこんな感じ。100%を目指すよりも、問題の解説を熟読して中身を理解することに重点を置いていたので進みは遅かった。しかしLPICLevl3では、1つ1つの要素に対する深いな知識の理解が重要だと思うので、多少時間がかかっても、出題される要素の仕組みを腹落ちするまで理解することが重要だと思う。

試験当日

当日は都内のテストセンターで受験。試験問題に関して詳細は言えないが、ping-tの問題のようなオプションを答える問題はほとんどなく、ping-tの問題の解説にある内容を問うような少し捻ったような問題が多く出題された。 どちらかと言うと「Linuxセキュリティ - LinuC/LPIC3 303 Version 2.0 対応」の内容に沿った出題が多い印象にあり、こちらに一通り目を通しておいて良かったと今になって思う。

結果発表

試験結果は、試験終了ボタンを押した瞬間表示される。合格不合格が一瞬で判明するので、ボタンを押す瞬間は非常に緊張したが無事合格していたようで何より。詰め込みがうまく行ったようで、ミスも5問のみ、個人的にはかなり満足いく合格となった。

その後、その日のうちにLPIからメールでpdfの合格証が送られて、正式に合格証を受領した。紙の合格証も6〜8週間で送られてくる様子。

次受ける試験

実はLPICLevel2の取得の際に2回落ちているので、今回受けたLevel3は一発で取得できて安心。

新しくリリースされる305(仮想化とコンテナ化)が数少ないDockerやkubernetesに関する試験で、業務でも利用しており興味がある範囲なので、余裕があればそちら取得したいですね。

【データコンペ】Signate Apple引越し社需要予測をPyCaretで挑戦

はじめに

SotaになっていたSignateのApple引越社 需要予測のコンペに挑戦したので、その記録を書きます。

signate.jp

今回はモデル作成にPyCaretというAutoMLライブラリを利用した。PyCaretは、複数のモデルの構築と評価を少ないコードで簡単に実装できるAutoMLライブラリで、個人的に最近注目しているツール。モデル学習をPyCaretのみで実施してみて、どれだけの順位に組み込めるか試してみた。

先に結果を言うと、PyCaretのみで作成したモデルの評価値は、アップロード時点では150位くらいの結果となった。しかし最近見たら200位まで下がっていたので、現時点でもかなり挑戦している人は多そう。私自身時系列データ分析の経験は少ないのだが、データ数やタスクの難易度的に時系列タスクにちょうど良い課題だったと思うのと、PyCaretの手軽さを実感できた。

f:id:sey323:20211103231802p:plain
ランキング(2021/10/30現在)

実行環境

Home - PyCaret

実施

1. PyCaretを用いたモデル作成

PyCaretを用いたモデル作成を実施する。 SignateからダウンロードしたCSVデータを./dataフォルダに格納し、データをプログラムにインポートする。

import pandas as pd

train_df = pd.read_csv("./data/train.csv")
test_df = pd.read_csv("./data/test.csv")
sample_submit = pd.read_csv("data/sample_submit.csv", header=None)

print(train_df.shape, test_df.shape, sample_submit.shape)
(2101, 6) (365, 5) (365, 2)

学習データが2101件、テストデータが365件あることが確認できる。今回のデータは特徴量が5つしかないので特徴量を作成する。

前処理

元データに含まれる特徴量が5つしかなく非常に少ないので、特徴量を増やす目的で前処理を行う。各値を結合したり統計量を取得することで特徴量作成を行った。特徴量の作成で具体的に行った手法は以下の通り。

  • 日付を月、日、曜日に分割、年は削除
  • 午前の料金と午後の料金の合計(price_am_pm)
  • 料金に関わる3つの特徴量(price_am_pmprice_amprice_pm)のラグ特徴量/リード特徴量

ラグ/リード特徴量は、それぞれ過去未来の地点のデータを利用する特徴量である。本タスクにおけるラグ特徴量の例としては、1週間前の日の周辺3日間の料金区分の統計量を取得しそれをその日の特徴量とする、といったものである。過去の傾向を利用できるので非常に強力な特徴量となりるが、例えば初日のデータが過去のデータが存在しないのでラグ特徴量を取得できない、というように、ラグで遡る日数以内のデータが存在しない端のデータは特徴量が正しく取得できないため、端のデータを取り除くか、許容してデータ分析を行う、といった注意が必要となる。

今回は一括で特徴量が作成できるように、以下のようなpreprocessクラスを作成した。

def preprocess(df, column_name, is_remove=False):
    '''
    文字列型の日付を年月日に分割する
    df           : 対象のデータフレーム
    colume_name  : 対象の行
    is_remove    : 日付を削除するかどうか
    '''
    # 日付から時間を計算
    df[column_name + '_month'] = pd.to_datetime(df[column_name]).dt.month
    df[column_name + '_day'] = pd.to_datetime(df[column_name]).dt.day
    df[column_name + '_dayofweek'] = pd.to_datetime(df[column_name]).dt.dayofweek
    df[column_name] = pd.to_datetime(df[column_name]).dt.strftime('%Y-%m-%d')
    
    # その日の料金の合計
    df['price_am_pm'] = df['price_am'] + df['price_am']
    
    # Lead特徴量
    df = lead_encording(df, 'price_pm',shift_num=1)
    df = lead_encording(df, 'price_am',shift_num=1)
    df = lead_encording(df, 'price_am_pm',shift_num=1)
    df = lead_encording(df, 'price_pm',shift_num=2)
    df = lead_encording(df, 'price_am',shift_num=2)
    df = lead_encording(df, 'price_am_pm',shift_num=2)
    df = lead_encording(df, 'price_pm',shift_num=7)
    df = lead_encording(df, 'price_am',shift_num=7)
    df = lead_encording(df, 'price_am_pm',shift_num=7)

    # Lag特徴量
    df = lag_encording(df, 'price_am',shift_num=7, window_size=3)
    df = lag_encording(df, 'price_pm',shift_num=7, window_size=3)
    df = lag_encording(df, 'price_am_pm',shift_num=7, window_size=3)
    df = lag_encording(df, 'price_am',shift_num=7, window_size=3)
    df = lag_encording(df, 'price_pm',shift_num=7, window_size=3)
    df = lag_encording(df, 'price_am_pm',shift_num=7, window_size=3)
    df = lag_encording(df, 'price_am',shift_num=14, window_size=7)
    df = lag_encording(df, 'price_pm',shift_num=14, window_size=7)
    df = lag_encording(df, 'price_am_pm',shift_num=14, window_size=7)
    df = lag_encording(df, 'price_am',shift_num=28, window_size=7)
    df = lag_encording(df, 'price_pm',shift_num=28, window_size=7)
    df = lag_encording(df, 'price_am_pm',shift_num=28, window_size=7)
    
    if is_remove:
        df = df.drop(column_name, axis=1)

    return df

def lag_encording(df, column_name, shift_num=1, window_size=3):
    '''
    過去地点とその周辺の統計情報を取得
    '''
    df[column_name + '_lag_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num)
    df[column_name + '_lag_mean_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).mean()
    df[column_name + '_lag_sum_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).sum()
    df[column_name + '_lag_max_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).max()
    df[column_name + '_lag_min_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).min()
    df[column_name + '_lag_median_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).median()
    df[column_name + '_lag_std_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).std()
    return df

def lead_encording(df, column_name, shift_num=1, window_size=3):
    '''
    未来地点とその周辺の統計情報を取得
    '''
    df[column_name + '_lead_'+ str(shift_num) ] = df[column_name].shift(-1 * shift_num)
    df[column_name + '_lead_mean_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).mean()
    df[column_name + '_lead_sum_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).sum()
    df[column_name + '_lead_max_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).max()
    df[column_name + '_lead_min_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).min()
    df[column_name + '_lead_median_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).median()
    df[column_name + '_lead_std_'+ str(shift_num) + '_' + str(window_size)] = df[column_name].shift(shift_num).rolling(window=window_size).std()
    return df

上記のメソッドで、学習データとテストデータに対して前処理を実施する。また、ラグ特徴量を取得するにあたってデータが欠損した、先頭の34行を削除する。

train_x, train_y = train_df.drop("y", axis=1), train_df["y"]

preprocess_all = pd.concat([train_x, test_df])
preprocessed_all_x = preprocess(preprocess_all, 'datetime')

# 前処理を行なったデータを、学習データとテストデータに再分割
preprocessed_train_x, preprocessed_test_x = preprocessed_all_x[:train_x.shape[0]], preprocessed_all_x[train_x.shape[0]:]

# ラグ特徴量により先頭の34行はnullのカラムが複数含まれるので、学習データから取り除く。
preprocessed_train_x = preprocessed_train_x[34:]
preprocessed_train_y = train_y[34:]

前処理の結果を出力してみる。

preprocessed_train_x

f:id:sey323:20211103232718p:plain
作成された特徴量の一部

学習データは2106→2067まで減少したが、136個の特徴量を生成することができた。

モデル構築

PyCaretで学習するために、追加でデータの整形を行う。PyCaretは量的データ・質的データを自動判別し、それに応じた前処理をしてくれるが、稀に量的データであっても質的データと判別されることがある。なので量的変数のカラムを明示的に渡すためにget_column_typeを用いて量的データのカラムを抽出し、それをPyCaretの引数に与える。

def get_column_type(df, type_names: []):
    
    target_column_name = []
    for column_name in df:
        if df[column_name].dtype in type_names:
            target_column_name.append(column_name)
    
    del df, type_names
    return target_column_name


# Pycaretで実行するようにデータの前処理
train_all = pd.concat([preprocessed_train_x, train_y], axis=1)
numeric_columns = get_column_type(train_all, [int, float])
numeric_columns.remove('y')

PyCaretにデータをセットする。

from pycaret.regression import *

# initialize setup
s = setup(data = preprocessed_train_x, 
          target = 'y', 
          fold_strategy = 'timeseries', 
          numeric_features = numeric_columns,
          silent=True,
          fold = 3,
          session_id = 123)

各引数の詳細は以下となる。

引数名 概要
data 対象のDataFrame
target 目的変数。dataに指定したDataFrameに含まれるカラムを指定する必要がある。
fold_storategy 交差検定の手法。今回のタスクのように時系列データを利用する場合はtimeseriesを指定する
numeric_features dataに与えたDataFrameの説明変数のうち、量的データのカラムを明示的に指定する場合、 量的データのカラムの配列を指定する。
silent setup() 実行前、確認するフェイズを挟む場合はFalseを指定する。
fold データの分割数
session_id 乱数シードを固定する場合に任意の数値を指定する

上記の実行が完了すれば、PyCaretで学習するパイプラインにデータがセットされたこととなる。

はじめに、デフォルトの設定compare_models()で全てのモデルで探索をしてみる。sort='MAE'とすることで、MAEの評価値をキーに並び替えることができる。

compare_models(sort = 'MAE') 

f:id:sey323:20211103232807p:plain
pycaretの全探索の結果

出力結果を見ると、特徴量が多いことが影響してか決定木ベースのモデルが性能がよいことが分かる。今回は、その中でも上位のモデルであるgdr(勾配ブースティング木)rf(Random Forest)を対象にパラメータ調整してみる。

パラメータチューニング

PyCaretではパラメータチューニングも簡単に実施できる。tune_model()の引数にチューニングしたいモデルを渡し実行することで、自動でパラメータ調整とモデルの予測結果に対する平均と分散を表示してくれる。さらにoptimize=にチューニングの指標となる評価指標を指定するとその値を最小にするようにチューニングが行われるので、今回はタスクに合わせてoptimize='MAE'とする。

# GDBのパラメータチューニング
gbr = create_model('gbr')
gbr = tune_model(gbr, optimize='MAE')

f:id:sey323:20211103232918p:plain
XGBoostにりょう評価指標

ランダムフォレストもパラメータチューニング。

# RandomForestのパラメータチューニング
rf = create_model('rf')
rf = tune_model(rf, optimize='MAE')

f:id:sey323:20211103232854p:plain
ランダムフォレストによる評価指標

パラメータチューニングを行うことで、ランダムフォレストでもGDBでもMAEのスコアを向上させることができた。今までOptunaとかで頑張ってパラーメター調整していた手間が、だいぶ削減されたのは素晴らしい。

ブレンド

今回はGDBとランダムフォレストのモデルの両方の結果を利用するので、モデルのブレンドを実行する。モデルのブレンドも今までと同様に、ブレンドしたいモデルと評価指標を指定するのみでブレンドが実行できる。

# 2つのモデルをブレンド
blender = blend_models(estimator_list = [rf, gbr], optimize='MAE') 

f:id:sey323:20211103233128p:plain
ブレンドされたモデルによる評価指標

最終的なモデルを固定し、パラメータの詳細値を表示する。

finalized_model = finalize_model(blender)
finalized_model.get_params

get_paramsにより最終的なモデルのパラメータを取得できる。

<bound method _BaseHeterogeneousEnsemble.get_params of VotingRegressor(estimators=[('rf',
                             RandomForestRegressor(bootstrap=True,
                                                   ccp_alpha=0.0,
                                                   criterion='mse', max_depth=9,
                                                   max_features=1.0,
                                                   max_leaf_nodes=None,
                                                   max_samples=None,
                                                   min_impurity_decrease=0.1,
                                                   min_impurity_split=None,
                                                   min_samples_leaf=4,
                                                   min_samples_split=7,
                                                   min_weight_fraction_leaf=0.0,
                                                   n_estimators=100, n_jobs=-1,
                                                   oob_score=False,
                                                   random_state=123, verbo...
                                                       max_features=1.0,
                                                       max_leaf_nodes=None,
                                                       min_impurity_decrease=0.02,
                                                       min_impurity_split=None,
                                                       min_samples_leaf=5,
                                                       min_samples_split=5,
                                                       min_weight_fraction_leaf=0.0,
                                                       n_estimators=230,
                                                       n_iter_no_change=None,
                                                       presort='deprecated',
                                                       random_state=123,
                                                       subsample=0.85,
                                                       tol=0.0001,
                                                       validation_fraction=0.1,
                                                       verbose=0,
                                                       warm_start=False))],
                n_jobs=-1, verbose=False, weights=None)>

可視化

plot_modelを実行することで、学習データと検証データに対するR2係数やその分布を取得できる。

plot_model(finalized_model)

f:id:sey323:20211103233007p:plain
plot1

分布を見ると学習データ検証データともにR2係数が0.97前後の値が計算されているので、非常に精度の高いモデルであることが確認できる。

ブレンドしたモデルのMAEの値と本結果から、テストデータに対して悪くても6前後のMAEを獲得できそうなので、このモデルを最終的なモデルとして提出してみる。

テストデータの予測と提出データの作成

テストデータの予測はpredict_modelで実施する。pycaretで作成したモデルと予測したいデータを入力することで簡単に予測データを作成できる。

# テストデータを予測
proba = predict_model(finalized_model, data=preprocessed_test_x)
proba.tail(100)

最後にデータ整形して提出データを作成する。

# データフレームの整形
submit_df = pd.DataFrame({"Y": proba['Label']})
submit_df.index = sample_submit[0]

import os
from datetime import datetime

# csvに保存
save_folder = "results"
if not os.path.exists(save_folder):
    os.makedirs(save_folder)

submit_df.to_csv("{}/submit_{}.csv".format(save_folder, datetime.now().strftime("%Y-%m-%d-%H%M%S")),header=False, index=True)

提出したところMAEは0.982となっており、検証した結果とは大きく異なっていた。Apple引越し社のyの値をプロットしてみると年ごとに上昇傾向にあり、一般的な決定木ベースではこの上昇の傾向を獲得できないのではと考えられる。

f:id:sey323:20211104223000p:plain
yのプロット結果
https://www.dropbox.com/s/2uwirl9xnr81556/%E3%82%B9%E3%82%AF%E3%83%AA%E3%83%BC%E3%83%B3%E3%82%B7%E3%83%A7%E3%83%83%E3%83%88%202021-11-04%2022.29.36.png?dl=0

以下のブログにあるように、時系列的な傾向を取得するためにProphetを利用した時系列モデリングとかの出力結果を組み合わせると性能向上につながるかもしれない。

lee-ann-al-chan.com

終わりに

AutoMLライブラリであるPyCaretを利用することでモデル構築の手間を削減し、特徴量作成に力を入れることができた。

本気で上位を狙うのであればそれぞれのライブラリを利用するのがベターだと思われるが、さくっと成果を見たかったり、タスクのベースラインを作成する際にはPyCaretを利用するのが便利そう。

次回はもう少し高い順位を狙いたい。