ラベル Python の投稿を表示しています。 すべての投稿を表示
ラベル Python の投稿を表示しています。 すべての投稿を表示

2024年9月29日日曜日

IkaLogの開発で得た知見: ニューラルネットワークを用いたブキ分類器の思想と実装

以前IkaLogの開発で得た知見:大量の動画や画像を取り扱う際の Tips にて、 IkaLog が使用していたブキ認識において、いかに実ユーザ環境からの送信データを活用しながらいかに効率的に教師データを揃えることができたかを紹介しました。

最近は LLM や生成 AI などが話題をもっていきがちですが、独自のモデルを構築してアプリケーションに組み込もうというアイデアで取り組まれている方も増えているのではないでしょうか。

私自身が取り組んだのは 2015-2017年頭の話なので昔話でしかないとはいえ、もしかしたら新規アプリケーションを作られる方の参考になるかもしれない?と思ったので、当時どのような思考で作業をしていたか、もう少しまとめてみる事にしました。

今回は、 IkaLog のブキ認識において、最終的にシンプルなニューラルネットワークに行きついたかについて解説します。

バックグラウンド: IkaLog で目指した性能と、当時のハードウェア制約

精度99%では満足できなかった理由

スプラトゥーンでは、合計8名のプレイヤーがふたつのチームに分かれて勝敗を競います。このときひとりひとりのプレイヤーが100クラス以上のブキのなかからひとつ選んで勝負に挑みます。


ブキにはそれぞれシューター/ローラー/ブラスターなどのクラスの特徴があるほか、攻撃力や距離、連射力で差別化がされています。また、ブキには戦局を有利に進めたり、不利な場面を打開するために活用できるサブウェポンやスペシャルウェポンが用意されています。

IkaLog で処理される勝敗データでは、各プレイヤーがどのメインブキを使用しているか、またサブウェポンやスペシャルウェポンが何であるかを正しく記録することが、戦績データを扱うためのしくみとして重要だと考えていました。

※ この統計自体には IkaLog は使われていません


先述のとおり、一回の対戦において8名のプレイヤーがゲームに参加します。これは 99% の分類器があったとしても、対戦で使われたブキすべてが正しく分類できる確率は 0.99^8 = 92.2% にしかなりません。このため IkaLog ではブキ分類器で 99.9% 以上の精度が必要であろうと考えていました。

スプラトゥーンのリザルト画面からのブキ認識は今のAIの仕組みから考えれば、とても簡単な仕組みです。画面の同じ場所に同じ画像が表示されるだけ。ただ、バグありのビデオキャプチャデバイス、HDMIでさりげなく入るノイズ、ユーザーのセットアップなどの理由で、ソフトウェア観点からみると「同じ画像が毎回表示される」前提ではブキ分類器を実現できませんでした。


当時のハードウェア事情

また、 IkaLog を開発していた 2015 年頃はまだゲーミングPCらしくゲーミングPCを一般的なプレイヤー層は持っていることは想定できませんでした。

ましてや Wii U でスプラトゥーンをプレイしているユーザがとなりに AI で十分な性能を発揮できるようなハードウェアを持っておらず、ゲームをしている人が最新CPU搭載のコンピュータを所有しているかも怪しいです(今でも多くのスプラトゥーンプレイヤーでも余裕のあるPCを持っておらず、むしろスマホのほうが性能が高い可能性すらあるでしょう)。当時の事情でいえば、x86 CPUでの演算において SSE4.2 程度の命令セットしか期待できなかった時期での取り組みでした。私自身が認識できていたユーザには Core 2 Duo で IkaLog を常用している方もいらっしゃいました。

IkaLog がリアルタイムで画像認識をするというコンセプトであったため、当時の記録によると、8キャラクタ分のブキ分類を含めてリザルト画面の解析をおよそ3秒以内で実現できることをひとつの目安としていたようです。


IkaLogの開発においてたどった当初の分類器のしくみや遷移

K近傍法

IkaLog では、当初は OpenCV の K近傍法 の実装を用いて、入力画像に対してもっとも特徴量ベクトルが近い近傍があるクラスに分類するアプローチで実装していました。

開発開始当初はブキ数が30程度だったかと思いますが、アップデート終了までにクラス数が100を超えるほどに増えました。結果論ですが、単純なテンプレートマッチングやSVMなどといったアプローチをとっていたら、実行時間の観点から、より苦労していたと思います(スプラトゥーン2向けの実装では数字などのキャラクタ認識でSVMを利用することも検討し取り組みましたが、このワークロードではKNNほどコスパよくありませんでした)。

KNN の実装は当時の CPU でアイコン程度を分類するのは十分に高速だったほか、分類器の訓練も高速でしたので、開発初期からこのアプローチにたどり着いていたことはとても助かりました。データ量が少ない状態から半自動的に訓練データを集めようとしたときに、いまでもとりあえず使うことが多いです。


色相のヒストグラム

ブキの分類器を作り始めた当初、最初に実装したものは色相のヒストグラムからパターンを見つけて分類が可能かどうかを試していました。


ただ、スプラトゥーンのリザルト画面では、所属チームによりブキ画像の背景にチーム色が映り込み、場合によってはプレイヤーが選んだ装備品もブキに重なります。プレイヤーのキャプチャーデバイスの設定などにも依存することもあり、あまり実用的な精度は出せませんでした。

ラプラシアンフィルタ

輪郭抽出は古典的な画像の分類アプローチでよく使われる手法の一つかと思います。IkaLogでもラプラシアンフィルタを適用し、カラー画像から次元数を削減した特徴量を生成し、ここから分類を行う方法を試していました。一時期のバージョンの IkaLog では実際にこのアプローチで提供していたと思います。

IkaLogのブキ分類のワークロードにおいてラプラシアンフィルタはチームの背景色の影響を排除し、ブキの形状に基づいて分類するには良い方法に思えます。しかし、ユーザの映像キャプチャ環境において元画像が 720/1080p だったり、それを480pにリスケールした映像が投入されたりといったかたちで想定外の入力がされると精度を保てないという問題が生じました。

最終的なチャレンジはスプラトゥーンのアップデートそのものでも発生しました。スプラトゥーンではメインブキに対して、サブウェポン・スペシャルウェポン違いのバージョンとして「カスタム」「コラボ」といった亜種が登場します。これらの亜種ではブキ画像の右下に小さな追加マークが表示されるのですが、ラプラシアンフィルタを介して色相情報を落とした状態でこれらの特徴を合理的に見分けることはできませんでした。



ニューラルネットワーク導入の決断

IkaLog を作り始めて自然と機械学習的アプローチに関わるようになっていたことから、 Cousera の Andrew Ng 先生のコースなども一通り修了していた頃に「もうニューラルネットワークにカラー画像をそのまま入力したほうがいいんじゃないか」考えるようになってきていました。

とはいえ、AlexNet などの既存のニューラルネットワークは100MB以上の重みデータがありますが、さすがにこれは過剰ですし、ユーザがそんなもので推論できるようなプロセッサを持っていませんし、CUDAをユーザのプロセッサで実行できるわけでもありません。このため IkaLog で目的に合わせたニューラルネットワークを実装することを考え始めました。

IkaLog のブキ分類においてニューラルネットワークに画像をそのまま入力することによる一つのメリットは、ニューラルネットワークであれば背景色などを無視できることがあります。チームカラーによって何色になるかわからないようなピクセルに基づく入力値は、結果的に無視されるようになります。ブキの形状によって適切な重みが自動的に形成されることを想定できたので、おそらく簡単にうまくいくだろうと思いました。


HSV色空間

RGBとHSVでしっかり比較したわけではないのですが、ブキ分類器での分類対象ではHSV色空間で取り扱ったほうがよいだろうと判断したので、何も考えずにHSV色空間を特徴量として使用しています。

HSV色空間を利用しようと思った最大の理由は、ブキの「カスタム」「コラボ」といった亜種の特徴を表現する色相がピクセルあたりひとつのパラメータで表現されることになるので、おそらくRGB色空間で扱われるよりもいいだろう、ぐらいにしか考えていませんでした。背景の色相を無視するという観点でも重みが小さくなることで簡単に表現できるでしょう。ここについては「こうなってくれたらいいな」という思想でしかなくて、現時点でもそう思っているだけで、これによる差があったかどうかは何も検証していません。


ネットワーク構成をシンプルに

使用するレイヤとしては単純な全結合とReLUに制約することにしました。理由はふたつあり、一つ目は計算量、二つ目は再実装のしやすさです。

計算量の観点では、畳み込みフィルタなども考えましたが、当時 MacBook Pro (2014) とその上の GeForce チップ、また Haswell Refresh プロセッサで走るニューラルネットワークの速度を見ていると、 CNN やプーリングをエンドユーザのプロセッサで実行させることは現実的ではないだろうと感じていました。単純な全結合と ReLU 程度であれば、当時で型落ちとなっていたプロセッサ上でも NumPy やその下位のライブラリが現実的なスピードで動いてくれるだろうと期待しました。

再実装の観点では IkaLog に取り組んでいた当時でニューラルネットワークを動かそうと思うと、 Caffe を使うとか、もしくは Chainer を使うとか、そういったいくつかのフレームワークを利用する方法でした。 ONNX ランタイムみたいなものはまだ出てきておらず、想定するユーザ層が中学生・高校生・大学生や社会人で、主に Windows ユーザであろうことを考えると、既存のフレームワークを IkaLog のためにセットアップさせるのは不可能でしょう。

IkaLog は zip ファイルを展開して実行すれば使える状態の配布形態を維持することを心がけていたので、ブキの分類器のためにフレームワークへ依存を追加することにためらいました。このため、シンプルなネットワーク構成とすることで、IkaLog用に推論コードを作成するコストを最低限に抑えることにしました。


実装を進める前の事前確認を Azure Machine Learning で実行

それまでの取り組みである程度のデータ量は確保できていたので、まずは手元のデータセットを用いて最低限の作業でアプローチを検証するため、 Azure Machine Learning に想定する特徴量をアップロードして、 MLPで期待するようなモデルが実現するのかを確認しました。

Azure ML はこの程度のワークロードであればファイルをアップロードしポチポチするだけでいいですし、 confusion matrix などもさくっと出してくれるので、アプローチ上問題がないことを簡単に確認できましたしコードを具体的に書く前に最低限の労力で検証できたことはとても助かりました。なおこの Azure ML の体験談は 2016 年頃当時の話であることに留意してください。





学習済みモデルのインポート、推論

実際の学習は Chainer と GeForce GTX 1080 (後半は Tesla P100)で行いました。そもそもどれぐらいのノード数で性能が飽和するかを Chainer 上で検証し、ネットワークの隠れ層のサイズを決めました。

本番の学習は 24 時間などのオーダーで1000エポック以上回したような覚えがあります。 Chainer のチェックポイントとして得られたものをいくつか評価して使用するモデルをきめました。

Chainer フレームワークからネットワークの重み・バイアスを取り出して、単純な NumPy コード上で推論できることを確認できたので、モデルをただのマトリックスとして pickle してファイルに保存、そこからモデルを復元・推論することで、機械学習フレームワークへの依存を断ち切りました。

実際に生成できた実行用モデルファイルをみてみると15MBほどになっていました。この中には32ビット浮動小数点数が並んでおり、zipなどでの圧縮効果がほとんどありません。配布ファイルが大きくなることを嫌って、pickcleする際に16ビット浮動小数点数として扱うことによってファイルサイズを半分に抑えることにしました。このワークロードとモデルにおいて浮動小数点数のビット数を抑えても実用上の影響はほとんど感じられません。最終的に戦績共有サイト stat.ink に投稿されるブキ画像を 99.99% で分類できる精度が得られました。


推論の実装

先述の理由で、 IkaLog ではフルスクラッチかつ最低限のコード量で推論を再実装しました。ここで実装した内容は、のちに発売されるオライリーの「ゼロから作る Deep Learning」の最初の100ページで解説された内容そのものともいえるかと思います。

このブキ分類器は、すでに型落ちとなっていた IvyBridge 2.0 の MacBook Air でも 0.02 秒で実行できました。Core 2 Duo などでも十分な速度で動きましたし、 PYNQ (ARMコア搭載 FPGA)でも1回あたり200ms未満の実行速度で収まりました。

FPGAならPLで実行すりゃもっといけるだろとかそういうツッコミはいくらでも可能かと思いますが、技術的には可能ですが、非営利の独りプロジェクトでここまでやれば十分かなと思っています。


おまけ

当時やってみたかったこと

ネットワークの蒸留や枝切りをしてより配布ファイルのサイズを小さくできるのではないかと考えていましたが、着手しませんでした。

単に手が回っていなかったほか、ネットワークの規模が小さくなったときに、どのクラスにも該当しない画像を特定のクラスに分類してしまう可能性などを恐れていたように思います(実際取り組んだらどうなったかはわかっていません)。


最近のエコシステムに思うこと

ここまでの内容を2017年以前に取り組んだ後、こんなフレームワーク便利だなと思ったものが幾つかでてきました(現時点の選択肢として筋がいいと言いたいわけではありません)。

  • ONNX Runtime の登場によりホストプログラムが雑に使える推論ライブラリが出てきたという印象
  • Intel が OpenVINO や Movidius VPU を出してきて、 Windows PC でハードウェア支援が期待できるようになった。最悪 AMD CPU でも SSE 相当で動くっぽい
  • OpenCV に DNN に対する推論機能が強化されており、配布方法を工夫すれば GPU アクセラレーションなどをホストプログラムから呼び出せるかもしれない。

さらに、2017年の iPhone X から Neural Engine が搭載され、 Android にも同じように推論エンジンがハードウェアとして搭載されるようになりました。 Mac であれば M1 から、 Windows でも Copilot PC が出てきました。

ようやく OS レベルの推論の抽象化が進んできた

ライブラリの観点では、Windows であれば DirectML 、 Apple であれば CoreML 、Androidであれば MLKit などが普通に使えるようになってきて、私が IkaLog を作るときに困った「推論のための仕組みがない。ターゲットごとに実装してられない」という状況に大きな変化が起きているように感じています。特に個人的には DirectML は(ごくたまにしか試してませんが) Windows 環境において OS が推論ワークロードをハードウェアで実行してくれるという、まさに OS らしい抽象化をしてくれるようになりました。

私自身はふだんほとんどプログラムを書かないのですが、ローカルで推論をするアプリケーションの開発難易度は10年前から比べると大きく下がってきたなと感じています。

ローカル LLM の話題などもみかけますが、 Copilot PC の話題などをみつつ 2024 年は、ローカルで推論をするタイプのアプリケーションの開発が加速する年になるだろうなと思っています。2025 年になるとウイルス対策ソフトすら推論用アクセラレータにオフロードするような世界がくるのかもしれませんね。

2022年1月1日土曜日

OpenCVでサイゼリヤの「超難解 大人の間違い探し」を台無しにする



 あけましておめでとうございます。


先日、久々にサイゼリヤで外食をした際、レジにて間違い探しを配っているのに気づきました。自分がこれを手にした瞬間に頭のなかで間違いを見つけるプログラムができたのですが、数日後の深夜に実際にコードに落として、動くものができました。

今回は、どのようにしてこの出力を得たかについて、具体的なコード例とあわせて紹介します。


基本的な考え方

サイゼリヤの間違い探しは、左右反転した状態で絵の違いを見つけるものです。今回はAKAZE特徴量を用いてふたつの画像を重ね合わせた上で、差異部分に色をつけてマークしていくことにします。


入力画像を用意する

今回サイゼリヤで入手した間違い探しはA4サイズのコピー用紙に印刷されたものです(配達のときに同梱したりしているようですね)。後でAKAZE特徴量のアルゴリズムで取り扱うため、きちんと平面がでた状態でスキャンします。



今回は手元にあったドキュメントスキャナ(Canon ImageFormla DR-C125)を使いましたが、きちんと平らにできれば、携帯電話のカメラを使ったりしてもいいですし、コンビニのコピー機で適当な形式で保存してきても構いません。

また、ソースの特性を考慮し、可能な限り、白・黒に二値化します。必須ではないのですが、これをしておくことで後の処理がしやすくなるでしょう。二値化はPhotoshop, GIMPなどの画像編集ツールやドキュメントスキャナのスキャン時設定にて行えるでしょう。

入力画像を2ファイルに分割する



img1.jpg, img2.jpg として対象の画像を保存します。今回は macOS 標準のスクリーンショット機能を用いて、とても雑にファイルを生成しました。後でAKAZE特徴量を用いて重ね合わせるつもりですので、ここで傾きや大きさ、ピクセルのシフト量など、正確さを求める必要はありません。今回は JPEGを使用しましたが、 OpenCV が対応する形式であれば PNG などでも構いません。


作業環境

今回は Juypter Notebook 上で、 Python 3.8 の環境に OpenCV 4.x をインストールし作業していきます。 OpenCV は下記操作でインストールするとよいでしょう。 opencv-contrib-python パッケージをインストールすることで、のちに使う AKAZE 特徴量などのアルゴリズムもすぐ使える状態になっています。

!pip install opencv-contrib-python
画像の読み込み
cv2.imread() で画像を読み取って ndarray にします。元画像は原則モノクロですし、面倒を減らすため、読み込み時に1チャンネルのグレースケール画像としておきました。このほうが後での画像のマッチングや比較も簡単です。
import cv2
import numpy as np
from math import sqrt
img1 = cv2.imread("saizeriya_akaze/img1.jpg", cv2.IMREAD_GRAYSCALE)
img2 = cv2.imread("saizeriya_akaze/img2.jpg", cv2.IMREAD_GRAYSCALE)

img1 と img2 は左右反転していますので、 img2 を左右反転させておきます。

img2 = cv2.flip(img2, 1)
2枚の画像を重ね合わせる

AKAZE特徴量を使って画像をマッチングします。

akaze = cv2.AKAZE_create()
kpts1, desc1 = akaze.detectAndCompute(img1, None)
kpts2, desc2 = akaze.detectAndCompute(img2, None)
matcher = cv2.DescriptorMatcher_create(cv2.DescriptorMatcher_BRUTEFORCE_HAMMING)
nn_matches = matcher.knnMatch(desc1, desc2, 2)
matched1 = []
matched2 = []
nn_match_ratio = 0.8 # Nearest neighbor matching ratio
for m, n in nn_matches:
    if m.distance < nn_match_ratio * n.distance:
        matched1.append(kpts1[m.queryIdx])
        matched2.append(kpts2[m.trainIdx])
inliers1 = []
inliers2 = []
good_matches = []
inlier_threshold = 1000 # Distance threshold to identify inliers with homography check
for i, m in enumerate(matched1):
    col = np.ones((3,1), dtype=np.float64)
    col[0:2,0] = m.pt
    dist = sqrt(pow(col[0,0] - matched2[i].pt[0], 2) +\
                pow(col[1,0] - matched2[i].pt[1], 2))
    if dist < inlier_threshold:
        good_matches.append(cv2.DMatch(len(inliers1), len(inliers2), 0))
        inliers1.append(matched1[i])
        inliers2.append(matched2[i])

これで img1 と img2(左右反転) 画像の特徴を対応が得られます。これを画像として出力してみます。

res = np.empty((max(img1.shape[0], img2.shape[0]), img1.shape[1]+img2.shape[1], 3), dtype=np.uint8)
cv2.drawMatches(img1, inliers1, img2, inliers2, good_matches, res)
cv2.imwrite("saizeriya_akaze/result.png", res)


コピー&ペーストでソースコードをいじった時に少しおかしくなったように見えるけど、実用上問題なさそうなのでヨシ。

もちろん、間違い探しという特性上、画像が意図的に異なるので、マッチしない特徴もあるわけですが、十分な数の特徴量がマッチすれば、やりたい事に対して問題はありません。

この特徴量をもとに2枚の映像を重ね合わせます。 img1 の位置を基準とし、 img2 が img1 に重なるようにワープフィルタをかけた img3 を得ます。この処理は IkaLog というプログラムから一部を持ってきました。そのため画像名にキャリブレーションイメージ、キャプチャイメージという名前が付いていますが、そのまま利用しています。
Acalibration_image_gray = img1
capture_image_gray = img2
matcher = cv2.BFMatcher(cv2.NORM_HAMMING)


capture_image_keypoints, capture_image_descriptors = \
    akaze.detectAndCompute(capture_image_gray, None)
calibration_image_keypoints, calibration_image_descriptors = \
    akaze.detectAndCompute(calibration_image_gray, None)

print('caputure_image - %d features' % (len(capture_image_keypoints)))
print('matching...')

def filter_matches(kp1, kp2, matches, ratio=0.75):
    mkp1, mkp2 = [], []
    for m in matches:
        if len(m) == 2 and m[0].distance < m[1].distance * ratio:
            m = m[0]
            mkp1.append(kp1[m.queryIdx])
            mkp2.append(kp2[m.trainIdx])
    p1 = np.float32([kp.pt for kp in mkp1])
    p2 = np.float32([kp.pt for kp in mkp2])
    kp_pairs = zip(mkp1, mkp2)
    return p1, p2, kp_pairs
raw_matches = matcher.knnMatch(
    calibration_image_descriptors,
    trainDescriptors=capture_image_descriptors,
    k=2)

p1, p2, kp_pairs = filter_matches(
    calibration_image_keypoints,
    capture_image_keypoints,
    raw_matches,)

if len(p1) >= 4:
    H, status = cv2.findHomography(p1, p2, cv2.RANSAC, 5.0)
    print('%d / %d  inliers/matched' % (np.sum(status), len(status)))
else:
    H, status = None, None
    print('%d matches found, not enough for homography estimation' % len(p1))
    raise Exception()

assert H is not None

if len(status) < 1000:
    raise WarpCalibrationNotFound()

calibration_image_height, calibration_image_width = img1.shape

corners = np.float32(
    [[0, 0],
     [calibration_image_width, 0],
     [calibration_image_width, calibration_image_height],
     [0, calibration_image_height]])

pts1 = np.float32(cv2.perspectiveTransform(
    corners.reshape(1, -1, 2), H).reshape(-1, 2) + (0, 0))
pts2 = np.float32([
    [0, 0],
    [calibration_image_width, 0],
    [calibration_image_width, calibration_image_height],
    [0, calibration_image_height]])

print('pts1: %s' % [pts1])
print('pts2: %s' % [pts2])

M = cv2.getPerspectiveTransform(pts1, pts2)
 
img3 = cv2.warpPerspective(img2, M, (calibration_image_width, calibration_image_height))
これにより img1 とできるだけマッチするようワープフィルタを適用した img3 が得られました。


img1 と img2 の縦横ピクセル数は一致していないのですが、 img1 にあわせてワープフィルタをかけた img3 は img1 と同じピクセル数になっているので、このことを確認しておきます。

こんな面倒なことをしなくても、他のツールを使ってあらかじめ重ね合わせておくことも可能です。ただ、この重ね合わせ処理含めて実装したので、サイゼリヤの間違い探しの新しいバージョンが出ても、入力画像を差し替える以上の労力をかける必要はなくなりました。


画像の差分をとる

img1 と img3 の差の絶対値をとります。これにより1プレーン8ビット(256色調)色調の画像に対して、差がなければ0、差があれば255の値が得られます。なお img1 および img3 は符号なし8ビット整数の1プレーン モノクロ画像ですが、差を取るにあたり -255 ~ 255 までの範囲をとりうるため、 符号付き32ビット整数として扱っています。絶対値をとった時点では 0~255 に収まっているため、 img_abs は 0~255 の符号付き32ビット整数です。

img_abs = abs(img1.astype(np.int32) - img3.astype(np.int32))

img_abs の内容をモノクロ画像として出力した例を参考までに掲載しておきます。 img1, img2 で差があった部分が白く浮き上がっていることがわかるでしょう。



差分強調画像を作成し出力する

最終結果を出力します。ここではimg1を基として答えの画像を作成します。この画像は HSV フォーマットとして扱うことにします。HSVとは Hue(色相), Saturation(彩度)、明るさ(Brightness) の3要素で色を表現します。モノクロ画像をHSV色空間に変換していますので、S=0、V=0~255, Hについては未定義(結果としてゼロが入っているが)という状態になります。

img_bgr = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
img_hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)

このうち彩度のプレーンを、先ほど得た img1 と img3 の絶対値 img_abs で置き換えます。差のある部分はほぼ 0 、差がある部分だけ彩度があがるため、結果として img1 の画像に対して、 img1 と img3 の差がある部分には何らかの色が付きます。どのような色がつくかは色相チャンネルによりますが、今回 cv2.cvtColor() はこのチャンネルを0で初期化していますので、これに対応して赤色になります。

img_hsv[:, :, 1] = img_abs

生成した画像を cv2.imwrite() で画像ファイルに落とします。 cv2.imwrite() は BGR フォーマットのカラー画像をとりますのでHSV色空間からBGR色空間へ変換も行います。

cv2.imwrite("saizeriya_akaze/result.png", cv2.cvtColor(img_hsv, cv2.COLOR_HSV2BGR))

出力結果


今年もよろしく御願いします。


2021年5月28日金曜日

A Puzzle A Day のソルバーを書いてみた(6/20更新)

更新内容

6/20 ... C言語版について追記しました。

面白そうなパズルとの出会い

Twitter で A Puzzle A Day というカレンダーパズルを見かけた。


このパズルは1月〜12月、1日〜31日までのセルがあり、任意の日付(たとえば5月28日だとか6月1日だとか)をパズルを通して表現できる、というもののようだ。テトリスにでてきそうなブロックを組み合わせて、目的の日付にかかわる部分以外をキレイで埋められれば完成。毎日パズルが楽しめるし、1年後に同じ日付の答えを得ようとしても改めて楽しめそうで、とても面白そうだ。

久々に「このパズルは私も欲しい!」と思った。

国内の販売店が普通に売ってるようなものだったらうっかりオーダーしていたかもしれない。どうやらノルウェーのほうでレーザープリンタを用いて作ってるような感じで、持っている人は少ないのではないか。 Twitter タイムラインをみると紙やら木材のレーザーカットやら思い思いの方法で手作りをして楽しんでいる人もいるようで、私も何か作ろうと思った。

その結果として、ソルバーができた。

https://github.com/hasegaw/puzzle-a-day-solver


ソルバー?

ソルバー(Solver)というのは答えを求めるツールだ。たとえば今回のソルバーの場合、5月28日であればこうすると良い感じにブロックがはまりますよ、というのを教えてくれる。このパズルは365通りの目標があるほか、それぞれに複数の解法があるので、趣味などとしてソルバーを書いてみても楽しいだろうと思って、作ってみた。

私はコンピュータサイエンスを学んだエンジニアでも競プロ勢でもアルゴリズムに強いわけでもないので、ここで紹介するアルゴリズムについてはツッコミどころもあると思う。この記事の内容は、とりあえず趣味で簡単なプログラムを書いているだけでも、このパズルのソルバーはこれぐらいの考えで作れる内容だよ、という紹介ということでご理解いただきたい。


ルールの確認

このパズルのボードは下記の構造になっている。



左が、パズルの舞台となる盤面で、ここにブロックを詰めていくことになる。黒色のセルはブロックが置けない場所で、オレンジ色のセルは、一例として5月28日を表現する場合においてブロックで隠れてはいけないセルだ。

右側に示した 8つのブロックを盤面に並べていく。各ひとつ使えるのだが、各ブロックは回転させても良いし、反転させても良いようだ。なので、ひとつのブロックでも盤面上では最大で8つの姿をとりうる。



盤面の黒色セルおよびオレンジのセルに重ならないように、これらの8つのブロックを配置できたらパズルが解けたことになる。


基本的な考え方

ボードサイズは7x7のマトリックスで表現できる。ここで、ブロックを置いてはいけないセルを 1 、それ以外を 0 としたマトリックスを作る。ルール上絶対にブロックが乗ってはいけない部分のみ 1 としたマトリックスの例、それに加えて目的の日付セルも 1 としたマトリックスの例を示す。



ここで黒色セルと日付セルをわけて考えてもよいのだが、候補を見つけていくには、黒色セルおよび日付セルの両方がフラグされている後者のマトリックスが基点になる。

これに対して、特定のブロックを特定の回転・反転・位置の条件で配置する場合に、どのセルにあたるかを示す行列(以後、候補とよぶ)を作成する。ここではふたつの例を示す。


候補がボードにはまるかどうかは、現状のボードの状況を示すマトリックスと候補のマトリックスを加算してみればわかる。整数値の加算結果として 1 を超えるセルが発生する場合は、ブロックと何かがぶつかってしまい、そこに当てはめられない事を意味する。(ここで、後の最適化で触れる論理積を使ってもいい)


当てはめられる候補を見つけたら、再帰的に他のブロックも当てはまる所を探していって、全てのブロックを使い切ったら解法が見つかったことになる。



実際のところ、これを適当に置いていってもブロックが素直に収まってくれる可能性はかなり低い。黒色セルとオレンジの日付セルを除いて全てのセルをブロックで埋めないと答えにたどり着かないことを利用して、マトリックスのふち側のセルから優先して埋める。これにより、隙間の発生により絶対に解法にたどり着けないことが自明な候補を探索対象から外すことができる。



私の実装では左上から右へ、行が埋まったら次の行へ、という順番でセルを埋めるようにしている。次に埋めるセルを見つけるには、ボードの最新盤面におけるゼロのセルを見つければよい。候補がそのセルを埋めてくれるかは、その候補において目的のセルが 1 になっているかを見ればよい。候補が目的セルを埋めなくても解法に繋がる可能性はあるが、後に別のセルを埋める際に改めて評価されるので、ここでは候補から除外してよい。とにかくフチから埋めて、ボード上に隙間ができない候補から試したほうがよいからだ。

これを繰り返していくと、ボード上のセルでゼロがなくなり(すべて 1 になる)、未使用ブロックもなくなっているはずだ。この方法で総当たりしていけば、解法にたどり付ける。

実装については Pyhthon + NumPy を利用した。 NumPy で上記において必要となるマトリックス演算のほとんどが実現できるほか、この後に触れるベクタライゼーションを意識した記述にすることで、さらなる高速化が期待できる。


ベクタライゼーション (Vectorization)

上記のアルゴリズムは、どのようなプログラムで書いてもよい。私は Python を使用したが、別に Ruby を使ってもいいし、 JavaScript/TypeScript でも、 Perl でも、 Golang でも C でも Pascal でも、 C# でも、 Java でも、 BASIC でもなんでもよい。

ただ、このような処理を愚直にループ処理で書いても、コンピュータはこれを速く処理ができない。このため、コンピュータが高速に処理できるような、まとまった行列演算にできるだけ落とし込んでいく。たとえば盤面に対して次の候補を探すためにマトリックスを加算する場合に、まとめて加算をしたほうが速い。

もちろん加算すべきは ボードの現状 × 候補の数 なので計算すべき量は一緒なのだが、 Python コードとして繰り返しを書くよりも、 NumPy などの C 言語ベースで書かれたライブラリ内でループをしたほうが速い。場合によっては、バックエンドのライブラリ内において、 CPU の SIMD 命令で計算することにより、より実行時間を短縮してくれる可能性がある。

具体的には、 10x10 のボードに対して 10x10 の候補が 10,000 あった場合、 Python レベルで10,000回加算するよりも、 10000x10x10 の候補に対して 10x10 のマトリックスを加算するほうが速く実行できる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import cProfile
import numpy as np

mat1 = np.zeros((10000, 10, 10), dtype=np.uint8)
mat2 = np.zeros((10, 10), dtype=np.uint8)

mat1[:, :, :] = 3
mat2[::, :] = 3


def func1():
    mat1 + mat2


def func2():
    for m in mat1:
        m + mat2


if __name__ == '__main__':
    cProfile.run('func1()')
    cProfile.run('func2()')

% python bench.py
         4 function calls in 0.000 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.000    0.000 :1()
        1    0.000    0.000    0.000    0.000 bench.py:12(func1)
        1    0.000    0.000    0.000    0.000 {built-in method builtins.exec}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}


         4 function calls in 0.005 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.005    0.005 :1()
        1    0.005    0.005    0.005    0.005 bench.py:15(func2)
        1    0.000    0.000    0.005    0.005 {built-in method builtins.exec}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}


また、大抵の場合、 for ループなどを使用し最小値を探すよりも、 NumPy.argmin() などのライブラリ関数を活用したほうが速い。

このような観点でベクタライゼーションをある程度進めたところ、当初手元のコンピュータ(MacBook Pro 2019, 2.5GHz)では 600秒(10分)かかっていた探索が60秒以内で完了するようになった 。


バイナリ化

ベクタライゼーションにより特定の日付向けの解法が60秒以内に得られるようになったが、もう少し速くしてみようかなと思い、内部表現をバイナリ化してみることにした。

ここまでの作業は (7, 7) の大きさのマトリックスで、各要素は 8bit の整数値として扱ってきた。しかし 7x7 というサイズで、上記アルゴリズムでは基本的に 0 と 1 のみ(衝突検出のみ 2 が登場)で成り立っているので、2進数で表現できる。これにより 7x7=49バイトで表現していた盤面や候補が8バイトで表現できるようになり、メモリ上の表現が7分の1になること、uint64(64ビット符号なし整数)表現でのビット演算は現在のプロセッサだと1命令で実行できる。大幅な高速化が望めるだろう。

盤面の各ビットを以下のように割り当てることにした。最上位ビットから利用して最下位ビット側を未使用にしているのは若干気持ち悪く見えるかもしれないが、当初はブロックをシフトした際の範囲外判定のために右側のビットを残しておこうと思ったのでこのような割り当てになっている(結局使わなかったが)。気に入らなければ後述するルックアップテーブルの値を調整すればよい。

追記)このようにビットアサインするつもりだったが、実際には違うビットアサインをしていた。上記のビットアサイン表は参考例としてそのまま掲載しておく。

既存のソースコードの (7, 7) のマトリックスの内部表現からこのバイナリ64ビットの内部表現に変換するため、以下のような関数を定義した。本格的なループ処理の前後で使われるだけなので、かなり適当な書き方だが、ここでの実行時間は大したことないので問題ない。

def generate_mat2bin_lut():
    lut = np.zeros((7, 7), dtype=np.uint64)
    for y in range(7):
        for x in range(7):
            b = 8 * y + x
            lut[y, x] = base_bit >> (63- 8 * y - x)
    return lut

def mat2bin(mat):
    assert mat.shape[0] == 7 and mat.shape[1] == 7
    lut = mat2bin_lut.copy()
    lut[mat == 0] = 0
    return np.sum(lut) # as np.unit64

def bin2mat(b):
    mat = np.zeros((7, 7), dtype=np.uint8)
    for y in range(7):
        for x in range(7):
            if (b & mat2bin_lut[y, x]):
                mat[y, x] = 1
mat2bin_lut = generate_mat2bin_lut()

二進数について理解している人なら(もっといいやりかたがあるよという指摘はいくらでもあるだろうが)何をしているか判ると思うので、詳細は省略する。

これで mat2bin() によりマトリックス表現からバイナリ表現に変換し、 bin2mat() によりバイナリ表現からマトリックス表現に変換できるので、既存コードにある初期値や出力用の関数はほぼそのまま使い回せる。

バイナリ表現化したことで盤面のほか候補についても64bitで表現できるようになったし、いっそのこと、あらかじめ8つのブロックの方向・反転・位置スライドの全パターンをメモリ上に展開しておくことにした。たとえばひとつのブロックの候補は以下のような196のuint64値で表現できる。

000000000000030e 000000000000061c 000000000000070c 0000000000000c07 0000000000000c38 0000000000000e03 0000000000000e18 000000000000180e 0000000000001870 0000000000001c06 0000000000001c30 000000000000301c 000000000000380c 0000000000003860 0000000000006038 0000000000007018 0000000000030e00 0000000000061c00 0000000000070c00 00000000000c0700 00000000000c3800 00000000000e0300 00000000000e1800 0000000000180e00 0000000000187000 00000000001c0600 00000000001c3000 0000000000301c00 0000000000380c00 0000000000386000 0000000000603800 0000000000701800 0000000001010302 0000000001030202 0000000002020301 0000000002020604 0000000002030101 0000000002060404 00000000030e0000 0000000004040602 0000000004040c08 0000000004060202 00000000040c0808 00000000061c0000 00000000070c0000 0000000008080c04 0000000008081810 00000000080c0404 0000000008181010 000000000c070000 000000000c380000 000000000e030000 000000000e180000 0000000010101808 0000000010103020 0000000010180808 0000000010302020 00000000180e0000 0000000018700000 000000001c060000 000000001c300000 0000000020203010 0000000020206040 0000000020301010 0000000020604040 00000000301c0000 00000000380c0000 0000000038600000 0000000040406020 0000000040602020 0000000060380000 0000000070180000 0000000101030200 0000000103020200 0000000202030100 0000000202060400 0000000203010100 0000000206040400 000000030e000000 0000000404060200 00000004040c0800 0000000406020200 000000040c080800 000000061c000000 000000070c000000 00000008080c0400 0000000808181000 000000080c040400 0000000818101000 0000000c07000000 0000000c38000000 0000000e03000000 0000000e18000000 0000001010180800 0000001010302000 0000001018080800 0000001030202000 000000180e000000 0000001870000000 0000001c06000000 0000001c30000000 0000002020301000 0000002020604000 0000002030101000 0000002060404000 000000301c000000 000000380c000000 0000003860000000 0000004040602000 0000004060202000 0000006038000000 0000007018000000 0000010103020000 0000010302020000 0000020203010000 0000020206040000 0000020301010000 0000020604040000 0000030e00000000 0000040406020000 000004040c080000 0000040602020000 0000040c08080000 0000061c00000000 0000070c00000000 000008080c040000 0000080818100000 0000080c04040000 0000081810100000 00000c0700000000 00000c3800000000 00000e0300000000 00000e1800000000 0000101018080000 0000101030200000 0000101808080000 0000103020200000 0000180e00000000 0000187000000000 00001c0600000000 00001c3000000000 0000202030100000 0000202060400000 0000203010100000 0000206040400000 0000301c00000000 0000380c00000000 0000386000000000 0000404060200000 0000406020200000 0000603800000000 0000701800000000 0001010302000000 0001030202000000 0002020301000000 0002020604000000 0002030101000000 0002060404000000 00030e0000000000 0004040602000000 0004040c08000000 0004060202000000 00040c0808000000 00061c0000000000 00070c0000000000 0008080c04000000 0008081810000000 00080c0404000000 0008181010000000 000c070000000000 000c380000000000 000e030000000000 000e180000000000 0010101808000000 0010103020000000 0010180808000000 0010302020000000 00180e0000000000 0018700000000000 001c060000000000 001c300000000000 0020203010000000 0020206040000000 0020301010000000 0020604040000000 00301c0000000000 00380c0000000000 0038600000000000 0040406020000000 0040602020000000 0060380000000000 0070180000000000

この値を bin2mat でデコードしてみるとブロックが見えてくる。

% python
Python 3.8.1 (default, Mar 12 2020, 17:27:26)
[Clang 11.0.0 (clang-1100.0.33.17)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import puzzle_a_day_solver
>>>
>>> puzzle_a_day_solver.bin2mat([0x000000000000030e])
array([[0, 1, 1, 1, 0, 0, 0],
       [1, 1, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0]], dtype=uint8)
>>> puzzle_a_day_solver.bin2mat([0x000000000000061c])
array([[0, 0, 1, 1, 1, 0, 0],
       [0, 1, 1, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0]], dtype=uint8)

8ブロック合計では、重複を除くと合計1196パターンあり、正味9568バイトのデータ量に収まるので、全てのパターンを予め展開しておいても、Core 2 DuoのプロセッサでももしかするとL1キャッシュに収まってしまうかもしれないデータ量だといえる(プロファイリングしていないので実際にどうなっているかは判らない)。

ブロックの衝突チェックは、従来方法では足し算をしていたが、今回は論理積(AND)を使用する。盤面 AND 候補を計算した結果、重複ビットがあるとゼロ以外の値になるので、論理積の結果がゼロ以外の候補は除外する。ここでも NumPy による vectorization を行う。

    blocks_f2_masked = blocks_f1_uint64 & mat_uint64
    blocks_f2_bool =blocks_f2_masked == 0
    blocks_f2_uint64 = blocks_f1_uint64[blocks_f2_bool]

特定セルを埋めてくれる候補かどうかを判断するためには、特定セルを示すビットが 1 である候補に絞り込めばよい。 mat2bin/bin2mat 向けに作った、各セルに対するビットを示すルックアップテーブルから 64bit のビットマスクを得て、先と同様にフィルタリングする。

    bit_pos = mat2bin_lut[pos[0], pos[1]]
    blocks_uint64_masked = blocks_uint64 & bit_pos
    blocks_f1_bool =blocks_uint64_masked != 0
    blocks_f1_uint64 = blocks_uint64[blocks_f1_bool]

基本的なアルゴリズムは変えずに内部表現だけ変更しているので、得られる結果は一緒だが、49秒かかっていた処理が3.5秒まで短縮できた。

単純な計算部分だけで考えればもっと速くなる(1秒は切れるだろう)と思っていたので、思ったより遅かった。



C言語で再実装してみた

翌週、実はC言語で再実装してみた

もうこれ以上は最適化なんてしないって思っていた。でも1秒を切れなかったのがやはり心残りだったんだと思う。

基本的なアルゴリズムは Python バージョンと一緒だが、本質的ではないところで 楽をしたり、異なる実装をしたりている。

まず Python バージョンでは盤面の制約やブロックの形状を配列として表現しておき、実行時に 64bit のバイナリ表現を得ていた。 C 言語でこれを書き直そうとするとちょっと面倒なので、 Python バージョンが得られたバイナリ表現をソースコード中にLUTとして埋め込みしておく。

ブロックのLUTと、ブロックのメタデータ(LUT内の開始位置と数量)を独立したデータとして持っている。各ブロックの大きさや形状の違いからブロック数量が異なる。このためLUT自体はひとつ一次元配列で持っているので、CPU L1キャッシュを無駄にしない。

現状のコードは日付の指定や結果の出力機能を持っていない。 Python バージョンと同じデータ構造なので、 Python モジュールとしてコンパイルしてしまい、 Python バージョンを宿主として足りない部分は既存コードを使い回そうかなと思っている。

C言語で書くと有り難いのが次に攻略すべきセルを選択する部分で、ここはどうしても Python 上で速くする方法を思いついていなかった。いくつかのパターンを試した感じでは NumPy に任せておいたほうが速かったので、そうなっていた。

# バイナリ化前
def next_cell(context):  mat = context['mat'] pos = np.unravel_index(np.argmin(mat), mat.shape) minval = mat[pos[0], pos[1]] if minval != 0: return None return pos
# バイナリ化後 def next_cell(mat):
mat = mat2bin_lut & mat pos = np.unravel_index(np.argmin(mat), mat.shape) minval = mat[pos[0], pos[1]] if minval != 0: return None return pos

この部分はC言語であれば素直に書ける。

/* 盤面 board に対して次のセルを選んで、その bit の値を返す */
inline uint64_t next_cell_bit(uint64_t board) {
    for (int y = 0 ; y < 7; y++)
        for (int x = 0 ;x < 7; x++) {
            uint64_t done = board & mat2bin_lut[y][x];
            if (! done)
                return mat2bin_lut[y][x];
        }
    return 0;
}

ある時点のC言語バージョンでは gcc の最適化なしで 40ms (0.04秒)、 -O3 で 20ms まで実行時間を短縮できる事を確認できた。M1 Macでは10msで完了したという報告ももらった(速っ!)。現時点のものは、もう少し最適化が進んでいる。


キャッシュミス分析

このソルバーは約10KBのLUTを使って、最大8回の再帰呼び出しするだけなので、この規模であれば、ここ10年のプロセッサーではCPUのL1 Cacheに載りきってしまうことが想像できる。以下は Ryzen 9 3900X の Linux box において cachegrind でこのソルバーをプロファイルしてみた結果である。

$ valgrind --tool=cachegrind ./solver
==114519== Cachegrind, a cache and branch-prediction profiler
==114519== Copyright (C) 2002-2017, and GNU GPL'd, by Nicholas Nethercote et al.
==114519== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==114519== Command: ./solver
==114519==
--114519-- warning: L3 cache found, using its data for the LL simulation.
50 solutions found
==114519==
==114519== I   refs:      100,596,976 .... 命令の参照数
==114519== I1  misses:          1,140 .... L1 キャッシュミス
==114519== LLi misses:          1,116 .... L3 キャッシュミス
==114519== I1  miss rate:        0.00%
==114519== LLi miss rate:        0.00%
==114519==
==114519== D   refs:       19,428,245  (18,233,828 rd   + 1,194,417 wr) .... データ読み書き数
==114519== D1  misses:          3,401  (     2,718 rd   +       683 wr) .... L1 キャッシュミス
==114519== LLd misses:          2,817  (     2,213 rd   +       604 wr) .... L3 キャッシュミス
==114519== D1  miss rate:         0.0% (       0.0%     +       0.1%  )
==114519== LLd miss rate:         0.0% (       0.0%     +       0.1%  )
==114519==
==114519== LL refs:             4,541  (     3,858 rd   +       683 wr)
==114519== LL misses:           3,933  (     3,329 rd   +       604 wr)
==114519== LL miss rate:          0.0% (       0.0%     +       0.1%  )

I1 miss rate および D1 miss rate が 0.0% となっていることで、プログラム自体もデータも L1 キャッシュに収まっていることがわかる。念のためソルバーの実行回数を増やしたものを同じくプロファイラにかけてみたりもしたが、大きくキャッシュミスが増えたり、キャッシュミス率が上昇したりする状況を確認できなかった。

LLd miss rate の 2213 rd は恐らく 64bit のキャッシュラインへの読み込みが2213回発生したということだと思うので、 2,213*8=17,704 [bytes] で、コード中で使っている LUT サイズが 10KB 前後、あとはヒープや自分が書いたものではない関数内のメモリ操作をあわせると、こんなものなんだなと思っている。

知識として CPU のキャッシュの事は知っていたけども、こういったデータを具体的に見る機会は今までなかったので、パズルついでにここまで楽しめるのはお得感があった。


これで終わりだと思うけど


一度「もうこれ以上はやらない」と書いていたのに色々と手を入れてしまったので、ほかにも思いついている最適化を試してしまうかもしれない。
  • mat2bin_lut の unroll
    試した範囲では認識できるほどの差がないことがわかっている
  • mat2bin_lut を引くのをやめて毎回計算で求める (のとどちらが速い?)
  • AVX512


おわりに

とりあえず数日間でソルバーを書いて遊んでみた内容をまとめてみた。

A Puzzle A Day のパズルはアナログでありながら、365日飽きもせずに遊べそうで、とても面白いと思う。同時に盤面は 7x7 未満とかなり小さく、私みたいに総当たりしてもそれほど難しくないので、プログラミング経験が浅くてもソルバーを実装にチャレンジするにはお手頃な問題ではないかと思う。


2016年6月15日水曜日

ルータがPPPoEで取得しているIPアドレスが変わったらRoute 53のレコードを更新するスクリプト

リモートから自宅のネットワークに接続できるようにしている方も多いと思いますが、ウチも出先からリモートアクセスしたりしています。

これまで外部の Dynamic DNS サービスに頼っていたのですが、利用していたサービスに変更があったのをきっかけに自前の仕組みに置き換えました。今回は、その仕組みについて紹介します。

私の自宅では RTX1100 が上流ネットワークとの PPPoE 接続を行っており、 pp 1 インターフェイスが外側のアドレスを持っています。このインターフェイスのアドレスを、自分が Amazon Web Services の Route 53 上で管理するゾーンのAレコードとして登録します。

ちょっと面倒だったのが RTX1100 の外側 IP アドレスの取得をどうするか、です。最初は 「SNMP クライアントでつっつけば出てくるじゃろ?」と適当に snmpwalk してみたのですが、よくわからなかったので、結局 telnet で調べるローテクで実装しています。

使っているコードのうち自宅依存部分を取り除いたものを https://github.com/hasegaw/route53update に Push してあります。


この中には3つのファイルが含まれています。

  • rtx1x00_show_status_pp_1.exp …… RTX1100 と telnet して IP アドレスを取得する
  • route53update.py …… 取得したアドレスを R53 に反映する
  • run.sh …… 実行方法

rtx1x00_show_status_pp_1.exp

RTX1100 に telnet して IP アドレスを取得します。このスクリプトは実際には RTX1100 と通信して、その内容を標準出力に書き出すまでです。実行には expect が必要です。引数としてホスト名とパスワードをとります。



route53update.py

上記 Except スクリプトを実行し、ルータからの出力を実際にパースし、 Route 53 のレコード内容と比較してみて、必要なら反映するのがこちらのスクリプトです。ファイル内にいくつか設定項目があります。(レコード名、 ゾーン名、ルータのホスト名とパスワードなど)
おまけで Slack にも対応しています。


 run.sh

route53update.py を実行する方法の例です。 AWS の API キーを設定しスクリプトを叩いています。手元ではこのスクリプトを cron に仕掛けています。


定期的にこのしくみが実行されることにより、Route 53 上の A レコードが常に自宅のルータに向くようになります。また、 Slack に新しい IP アドレスが通知されるため、ネットワークの不調に気づきやすかったり、などのメリットがあるかもしれません。

2015年12月10日木曜日

おうちハックで戦った話

この記事は おうちハック Advent Calendar の 10 日目の記事です。

前日の記事: Extreme Home Hackレポート
翌日の記事:メインブレーカーが落ちる前にドライヤーを止める

なんか面白そうなので参戦してしまいました。私は玄関ドアにネットワークカメラを設置したことがあるので、そのときの話を紹介したいと思います。


すべてのはじまり



隣人から、わたしの在宅、不在に関わらず「私の騒音がうるさい」というクレームがくるようになったのです。このクレームの一週間後、私は当時の勤務先の本社があるソルトレイクシティに出張する予定でした。このため、私が海外出張中でも物音がするのかを確認するため、急遽 Web カメラでインターホンの動体検出・録画を仕掛けました。


玄関カメラ1号


玄関のインターホンはカメラ付きで、誰かがインターホンを押せば、その瞬間に液晶画面に玄関前の映像が映ります。このため、インターホン設備の前にWebカメラを置き、MacBookに接続して、動体を検出したら自動的に録画するソフトウェアを見つけてきて仕込みました。

 
帰宅した自分自身も記録されています。

ここで使用したソフトウェアは、 YYYYMMDD_HHMMSS形式でJPEGファイルとして動体のフレームを保存します。このため、ファイルが保存されれば、いつ、誰が自宅のインターホンを押したかが記録できるわけです。

このソフトウェアの画像ファイル保存先を Dropbox に設定することで、私がアメリカ(現地時間の日中は、クレームがくる可能性がある夜間や朝)に滞在しているとき、手元の Dropbox ディレクトリ内に新しい画像ファイルが同期されてきて、リアルタイムに気づけることになります。

ついに隣人が私の家のインターホンを押しました。私は Snowbird というゲレンデで同僚とスノーボードを楽しんでいるのに、です。



これで私の無実は証明できました。しかし。しかしです。

3月3日 午前6時37分。当たり前ながら(?)私は寝ていました。そうすると突然インターホンが鳴り、いったい誰がきたのかと思ったら、例の隣人が「出てこい!」と切れています。何かが玄関ドアに対して当たる音がしました。きっとドアを殴ったか、蹴ったかのどちらかでしょう。びっくりしていると私の家の前から消えてしまったのですが、追いかけて事情をきくと、昨夜 23時〜0時にかけて物音がした事についてのクレームでした。

3月17日 午後10時30分。また下の階の住民が訪ねてきて、騒音がうるさい!とのクレーム。そのときは、まだ発売されたばかりの「シムシティ」を楽しくプレイしていた最中で、ゲームからショベルカーで建物をスクラップしたときのドーン音ぐらいしか発生させていませんが、それが下の階に届くことは考えづらいですし、自分自身はソファに座ってキーボードやマウスを操作しているのですから、騒音のたちようがありません。

どうにもならないので大家、不動産会社経由で管理会社に「隣人が騒音に苦しんでいるので助けてあげてほしい」と連絡をしました。これまでの経緯もあわせて、です。海外出張中の深夜に隣人が訪ねてきた記録が残っていることも話しました。


玄関カメラ1号、破れる


すると隣人のクレームがエスカレートしてきました。

 

3月23日 21時54分、インターホンのカメラ部分に手を当てて自分の姿を隠しながらドアを喚いたりドアを蹴ったりしています。どうやら記録の件が管理会社経由で伝わって気分を害したようです。仕方ないのでまた管理会社に連絡です。

管理会社に対応をお願いしつつ、私はインターホンのカメラシステムをアップグレードする必要性を感じていました。理由は:
  • 就寝中(午前2時〜4時)の訪問を繰り返されて、インターホンが鳴ったかどうかも正常に判断できなくなっており、客観的な記録はとり続ける必要があった
  • 先述の理由でインターホンのモニタ前にカメラを仕掛けても目的の画像が記録できなくなっていた
  • 在宅、不在にかかわらず隣人が自区画に近づいたかはログしておきたかった(訪問のほか、嫌がらせなどがあった場合の記録は残しておきたい)

玄関カメラ2号、爆誕


そんな時、秋葉原で在庫放出品と思われるネットワークカメラ PLANEX CS-W05NM があったので、買いました。


こちらは無線 LAN モデルになっており、 Web ブラウザからカメラのリアルタイムモニタリングをしたり、動体検出 SD カードや SMB/FTP によるリモートへの録画などが可能です。

早速、玄関ドアの覗き穴部分に取り付けました。築40年越えの分譲マンションのドアですが、ホームセンターで購入した玄関用覗き穴パーツ、木の板、ボルトなどをつかってマウントを自作します。賃貸契約ですし、オリジナルの部品は退去時に原状復帰できるようにすべてそのまま保管してありました。


しかし、新たな問題が発生しました。覗き穴経由の像は有効画素数の一部にしか映っておらず、結果としてネットワークカメラがもつ動体検出機能が動作しませんでした。動体検出の閾値は調整できそうにありません。


玄関カメラ2号のソフトウェア開発


仕方なく何か手段がないか探っていると、 設定次第では、ネットワークカメラに HTTP リクエストを送ると、Motion JPEG 形式でストリーミングが受けられることに気づきました。 Motion JPEG はいわば JPEG 画像のパラパラまんがですので、これをプログラムでバッファに書き出せば画像をリアルタイムで受信できるわけです。ネットを探検していたらサンプルコードを見つけたので Python で実装しなおし、二枚の画像を Numpy で引き算し差分を求めれば、画像の変化がわかるので、ニーズにあわせた動体検出&録画プログラムを書くことにします。

と、いうわけで。 Python と OpenCV を Hello world する日がきました。 Motion JPEG ストリームから JPEG データを抜き出し、 OpenCV を使って画像データ(Numpy)の配列に変換します。

あとは、すこし時間差がある画像2枚の差の絶対値をとれば、画像のなかで変化が大きいところが浮かびあがってきます。

この考え方をベースに、
  • Python で Motion JPEG のストリームを受信
  • 過去何十フレーム程度の履歴を常に保持
  • 過去のフレームと最新のフレームの差(=画像の変化)を調べる
  • 画像に変化がしばらく続いた場合に、バッファに残っているフレームから、画像の変化がなくなってしばらくするまでをひとつのMJPEGファイルとして保存
  • 検出開始した場合は Growl で手元の端末に通知
実際には蛍光灯のフリッカー、きまった時間に点灯/消灯する階段照明、カメラ自体のオートコントラスト/ゲイン機能などによる一時的な画面変化などを無視する処理もしています。

このソフトウェアが生成したビデオファイルの例を示します。

この仕組みにより、自宅の玄関に隣人が迫ってくる段階から記録を残すことができるようになりました。


玄関カメラ2号の運用結果


この仕組みでは、玄関の前を通った人たちも記録されてしまいます。見られていると思ったら周りの人々も気持ち悪いでしょうし、私自身も興味ない上に見ていてあまり気分のよいものではないので、目的の人物でなければ見なかったことにして忘れてファイルも消去です。そのほかにカメラを映りこんだ人を見てみると、
  • 表札をチェックしていく謎の男性 (地図屋?そのほかの調査?)
  • なぜか不在中に来訪した女性(たぶん宗教の勧誘とか)
  • 宅配便のスタッフ(チャイムを鳴らしながら電気メーターのまわり具合をみて在宅かどうかを見極めようとする人が多い)
などが見えてきて、いろいろと参考になりました。そして、ゴールデンウイーク。問題の隣人が夜に訪ねてきたビデオがついに記録されました。
問題の隣人が夜に訪ねてきたタイミング、および日中に手紙を玄関ポストへ投入する姿が覗き穴から確認できました。もちろんドアに近づくどころか階段を昇ってくる瞬間から動画として記録していますので、仮面ライダーのコスプレでもしていなければ誰であるかもよく判ります。

幸い、このタイミングで管理会社に改めて連絡をしてからは、隣人からクレームがくることはなくなりました。その後もしばらくこのシステムを稼働していましたが、さらに半年した時点でシステムを停止しました。今年にはいってからマンションの大規模修繕があり、玄関ドアごと付け替えになったので、先のネットワークカメラも今はありません。


関連するハック


このカメラですが、連続稼働していると突然通信できなくなることがあり、そのためにワッチドッグタイマーでリセットすることを考えて Eject-io というデバイスを開発していました。 Eject-io は カーネル/VM探検隊の Advent Calendar 2013 で紹介しましたので、そちらをご確認ください。
実際には Eject-io を使わずにタイマーを使って6時間ごとにネットワークカメラを起動しなおす運用を続け、それで目的が達成できてしまったので、 Eject-io は使いませんでした。


また、のちにスプラトゥーンの画面解析ツール IkaLog を作る際にはこの時の Python + OpenCV の経験が役に立ちました。

おわりに

ちょっとした手段で記録を残しても、いわゆるキ○ガイが静かになるかどうかは別の話だなあ、と思いました。

今回、私のケースは半年程度でに落ち着いたので(たぶん)よいのですが、火のついたマッチを玄関ドアの郵便受けに突っ込まれたり、その前に灯油がガソリンなどまかれたりしたら恐ろしいことになりますし、上の階に住んでいる子供達が心配です。

そういう自体を想定して火災アラームの増設などは行っていましたが、そもそも午前2時〜4時に度々チャイムを鳴らされて起こされていると、寝ている途中でふと「チャイムが鳴った気がして目が覚めたり、でも鳴っていなかったり、鳴ったのか鳴っていないのか判らなかったりしてきて、認識に異常をきたします。正直なところ、そもそも、そんな心配がないところに移ったほうがいい、と思っています。

おうちハック自体ではキ○ガイは居なくなりません。皆さん、お気を付けください。

おうちハック Advent Calendar の 10 日目
前日の記事: Extreme Home Hackレポート
翌日の記事:メインブレーカーが落ちる前にドライヤーを止める