ウェブページから JavaScript で MQTT ブローカーに送られてくるデータを取得したかったので、MQTT over WebSocket を試してみました。

RabbitMQ 3.6.6 をいれる

Ubuntu のレポジトリのは 3.5.7 と古いので、Installing on Debian / Ubuntu にしたがって 3.6.6 にします。

既に古いのが入っていても、説明通りに RabbitMQの apt レポジトリを設定して、 apt-get update して install したら自動でアップグレードされます。

プラグインをいれる

rabbitmq_web_mqtt をいれます。これは Community Plugins にありますが、 RabbitMQ のチームが作っているので安心感があります。プラグイン自体が比較的新しくて、RabbitMQ 3.6.1 以降でないと使えません。

$ wget --content-disposition https://bintray.com/rabbitmq/community-plugins/download_file\?file_path\=rabbitmq_web_mqtt-3.6.x-14dae543.ez
$ sudo mv rabbitmq_web_mqtt-3.6.x-14dae543.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.6/plugins/
$  sudo rabbitmq-plugins list | grep web_mqtt
[  ] rabbitmq_web_mqtt 

$ sudo rabbitmq-plugins enable rabbitmq_web_mqtt
The following plugins have been enabled:
  cowlib
  cowboy
  rabbitmq_web_mqtt

Applying plugin configuration to rabbit@stfuawsc... started 3 plugins.

$ sudo rabbitmq-plugins list | grep web_mqtt    
[E*] rabbitmq_web_mqtt    

設定する

デフォルトで 15675 が listen されています。このままでいいのですが、念のため設定を書いておくことにしました。

設定例がないのでソースを読むしかないようです。rabbitmq_web_mqtt.schema が設定ファイルのスキーマのようなので、これを参考に以下のようにしました。

 {rabbitmq_web_mqtt,
  [
   {tcp_config,[{port, 15675}]}
  ]
 }

WebSocket 経路の TLS 化はリバースプロキシ (h2o) で行うので、このサーバ自体には TLS の設定を書いていません。

リバースプロキシ

HTTP フロントエンドの h2o からポート 15675 にリバースプロキシするように設定しました。これで外部から接続可能になります。

なおh2oは定期的にWebSocket接続を切るようになっているので、クライアント側で再接続するコードが必須です。

WebSocket から繋ぐ

認証はどうなるのか? という疑問があるところですが、MQTT レイヤーでのユーザ認証があります。Upgrade 時に Origin による制限などはかけられないようです。

rabbitmq-web-mqtt-examples というレポジトリがあり、これを参考にすれば簡単に接続できます。

ライブラリとして Eclipse Paho の JavaScript 版を使っています。WebSocket のエンドポイントを指定する以外は普通にMQTTするのと変わりません。

var TOPIC = "/foo/bar/baz";
var USER = "foo";
var PASS = "bar";
function mqttConnect() {
	var client = new Paho.MQTT.Client(
		location.hostname,
		location.port || 80,
		"/mqtt", "myclientid_" + Math.random().toString(32)
	);

	client.onConnectionLost = function (responseObject) {
		console.log(responseObject);
		// reconnect
		setTimeout(mqttConnect, 1000);
	};

	client.onMessageArrived = function (message) {
		var data = Number(message.payloadString);
		console.log(data);
	};

	client.connect({
		userName: USER,
		password: PASS,
		timeout: 3,
		onSuccess: function () {
			console.log('onSuccess');
			client.subscribe(TOPIC, {qos: 1});
		},
		onFailure: function (message) {
			console.log('onFailure', message.errorMessage, message);
		}
	});
}

ブラウザ上の WebSocket の場合、TLS はブラウザ側でやってくれるので、普通の MQTT with TLS の接続で考えることよりも少なくて楽です。

ref

  1. トップ
  2. tech
  3. RabbitMQ で MQTT over WebSocket

RabbitMQ + MQTT で Pub/Sub サーバを立てることにしました。

いろいろなセンサーのグラフ化にあたって GrowthForecast へ直接 POST を行っていましたが、やはり一旦メッセージブローカー的なものをいれたほうがよさそうだという感じになってきました。

なぜメッセージブローカーが必要か

センサーデータを複数のプログラムから使いたい場合、特にほぼリアルタイムで情報を得たいようなケースだと、直接各アプリケーションに投げるのではなくて、センサーはある一箇所に値を投げることだけを考え、アプリケーションはある一箇所からデータを受けとることだけを考えるように分離したくなります。

例えば今まではセンサーデータをアプリケーションであるグラフサービスに直接投げていましたが、これだとセンサーデータをさらに別のデバイスから読みだして表示するといった場合に、本来の用途ではないグラフサービス側のAPIに問合せたりする必要があって不便です。

MQTT を選択

MQTT はキューがない (最後の値だけ保存する/Retain) Pub/Sub のメッセージ配信プロトコルで、組込み系だとそこそこメジャーなようです。ググってみるとクライアント実装はそこそこ充実しています。

サーバ実装がいまいちコレというのがない気がするのですが、RabbitMQ のプラグインに MQTT プロトコルサポートがあるので、これを利用するのが比較的よさそうでした。

これ系のプロトコルは MQTT 以外にもいろいろあって、RabbitMQ の本来の用途である AMQP も競合プロトコルになります。AMQP より MQTT が好まれるのはプロトコルのシンプルさのためですが、機能的には AMQP が勝ります。

RabbitMQ のインストール

ひとまず Ubuntu のパッケージをそのままつかうことにしました。

sudo apt-get install rabbitmq-server

Rabbit MQ は Erlang で書かれているので、Erlang 関係のパッケージが大量にはいります。

インストール直後から起動していて、rabbitmqctl status でステータスが見れます (root 権限が必要です)。デフォルトではクラスタリング用のポート25672と、AMQP 用のポート 5672 が listen されていました。

sudo rabbitmqctl status

続いて mqtt プラグインを有効にしておきます。

sudo rabbitmq-plugins enable rabbitmq_mqtt

このコマンドは自動的に設定が反映され、status を見ると mqtt を 1883 で listen していることがわかります。

RabbitMQ の設定

適当に見てみると /etc/rabbitmq/rabbitmq-env.conf というのが最初からありますが、これは環境変数設定ファイルなのでとりあえずそのままにしておきます。

ログとして /var/log/rabbitmq/$node@$host.log というファイルがあり、これの冒頭に

node           : rabbit@stfuawsc
home dir       : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config (not found)
cookie hash    : 
log            : /var/log/rabbitmq/rabbit@stfuawsc.log
sasl log       : /var/log/rabbitmq/rabbit@stfuawsc-sasl.log
database dir   : /var/lib/rabbitmq/mnesia/rabbit@stfuawsc

というログが出ています。この通り設定ファイルは /etc/rabbitmq/rabbitmq.config になりますが、まだないので作る必要があります。

サンプル設定ファイルがあるのでとりあえずこれをコピペしてつくるのがよさそうです。

# とりあえず眺める 
zless /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz

# コピペからはじめる
sudo sh -c 'zcat /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz > /etc/rabbitmq/rabbitmq.config'

Erlang の設定ファイル形式なのでちょっと読みにくいです。

  • % から行末まではコメント
  • シングルクオートはアトム (Ruby でいうところのシンボルと同様)
  • ダブルクオートはその文字列の数値のリスト
  • 括弧付きダブルクオートはバイナリ (Bit Syntax)
  • 余計なカンマがあると怒られる (ケツカンマ問題がある)

設定方針

ここでは方針として以下のようにします。

  • MQTT を使う
  • TLS を使う

TLS の設定では取得済みの Let's Encrypt の証明書をそのまま流用します。ただ、Let's Encrypt がつくる /etc/letsencrypt/live 以下が root 以外読めないようになっている一方、RabbitMQ は証明書ファイルを rabbitmq ユーザで読もうとするようで eacces がでます。

しかたないので以下のようにしてアクセスを許可するように変えてしまいました。なにかもっとスマートに解決したほうがいいと思うのですが、思いつきませんでした。

sudo chmod 0755 /etc/letsencrypt/{live,archive}

設定

以下のようにしてみました。

%% -*- mode: erlang -*-
%% ----------------------------------------------------------------------------
%% RabbitMQ Sample Configuration File.
%%
%% See http://www.rabbitmq.com/configure.html for details.
%% ----------------------------------------------------------------------------
[
 {rabbit,
  [
   {ssl_listeners, [5671]},
   {handshake_timeout, 10000},
   {log_levels, [{connection, info}, {channel, info}]},

   {ssl_options, [{cacertfile,           "/etc/letsencrypt/live/cho45.stfuawsc.com/fullchain.pem"},
                  {certfile,             "/etc/letsencrypt/live/cho45.stfuawsc.com/cert.pem"},
                  {keyfile,              "/etc/letsencrypt/live/cho45.stfuawsc.com/privkey.pem"},
                  {verify,               verify_peer},
                  {fail_if_no_peer_cert, false}]},

   {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
   {auth_backends, [rabbit_auth_backend_internal]},

   {ssl_handshake_timeout, 5000}
  ]},

 {rabbitmq_mqtt,
  [
   {allow_anonymous, false},
   {vhost, <<"/">>},
   {exchange, <<"amq.topic">>},
   {subscription_ttl, 1800000},
   {prefetch, 10},

   {tcp_listeners, [1883]},
   {ssl_listeners, [8883]}
  ]}
].

ユーザの追加

もともとある guest/guest は localhost からだけ接続が許されているのでそのままにして、外部から使う用のユーザを追加します。

# guest しかない
$ sudo rabbitmqctl list_users
Listing users ...
guest   [administrator]

# ユーザ追加
$ sudo rabbitmqctl add_user tsun pass
Creating user "tsun" ...

# 確認
$ sudo rabbitmqctl list_users        
Listing users ...
guest   [administrator]
tsun    []

# パーミッションを追加 conf write read の順で指定する。これは正規表現。ここでは全権
$  sudo rabbitmqctl set_permissions tsun  ".*" ".*" ".*"
Setting permissions for user "tsun" in vhost "/" ...

繋いでみる

クライアントに rubygems の mqtt を使ってみます

gem install mqtt
#!/usr/bin/env ruby

require 'mqtt'
require 'thread'

# MQTT::Client seems not thread safe
mutex = Mutex.new

pub_thread = Thread.start do
	mutex.lock
	MQTT::Client.connect(
		host: '127.0.0.1',
		port: 8883,
		ssl: true,
		username: 'guest',
		password: 'guest',
	) do |client|
		mutex.unlock
		p [:pub, client]


		10.times do
			sleep 1
			p :publish
			client.publish("test", "this is test", false)
		end
	end
end

sub_thread = Thread.start do
	mutex.lock
	MQTT::Client.connect(
		host: '127.0.0.1',
		port: 8883,
		ssl: true,
		username: 'tsun',
		password: 'dere',
	) do |client|
		mutex.unlock
		p [:sub, client]
		client.subscribe("test")
		client.get do |topic,message|
			p [ topic, message ]
		end
	end
end

pub_thread.join

こんな感じのコードで動いていることを確認できるはずです。ただし guest を指定しているので、同一ホストで動かす必要があります。

ポートを開ける

基本の動作確認ができたので、パブリックにアクセス可能にするためポートをあけます。ufw で 8883 (MQTT with TLS) だけをあけました。他はいまのところ使用予定がないので閉じたままです。listen 自体しないほうがより安全ですが、サーバ内でごにょごにょすることはありそうなのでそのままにしています。

 sudo ufw allow 8883

外部から、さきほど作ったユーザで接続確認を行ってとりあえずセットアップ完了です。

ref

  1. トップ
  2. tech
  3. センサーデータ用に RabbitMQ + MQTT をセットアップする

寝室に置いてみたいので ESP8266 (ESP-WROOM-02) で動かして GrowthForecast にポストするようにしてみた。

MH-Z19 を PWM 経由で読んでいる loop 関数だけ抜きだすと以下のような感じ。とりあえず割込みは使ってない。

void loop() {
	ArduinoOTA.handle();

	static uint32_t prevTime = 0;
	static uint8_t lastState = 0;
	static uint32_t	th;
	static uint32_t	tl;
	int state = digitalRead(PWM_INPUT);
	if (lastState == state) {
		// nothing to do
	} else {
		lastState = state;
		uint32_t now = millis();
		if (!prevTime) {
			prevTime = now;
			return;
		}
		uint32_t interval = 0;
		if (prevTime <= now) {
			interval = now - prevTime;
		} else {
			interval = 0xffffffff - prevTime + now + 1;
		}
		prevTime = now;
		if (state == 1) {
			tl = interval;
			if (tl && th) {
				uint32_t cycle = tl + th;
				if ((uint16_t)(1004 * 0.95) < cycle && cycle < (uint16_t)(1004 + 1.05)) {
					uint16_t ppm = 5000.0 * ((float)(th - 2) / (float)(cycle - 4));
					Serial.printf("%d ppm (cycle %d / th: %d, tl: %d)\n", ppm, cycle, th, tl);
					gf.post("/home/sensor/co2_1", ppm);

					// reset count
					th = 0; tl = 0;
					prevTime = 0;
				} else {
					// error
					Serial.println("error");
				}
			}
		} else
		if (state == 0) {
			th = interval;
		}
	}
}
  1. トップ
  2. tech
  3. ESP8266 Arduino で CO2 センサー MH-Z19 を読む