2022年4月30日土曜日

ES Modules から YouTube IFrame Player API を使う

YouTube IFrame Player API は、手元の Web サイトなどに YouTube 動画を埋め込みできる YouTube 公式のAPIです。

iframe 組み込みの YouTube Player API リファレンス

以下に Iframe Player API のサイトにあるサンプルコードを引用します。

<!DOCTYPE html>
<html>
  <body>
    <!-- 1. The <iframe> (and video player) will replace this <div> tag. -->
    <div id="player"></div>

    <script>
      // 2. This code loads the IFrame Player API code asynchronously.
      var tag = document.createElement('script');

      tag.src = "https://www.youtube.com/iframe_api";
      var firstScriptTag = document.getElementsByTagName('script')[0];
      firstScriptTag.parentNode.insertBefore(tag, firstScriptTag);

      // 3. This function creates an <iframe> (and YouTube player)
      //    after the API code downloads.
      var player;
      function onYouTubeIframeAPIReady() {
        player = new YT.Player('player', {
          height: '360',
          width: '640',
          videoId: 'M7lc1UVf-VE',
          events: {
            'onReady': onPlayerReady,
            'onStateChange': onPlayerStateChange
          }
        });
      }

      // 4. The API will call this function when the video player is ready.
      function onPlayerReady(event) {
        event.target.playVideo();
      }

      // 5. The API calls this function when the player's state changes.
      //    The function indicates that when playing a video (state=1),
      //    the player should play for six seconds and then stop.
      var done = false;
      function onPlayerStateChange(event) {
        if (event.data == YT.PlayerState.PLAYING && !done) {
          setTimeout(stopVideo, 6000);
          done = true;
        }
      }
      function stopVideo() {
        player.stopVideo();
      }
    </script>
  </body>
</html>


Iframe Player API は ESM として実装されていないので、 ESM として実装した Javascript などのコードから利用する場合にはいくらかの工夫が必要になります。

API の紹介サイトでは同 API の初期化終了後にコールバックされる onYouTubeIframeAPIReady() をグローバル名前空間で宣言していますが、これは ESM の名前空間からみれば window.onYouTubeIframeAPIReady() にあたります。また、同 API の初期化が完了すると YT が定義されますが、これは ESM の名前空間からみると window.YT になります。

この考慮点を反映して ESM としてサンプルを再実装したものが下記になります。

実際には ESM として実装する場合には、クラスとして実装するケースが多いでしょう。下記の例では、埋め込んだプレイヤーからのコールバックをインスタンスのメンバ変数として実装しています。

※コード上は動画のロードが完了し次第自動再生するような実装になっていますが、本記事執筆時点ではそのように動作していないようです。これは API のサイトにあるオリジナルのサンプルの問題です。

2022年3月28日月曜日

シェルスクリプトで unixtime を扱う

シェルスクリプト内で unixtime を得るには /bin/date +%s が利用できます。この方法で取得した unixtime はシェルスクリプト内で加算・減算したり、比較したりできます。

以下は、 sleep に頼らずに $TIMEOUT 秒間待機する例です。単純に sleep $TIMEOUT と記述するよりも、柔軟な待機ループを実装できます。

TIMEOUT=10
TIME_START=`/bin/date +%s`             # ループ開始時刻(いま)
TIME_END=$((${TIME_START} + ${TIMEOUT})) # ループ脱出時刻

echo -n running
while [ `/bin/date +%s` -lt ${TIME_END} ]; do
        sleep 1
        echo -n .
        
        # MySQL Server への接続性が確認できたら $TIMEOUT 秒待たずにループを抜ける 
        mysql -e 'SHOW STATUS' 1> /dev/null 2> /dev/null && break
done

2022年3月25日金曜日

Linux KVM + virtio-net のレイテンシーを削減するヘンな方法

Linux KVM の virtio-net を利用している際にリモートホストへの PING 結果がマイクロ秒のオーダーで安定しない(ブレる)という話を見かけました。

Linux KVM の仕組みや QEMU のデバイスエミュレーションの仕組みを考えれば「そんなもの」かなと思い、調査および追加の実験をしてみました。なお、本記事は x86 アーキテクチャの Linux KVM の動作に基づきます。


パケットが送信される仕組み

ping コマンドなどにより発生した、送信パケットは OS のネットワークスタック(L3, L2)を通じてイーサネットのドライバに引き渡されます。
今回はLinux KVMのデバイスエミュレーションを利用した際のレイテンシーに注目したいため、プロセスからデバイスドライバまでデータ届く流れについては触れません。 Linux KVM の仮想マシンで使われる virtio-net に注目します。
仮想的なネットワークインターフェイスである virtio-net 、またそのベースとなる、ホスト−ゲスト間の通信に使われる virtio-pci では、ゲストOSのメモリ空間にある送信対象データをバッファへのポインタをリングバッファに積み、I/Oポートを叩くことで VMM (Linux KVM) 側に通知します。
このあたりの仕組み(PCIデバイスに模倣する virtio のゲスト・ホスト間通信の仕組み)は過去にエンジニアなら知っておきたい仮想マシンのしくみ スライドにて解説しているので、そちらもご確認ください。

https://github.com/torvalds/linux/blob/4f50ef152ec652cf1f1d3031019828b170406ebf/drivers/net/virtio_net.c#L1762 に、ホストへ通知すべき送信データが準備できた際ホスト側の virtio-net に通知するブロックがあります。
        if (kick || netif_xmit_stopped(txq)) {
            if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq)) {
			u64_stats_update_begin(&sq->stats.syncp);
			sq->stats.kicks++;
			u64_stats_update_end(&sq->stats.syncp);
		}
	}

virtqueue_notify(sq->vq) は仮想デバイスの I/O ポートへのアクセスする実装となっています。そのセンシティブ命令を契機としてゲストから VMM に制御が移ります。デバイスモデル(qemu-kvm)のI/Oポートハンドラがゲストからの割り込み理由を判断し、 virtio-net のゲスト側リングバッファにあるデータをバックエンドのネットワークインターフェイスに渡します。
この際(Intel 表記で) VMX non-root → VMX root の kvm → qemu-kvm (ring3) と遷移し、場合によっては qemu-kvm がネットワークスタックにパケットを渡す前にプリエンプションが発生する可能性があり、送信タイミングが遅れる原因になり得ます。

 パケットが受信される仕組み


では、パケットが届いた場合はどうなるでしょうか? qemu-kvm がゲスト行きの受信データを受け取ると、予め用意されているゲスト側のメモリ領域に受信データを書き込み「ゲストに対して割り込みをかけます」。
本物のx86のプロセッサであれば、IRQ(Interrupt ReQuest)を受け付けるための信号線があり、それを介して割り込みを通知する方法と、割り込み通知用のアドレス空間に対するメモリ書き込みトランザクションにより割り込みを通知するMSI(MSI-X)割り込みがあります。仮想マシンの場合、仮想プロセッサにはIRQ線、MSI通知用のアドレス空間にメモリトランザクションを行ってもそれを受けてくれる相手がいないので、仮想マシンのプロセッサを「割り込みがかかった状態」にすることで割り込む挿入を実現することになります。

https://github.com/torvalds/linux/blob/0564eeb71bbb0e1a566fb701f90155bef9e7a224/arch/x86/kvm/vmx/vmx.c#L4574 にある vmx_inject_irq() に、 Intel VT でゲストに割り込みを通知する実装があります。
static void vmx_inject_irq(struct kvm_vcpu *vcpu)
{
	struct vcpu_vmx *vmx = to_vmx(vcpu);
	uint32_t intr;
	int irq = vcpu->arch.interrupt.nr;

	trace_kvm_inj_virq(irq);

	++vcpu->stat.irq_injections;
	if (vmx->rmode.vm86_active) {
		int inc_eip = 0;
		if (vcpu->arch.interrupt.soft)
			inc_eip = vcpu->arch.event_exit_inst_len;
		kvm_inject_realmode_interrupt(vcpu, irq, inc_eip);
		return;
	}
	intr = irq | INTR_INFO_VALID_MASK;
	if (vcpu->arch.interrupt.soft) {
		intr |= INTR_TYPE_SOFT_INTR;
		vmcs_write32(VM_ENTRY_INSTRUCTION_LEN,
			     vmx->vcpu.arch.event_exit_inst_len);
	} else
		intr |= INTR_TYPE_EXT_INTR;
	vmcs_write32(VM_ENTRY_INTR_INFO_FIELD, intr);

	vmx_clear_hlt(vcpu);
}
ここでのキモは intr |= INTR_TYPE_EXT_INTR もしくは INTR_TYPE_SOFT_INTR です。 Intel VT では、仮想プロセッサの状態を管理するVMCS構造体に対して特定のビットを立てておくと、「次にその仮想マシンを実行するときに、ゲスト側で割り込みハンドラに処理が移される」仕組みです。ただし、ここでは実際に仮想マシンの実行し、割り込みハンドラに処理を移してはいないことに注意が必要です。

では、具体的にいつ、仮想マシンの割り込みハンドラが実行されるのでしょうか?答えは、Linux KVM + qemu-kvm の場合 qemu-kvm のメインループでCPUスレッドが改めて VMX non-root モードに移行するように要求したときです。

「次に仮想CPUを実行するときは割り込みハンドラを実行する必要があるよ」設定されても、その次にスケジュールされるタイミングは qemu-kvm の仮想CPU実行スケジュール、Linuxのスケジューラに依存することになります。仮想マシンのCPU負荷が低い場合には、qemu-kvmはVMX non-rootへの遷移を抑制して他のプロセスへのプリエンプションを許すことにより負荷を下げます。しかし、パケットが届いた後にqemu-kvmのCPUスレッドにプリエンプションが発生し、VMX non-rootへ遷移するまでがネットワークレイテンシーの増加として見えることになります。


VMX non-root でのビジーループは解決策になるか

では、ゲストがビジー状態でCPUを使い続ければ他のプロセスへのプリエンプションが発生せず、レイテンシーに改善が見られるのではないか?という仮説がありました。この方法で、ゲストOSがHLT命令などを発効してCPUを手放すことが減るというメリットは確かにありそうですが、2つの考慮点があります。
  • 仮想プロセッサが割り込みハンドラに制御を移すのはVMX rootからVMX non-rootに制御を移すタイミングです。このため、VMX non-rootでビジーループを回っていても割り込みハンドラに制御が移るわけではありません。
  • VMX non-rootでビジーループを回り続けた果てにVMX rootに遷移したとき、Linux カーネルから見れば、そのCPUスレッドは「大量のCPU時間を消費した」プロセスと認識します。このため、負荷が高いシステムでは、Linuxカーネルは他のプロセスに積極的にプリエンプションしようとするでしょう。これでは、次回のVMX non-rootへの遷移が遅れてしまいます。


ちょっとしたカーネルモジュールでレイテンシー性能を改善する

ここでは、ちょっとしたカーネルモジュールをロードして、レイテンシー改善が可能かを試してみました。コードは https://github.com/hasegaw/cheetah/blob/master/cheetah.c です。以下の関数をカーネルスレッドとしてビジーループで回します。

static void kthread_main(void)
{
        u32 vmx_msr_low, vmx_msr_high;

        rdmsr(MSR_IA32_UCODE_REV, vmx_msr_low, vmx_msr_high);

        schedule();
}

rdmsr は、 x86 アーキテクチャにある Machine-Specific Register を読み取ります。この命令はデバイスモデルでのエミュレーションが必要になるセンシティブ命令であるため、強制的に VMEXIT が発生して VMX non-root → VMX root への遷移が発生し、qemu-kvmでRDMSRのエミュレーションが行われ、その結果をもって再度VMENTERされます。
RDMSRのエミュレーションは未改造のqemu-kvmにおいて比較的コストの低いエミュレーション処理で、特に副作用が想定されないため、これを選んでいます。HLTなどを利用するとVMMがすぐにゲストに処理を戻さないかもしれませんが、RDMSRであれば、他プロセスへのプリエンプションがない限りは、すぐに再度VMENTERによりVMX non-rootに移行することが想定され、このタイミングで割り込みハンドラへの制御が移るわけです。


測定結果

Linux KVMで動作するシステム上のLinuxゲスト間でpingコマンドを実行した際のレイテンシーを測定してみました。下記が90%パーセンタイル値です。
  • RDMSRのビジーループなし(通常の状態) —— 614us (100.0%)
  • RDMSRのビジーループなし、ゲスト内で perl -e 'while(1) {}' でビジーループ —— 516us (84.0%)
  • RDMSRのビジーループあり —— 482us (78.5%)
ごく一般的なLinuxゲストと比較すると、先のRDMSRのビジーループを回したLinuxゲストでは20%程度のレイテンシー低下が確認できました。

このデータは2021年3月(本記事の執筆時点からちょうど1年前)に割とノリで試したもので、であまり細かいデータを取っていないため、上記の値しか残っていないことにはご容赦ください。


割り込み通知からポーリングループへの移行

ゲストがビジー状態で回り続けていても通知を受けられないのなら、ひとつの選択肢としてはポーリングループでホストからの通知が受けられたらいいのに、という発想になるかと思います。実際、DPDKではプロセスがループでポーリングし続けることで大幅にレイテンシーを短縮しているわけです。ホストOSからゲストOSへの通知をポーリングループで監視すればよいのではないでしょうか?

今回は、virtio-netに対して手を入れるような事はしなかったのですが、ホストOSからの割り込み通知を待たずにリングバッファへ新着データを拾いにいくことは可能な気がします。しかし、本当にレイテンシー面を気にするのでれば SRIOV などを検討するほうがいいでしょう。

Linux KVM ではありませんが、同じく Intel VT ベースで仮想マシン技術を提供する VMware 社のホワイトペーパー Best Practices for Performance Tuning of Latency-Sensitive Workloads in vSphere VMs には、下記の記述があります。



まとめ

Linux KVM の virtio-net を利用している際にリモートホストへの PING 結果がマイクロ秒のオーダーで安定しない(ブレる)理由について、Linux KVM、qemu-kvmとゲスト側カーネルの間で何がおきるかを考えてみました。
また、ゲストへの割り込み挿入タイミングを増やすために、ゲストカーネル上で数行で書けるカーネルスレッドを動かすことにとって、実際にレイテンシー性能がいくらか向上することを確認しました。
マイクロ秒オーダーの時間のブレを気にする場合は、最終的にはSRIOVやポーリングモードの導入、そもそも(VMENTER/VMEXITだって遅いので)VMM上ではなく物理的なマシンの利用が解決策になると思いますが、仮想マシンのしくみがこんな物なんだと理解しておくのは悪くない事かと思います。




2022年3月24日木曜日

シェルスクリプトで MySQL サーバーが起動してくるのを待つ

 10年ほど前にメッチャ MySQL で TPC-C を回していたときに使っていた方法です。

echo -n "Waiting for the database ready...."
while true; do
        sleep 1
        mysql -h ${TPCC_HOST} -P ${TPCC_PORT} -u ${TPCC_USER} --password=${TPCC_PASSWORD} \
                -e 'show status' 1> /dev/null 2> /dev/null  &&  break
        echo -n '.'
done
echo 'up'

MySQLクライアントでサーバに接続し適当なクエリを実行できるかでアクセス性があるか判断しています。このクエリが実行できるようになるまでここでブロックし、問題なくクエリが実行できれば終了コードとして 0 が返されますので && break が実行されループを抜けます。

今回はサーバがクエリを受け付けられるようになるまで待っていますが、同じ方法を用いて、クエリが実行できるかどうかで処理内容を条件分けすることもできますね。

mysql -h ${TPCC_HOST} -P ${TPCC_PORT} -u ${TPCC_USER} --password=${TPCC_PASSWORD} \
        -e 'show status' 1> /dev/null 2> /dev/null
# 上記 mysql コマンドの終了コードが格納された $? を見て処理を分岐する
if [ $? -eq 0 ]; then
    echo "クエリが成功した"
else
    echo "サーバに接続できなかった、クエリが失敗した"
    exit 1 # ゼロ以外の値でシェルスクリプトを異常終了させる
fi

exit 0 # 正常終了

2022年3月1日火曜日

IkaLogの開発で得た知見:大量の動画や画像を取り扱う際の Tips

スプラトゥーンの画像解析ツール IkaLog を作った際に得た知見をいくつか共有したいと思います。特に動画や画像を取り扱った際に苦しんだ点、工夫した点、良かった点などを紹介します。

資産として動画データを蓄積する

IkaLog では Nintendo Switch コンソールが出力した生の画像データを扱いますが、ただし開発にあたって「取りっぱなし」のビデオの利用が適切とは言えません。撮りっぱなしの 1 時間 1GB の動画ファイルと、1分間15MBの動画ファイルであれば、後者のほうが取り扱いやすいこと感覚的にわかるかと思います。

長い動画ファイルから目的のシーンを探し出すことには作業として手間がかかります。VLC などの動画再生ソフト、もしくは動画編集ソフトによる作業をしようとした場合、長い動画から目的のシーンを的確に見つけて作業するためには、シークバーを正確に操作しなければいけません。そもそもコンソール上で作業しづらいという事になります。

また、自動テストをまわすとしても時間がかかります。たくさんのフレームをデコードするためにはプロセッサなどの性能を使用するほか、ビデオ内の時間を指定してビデオをシークするとしても、ファイルの先頭から可変長のフレームデータをパースしフレーム数を数えなければいけないこともあり、時間がかかるのです。

この問題を解決するためには、以下のような方針でビデオを管理します。
  • 目的の画像がわかっていて、確実に取得する方法がある場合には、そのシーンを再現して、そのシーンだけ録画する。
  • 撮りっぱなしは1時間程度〜長くても数時間程度にとどめる。長くなりすぎた撮りっぱなしビデオは、ある程度の長さ(2-3時間程度)に分割してから扱う
  • 最終的には目的のシーンを切り出して、内容が明確にわかるファイル名などを付与して扱う
  • 必要なレベルのフレームレートに落とす(ファイルサイズ削減、画質向上が期待できる)

動画ファイルから目的の部分を切り出す

動画ファイルからの切り出しには FFMPEG を使用する場合、 ffmpeg -i 入力ファイル名 -ss 開始時間 -t 秒数 -c:a copy -c:v copy 出力ファイル名 などを使用します。開始時間が 38分22秒であれば、その時刻は bash 等のシェル上で $((38*60)+22) などとして秒数に変換ができることも憶えておくと便利です。

GUIで編集しやすいソフトウェアとして VidCutter があります。このソフトウェアは動画ファイル内にある圧縮済みの動画データを再エンコードしないため画質が劣化せず、高速にシーンを切り出せるため、とても便利です。

自分の目を活用する:画像タイルの中から違うものを探すのが得意

以下の画像を見て、分類に間違いがあるものを見つけてみてください。


どうでしょうでしょうか?不規則なものにすぐ気づけると思います。

画像をこのように並べて表示するにはどうしたらいいでしょうか? 分類済の画像ファイルがある場合、 IMG タグ(だけ)を並べたHTMLファイルを準備するだけでよいですから、最小限でよければシェル上でワンライナーとして書くこともできます。

多数のデータを扱う場合は、ほどほどの個数にわけて扱う

初代スプラトゥーン向けの IkaLog では、ブキ画像の分類問題の機械学習やその精度検証用としておよそ300万サンプルのブキ画像(PNGファイル)を用意していました。便利なデータ管理ツールなどを使っているわけではありません。このようなデータはいくつかのサブグループに分けて扱うほうが簡単です。

スプラトゥーンの場合およそ100のブキが存在し、それらのブキのクラス分類をするために機械学習を利用しました。1クラスあたり3万サンプルと思えばデータセットは少し小さくなってくるのですが、作業には手間がかかります。

データセットが大きいと「目 grep 」時に見落としたり、注目したデータを見つけたり加工したりするにも時間がかかります。一方、扱いやすい規模よりも小さなデータセットに細かく分割してしまうと、手間が増えるため効率が悪くなります。

経験上、先に示したようなブキ画像を扱う場合などでは、データセットを 5,000 ~ 10,000 程度ごとにグループ化すると扱いやすいです。この規模は、アイコンサイズの画像であれば、横 25カラムであれば縦200行〜400行程度。この規模であれば、先のIMGタグによるタイル表示や Finder / Explorer などを用いて目視で確認する場合も、ざっと確認したいだけなら10秒〜20秒で見渡すことができる分量です。

たとえば IMG タグを多数並べたファイルを作成した HTML ファイルを閲覧して誤分類された画像ファイルが 5つほど目に留まり、それを Finder などで除外したい場合を想像してみてください。30,000のファイルから5つに注目するより、5,000のファイルから5つに注目するほうが簡単で、素早く対応できます。


”不労所得”を得る:自分が努力しなくてもソースが集まるような工夫

大量のデータを用いて機械学習やそのテストをしたい場合、もしくは目的達成のために大量のデータを集めたい場合、自分自身でそのデータを集めてまわるのは非効率です。

IkaLog の場合は開発当初から常用してくれるユーザという資産、また stat.ink との連携ができたので、ここからリザルト画面などの画像データを得ることができました。自分ひとりがゲームをプレイして1日10ゲーム程度だとして、 stat.ink には毎日のように数千ゲームのスクリーンショットが投稿されていました。また、その中には録画状況が不適切なものなどが混じっているなど、実際のワークロードとしてどれぐらいの分散(variance)があるのかという情報も含まれています。一通りの開発が終わった後は、この基盤があることによりブキ判定の強化を素早く進めることができました。

2022年になって、スプラトゥーン2向けに幾らかエンジニアリングしなおしていますが、ここでは YouTube に投稿された有名配信者の生配信ビデオを多く使用しています。スプラトゥーン向けのIkaLogを開発していた際もほかの方のプレイ動画を利用できないか検討しましたが、当時の動画配信サービス上のビデオは開発にあたり必要としている画質を満たすものが少なく、難しく感じました。現在は、開発に利用できるほど良好な画質のものが多く投稿されています。

使えそうな動画があれば、コマンドラインで利用できるダウンローダを用いてそのアーカイブを取得するためにコマンドラインで一行入力すればよく、数時間分のビデオが十分ほどで入手できたりします。ある程度の精度で目的のシーンを検出できるようになっていれば、大量のビデオに対してバッチ実行しておけば目的のシーンを自動的に探し出せますし、機械学習用のデータを自動的に集められることになります。

ただし、特定のチャンネルの動画のみを利用すると、エンコードされた動画の画質が偏る、たとえばそのプレイヤーが色覚サポートモードを有効にしている場合ゲーム内のインク色が特定2色に偏る(実ワークロードに対して偏ったデータが集まる)といったこともあります。画質については手元でビットレートを変更して再エンコードし種を増やすことが可能ですが、データセットに含まれるインク色が偏る問題は、そういった偏りがないチャンネルを選んだり、(色覚サポートを使うユーザばかりにならないよう)複数のチャンネルをデータセットに加えるといった対策が必要です。




2022年1月1日土曜日

OpenCVでサイゼリヤの「超難解 大人の間違い探し」を台無しにする



 あけましておめでとうございます。


先日、久々にサイゼリヤで外食をした際、レジにて間違い探しを配っているのに気づきました。自分がこれを手にした瞬間に頭のなかで間違いを見つけるプログラムができたのですが、数日後の深夜に実際にコードに落として、動くものができました。

今回は、どのようにしてこの出力を得たかについて、具体的なコード例とあわせて紹介します。


基本的な考え方

サイゼリヤの間違い探しは、左右反転した状態で絵の違いを見つけるものです。今回はAKAZE特徴量を用いてふたつの画像を重ね合わせた上で、差異部分に色をつけてマークしていくことにします。


入力画像を用意する

今回サイゼリヤで入手した間違い探しはA4サイズのコピー用紙に印刷されたものです(配達のときに同梱したりしているようですね)。後でAKAZE特徴量のアルゴリズムで取り扱うため、きちんと平面がでた状態でスキャンします。



今回は手元にあったドキュメントスキャナ(Canon ImageFormla DR-C125)を使いましたが、きちんと平らにできれば、携帯電話のカメラを使ったりしてもいいですし、コンビニのコピー機で適当な形式で保存してきても構いません。

また、ソースの特性を考慮し、可能な限り、白・黒に二値化します。必須ではないのですが、これをしておくことで後の処理がしやすくなるでしょう。二値化はPhotoshop, GIMPなどの画像編集ツールやドキュメントスキャナのスキャン時設定にて行えるでしょう。

入力画像を2ファイルに分割する



img1.jpg, img2.jpg として対象の画像を保存します。今回は macOS 標準のスクリーンショット機能を用いて、とても雑にファイルを生成しました。後でAKAZE特徴量を用いて重ね合わせるつもりですので、ここで傾きや大きさ、ピクセルのシフト量など、正確さを求める必要はありません。今回は JPEGを使用しましたが、 OpenCV が対応する形式であれば PNG などでも構いません。


作業環境

今回は Juypter Notebook 上で、 Python 3.8 の環境に OpenCV 4.x をインストールし作業していきます。 OpenCV は下記操作でインストールするとよいでしょう。 opencv-contrib-python パッケージをインストールすることで、のちに使う AKAZE 特徴量などのアルゴリズムもすぐ使える状態になっています。

!pip install opencv-contrib-python
画像の読み込み
cv2.imread() で画像を読み取って ndarray にします。元画像は原則モノクロですし、面倒を減らすため、読み込み時に1チャンネルのグレースケール画像としておきました。このほうが後での画像のマッチングや比較も簡単です。
import cv2
import numpy as np
from math import sqrt
img1 = cv2.imread("saizeriya_akaze/img1.jpg", cv2.IMREAD_GRAYSCALE)
img2 = cv2.imread("saizeriya_akaze/img2.jpg", cv2.IMREAD_GRAYSCALE)

img1 と img2 は左右反転していますので、 img2 を左右反転させておきます。

img2 = cv2.flip(img2, 1)
2枚の画像を重ね合わせる

AKAZE特徴量を使って画像をマッチングします。

akaze = cv2.AKAZE_create()
kpts1, desc1 = akaze.detectAndCompute(img1, None)
kpts2, desc2 = akaze.detectAndCompute(img2, None)
matcher = cv2.DescriptorMatcher_create(cv2.DescriptorMatcher_BRUTEFORCE_HAMMING)
nn_matches = matcher.knnMatch(desc1, desc2, 2)
matched1 = []
matched2 = []
nn_match_ratio = 0.8 # Nearest neighbor matching ratio
for m, n in nn_matches:
    if m.distance < nn_match_ratio * n.distance:
        matched1.append(kpts1[m.queryIdx])
        matched2.append(kpts2[m.trainIdx])
inliers1 = []
inliers2 = []
good_matches = []
inlier_threshold = 1000 # Distance threshold to identify inliers with homography check
for i, m in enumerate(matched1):
    col = np.ones((3,1), dtype=np.float64)
    col[0:2,0] = m.pt
    dist = sqrt(pow(col[0,0] - matched2[i].pt[0], 2) +\
                pow(col[1,0] - matched2[i].pt[1], 2))
    if dist < inlier_threshold:
        good_matches.append(cv2.DMatch(len(inliers1), len(inliers2), 0))
        inliers1.append(matched1[i])
        inliers2.append(matched2[i])

これで img1 と img2(左右反転) 画像の特徴を対応が得られます。これを画像として出力してみます。

res = np.empty((max(img1.shape[0], img2.shape[0]), img1.shape[1]+img2.shape[1], 3), dtype=np.uint8)
cv2.drawMatches(img1, inliers1, img2, inliers2, good_matches, res)
cv2.imwrite("saizeriya_akaze/result.png", res)


コピー&ペーストでソースコードをいじった時に少しおかしくなったように見えるけど、実用上問題なさそうなのでヨシ。

もちろん、間違い探しという特性上、画像が意図的に異なるので、マッチしない特徴もあるわけですが、十分な数の特徴量がマッチすれば、やりたい事に対して問題はありません。

この特徴量をもとに2枚の映像を重ね合わせます。 img1 の位置を基準とし、 img2 が img1 に重なるようにワープフィルタをかけた img3 を得ます。この処理は IkaLog というプログラムから一部を持ってきました。そのため画像名にキャリブレーションイメージ、キャプチャイメージという名前が付いていますが、そのまま利用しています。
Acalibration_image_gray = img1
capture_image_gray = img2
matcher = cv2.BFMatcher(cv2.NORM_HAMMING)


capture_image_keypoints, capture_image_descriptors = \
    akaze.detectAndCompute(capture_image_gray, None)
calibration_image_keypoints, calibration_image_descriptors = \
    akaze.detectAndCompute(calibration_image_gray, None)

print('caputure_image - %d features' % (len(capture_image_keypoints)))
print('matching...')

def filter_matches(kp1, kp2, matches, ratio=0.75):
    mkp1, mkp2 = [], []
    for m in matches:
        if len(m) == 2 and m[0].distance < m[1].distance * ratio:
            m = m[0]
            mkp1.append(kp1[m.queryIdx])
            mkp2.append(kp2[m.trainIdx])
    p1 = np.float32([kp.pt for kp in mkp1])
    p2 = np.float32([kp.pt for kp in mkp2])
    kp_pairs = zip(mkp1, mkp2)
    return p1, p2, kp_pairs
raw_matches = matcher.knnMatch(
    calibration_image_descriptors,
    trainDescriptors=capture_image_descriptors,
    k=2)

p1, p2, kp_pairs = filter_matches(
    calibration_image_keypoints,
    capture_image_keypoints,
    raw_matches,)

if len(p1) >= 4:
    H, status = cv2.findHomography(p1, p2, cv2.RANSAC, 5.0)
    print('%d / %d  inliers/matched' % (np.sum(status), len(status)))
else:
    H, status = None, None
    print('%d matches found, not enough for homography estimation' % len(p1))
    raise Exception()

assert H is not None

if len(status) < 1000:
    raise WarpCalibrationNotFound()

calibration_image_height, calibration_image_width = img1.shape

corners = np.float32(
    [[0, 0],
     [calibration_image_width, 0],
     [calibration_image_width, calibration_image_height],
     [0, calibration_image_height]])

pts1 = np.float32(cv2.perspectiveTransform(
    corners.reshape(1, -1, 2), H).reshape(-1, 2) + (0, 0))
pts2 = np.float32([
    [0, 0],
    [calibration_image_width, 0],
    [calibration_image_width, calibration_image_height],
    [0, calibration_image_height]])

print('pts1: %s' % [pts1])
print('pts2: %s' % [pts2])

M = cv2.getPerspectiveTransform(pts1, pts2)
 
img3 = cv2.warpPerspective(img2, M, (calibration_image_width, calibration_image_height))
これにより img1 とできるだけマッチするようワープフィルタを適用した img3 が得られました。


img1 と img2 の縦横ピクセル数は一致していないのですが、 img1 にあわせてワープフィルタをかけた img3 は img1 と同じピクセル数になっているので、このことを確認しておきます。

こんな面倒なことをしなくても、他のツールを使ってあらかじめ重ね合わせておくことも可能です。ただ、この重ね合わせ処理含めて実装したので、サイゼリヤの間違い探しの新しいバージョンが出ても、入力画像を差し替える以上の労力をかける必要はなくなりました。


画像の差分をとる

img1 と img3 の差の絶対値をとります。これにより1プレーン8ビット(256色調)色調の画像に対して、差がなければ0、差があれば255の値が得られます。なお img1 および img3 は符号なし8ビット整数の1プレーン モノクロ画像ですが、差を取るにあたり -255 ~ 255 までの範囲をとりうるため、 符号付き32ビット整数として扱っています。絶対値をとった時点では 0~255 に収まっているため、 img_abs は 0~255 の符号付き32ビット整数です。

img_abs = abs(img1.astype(np.int32) - img3.astype(np.int32))

img_abs の内容をモノクロ画像として出力した例を参考までに掲載しておきます。 img1, img2 で差があった部分が白く浮き上がっていることがわかるでしょう。



差分強調画像を作成し出力する

最終結果を出力します。ここではimg1を基として答えの画像を作成します。この画像は HSV フォーマットとして扱うことにします。HSVとは Hue(色相), Saturation(彩度)、明るさ(Brightness) の3要素で色を表現します。モノクロ画像をHSV色空間に変換していますので、S=0、V=0~255, Hについては未定義(結果としてゼロが入っているが)という状態になります。

img_bgr = cv2.cvtColor(img1, cv2.COLOR_GRAY2BGR)
img_hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)

このうち彩度のプレーンを、先ほど得た img1 と img3 の絶対値 img_abs で置き換えます。差のある部分はほぼ 0 、差がある部分だけ彩度があがるため、結果として img1 の画像に対して、 img1 と img3 の差がある部分には何らかの色が付きます。どのような色がつくかは色相チャンネルによりますが、今回 cv2.cvtColor() はこのチャンネルを0で初期化していますので、これに対応して赤色になります。

img_hsv[:, :, 1] = img_abs

生成した画像を cv2.imwrite() で画像ファイルに落とします。 cv2.imwrite() は BGR フォーマットのカラー画像をとりますのでHSV色空間からBGR色空間へ変換も行います。

cv2.imwrite("saizeriya_akaze/result.png", cv2.cvtColor(img_hsv, cv2.COLOR_HSV2BGR))

出力結果


今年もよろしく御願いします。