RabbitMQ + MQTT で Pub/Sub サーバを立てることにしました。
いろいろなセンサーのグラフ化にあたって GrowthForecast へ直接 POST を行っていましたが、やはり一旦メッセージブローカー的なものをいれたほうがよさそうだという感じになってきました。
なぜメッセージブローカーが必要か
センサーデータを複数のプログラムから使いたい場合、特にほぼリアルタイムで情報を得たいようなケースだと、直接各アプリケーションに投げるのではなくて、センサーはある一箇所に値を投げることだけを考え、アプリケーションはある一箇所からデータを受けとることだけを考えるように分離したくなります。
例えば今まではセンサーデータをアプリケーションであるグラフサービスに直接投げていましたが、これだとセンサーデータをさらに別のデバイスから読みだして表示するといった場合に、本来の用途ではないグラフサービス側のAPIに問合せたりする必要があって不便です。
MQTT を選択
MQTT はキューがない (最後の値だけ保存する/Retain) Pub/Sub のメッセージ配信プロトコルで、組込み系だとそこそこメジャーなようです。ググってみるとクライアント実装はそこそこ充実しています。
サーバ実装がいまいちコレというのがない気がするのですが、RabbitMQ のプラグインに MQTT プロトコルサポートがあるので、これを利用するのが比較的よさそうでした。
これ系のプロトコルは MQTT 以外にもいろいろあって、RabbitMQ の本来の用途である AMQP も競合プロトコルになります。AMQP より MQTT が好まれるのはプロトコルのシンプルさのためですが、機能的には AMQP が勝ります。
RabbitMQ のインストール
ひとまず Ubuntu のパッケージをそのままつかうことにしました。
sudo apt-get install rabbitmq-server
Rabbit MQ は Erlang で書かれているので、Erlang 関係のパッケージが大量にはいります。
インストール直後から起動していて、rabbitmqctl status でステータスが見れます (root 権限が必要です)。デフォルトではクラスタリング用のポート25672と、AMQP 用のポート 5672 が listen されていました。
sudo rabbitmqctl status
続いて mqtt プラグインを有効にしておきます。
sudo rabbitmq-plugins enable rabbitmq_mqtt
このコマンドは自動的に設定が反映され、status を見ると mqtt を 1883 で listen していることがわかります。
RabbitMQ の設定
適当に見てみると /etc/rabbitmq/rabbitmq-env.conf というのが最初からありますが、これは環境変数設定ファイルなのでとりあえずそのままにしておきます。
ログとして /var/log/rabbitmq/$node@$host.log というファイルがあり、これの冒頭に
node : rabbit@stfuawsc home dir : /var/lib/rabbitmq config file(s) : /etc/rabbitmq/rabbitmq.config (not found) cookie hash : log : /var/log/rabbitmq/rabbit@stfuawsc.log sasl log : /var/log/rabbitmq/rabbit@stfuawsc-sasl.log database dir : /var/lib/rabbitmq/mnesia/rabbit@stfuawsc
というログが出ています。この通り設定ファイルは /etc/rabbitmq/rabbitmq.config になりますが、まだないので作る必要があります。
サンプル設定ファイルがあるのでとりあえずこれをコピペしてつくるのがよさそうです。
# とりあえず眺める zless /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz # コピペからはじめる sudo sh -c 'zcat /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz > /etc/rabbitmq/rabbitmq.config'
Erlang の設定ファイル形式なのでちょっと読みにくいです。
- % から行末まではコメント
- シングルクオートはアトム (Ruby でいうところのシンボルと同様)
- ダブルクオートはその文字列の数値のリスト
- 括弧付きダブルクオートはバイナリ (Bit Syntax)
- 余計なカンマがあると怒られる (ケツカンマ問題がある)
設定方針
ここでは方針として以下のようにします。
- MQTT を使う
- TLS を使う
TLS の設定では取得済みの Let's Encrypt の証明書をそのまま流用します。ただ、Let's Encrypt がつくる /etc/letsencrypt/live 以下が root 以外読めないようになっている一方、RabbitMQ は証明書ファイルを rabbitmq ユーザで読もうとするようで eacces がでます。
しかたないので以下のようにしてアクセスを許可するように変えてしまいました。なにかもっとスマートに解決したほうがいいと思うのですが、思いつきませんでした。
sudo chmod 0755 /etc/letsencrypt/{live,archive}
設定
以下のようにしてみました。
%% -*- mode: erlang -*- %% ---------------------------------------------------------------------------- %% RabbitMQ Sample Configuration File. %% %% See http://www.rabbitmq.com/configure.html for details. %% ---------------------------------------------------------------------------- [ {rabbit, [ {ssl_listeners, [5671]}, {handshake_timeout, 10000}, {log_levels, [{connection, info}, {channel, info}]}, {ssl_options, [{cacertfile, "/etc/letsencrypt/live/cho45.stfuawsc.com/fullchain.pem"}, {certfile, "/etc/letsencrypt/live/cho45.stfuawsc.com/cert.pem"}, {keyfile, "/etc/letsencrypt/live/cho45.stfuawsc.com/privkey.pem"}, {verify, verify_peer}, {fail_if_no_peer_cert, false}]}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, {ssl_handshake_timeout, 5000} ]}, {rabbitmq_mqtt, [ {allow_anonymous, false}, {vhost, <<"/">>}, {exchange, <<"amq.topic">>}, {subscription_ttl, 1800000}, {prefetch, 10}, {tcp_listeners, [1883]}, {ssl_listeners, [8883]} ]} ].
ユーザの追加
もともとある guest/guest は localhost からだけ接続が許されているのでそのままにして、外部から使う用のユーザを追加します。
# guest しかない $ sudo rabbitmqctl list_users Listing users ... guest [administrator] # ユーザ追加 $ sudo rabbitmqctl add_user tsun pass Creating user "tsun" ... # 確認 $ sudo rabbitmqctl list_users Listing users ... guest [administrator] tsun [] # パーミッションを追加 conf write read の順で指定する。これは正規表現。ここでは全権 $ sudo rabbitmqctl set_permissions tsun ".*" ".*" ".*" Setting permissions for user "tsun" in vhost "/" ...
繋いでみる
クライアントに rubygems の mqtt を使ってみます
gem install mqtt
#!/usr/bin/env ruby
require 'mqtt'
require 'thread'
# MQTT::Client seems not thread safe
mutex = Mutex.new
pub_thread = Thread.start do
mutex.lock
MQTT::Client.connect(
host: '127.0.0.1',
port: 8883,
ssl: true,
username: 'guest',
password: 'guest',
) do |client|
mutex.unlock
p [:pub, client]
10.times do
sleep 1
p :publish
client.publish("test", "this is test", false)
end
end
end
sub_thread = Thread.start do
mutex.lock
MQTT::Client.connect(
host: '127.0.0.1',
port: 8883,
ssl: true,
username: 'tsun',
password: 'dere',
) do |client|
mutex.unlock
p [:sub, client]
client.subscribe("test")
client.get do |topic,message|
p [ topic, message ]
end
end
end
pub_thread.join
こんな感じのコードで動いていることを確認できるはずです。ただし guest を指定しているので、同一ホストで動かす必要があります。
ポートを開ける
基本の動作確認ができたので、パブリックにアクセス可能にするためポートをあけます。ufw で 8883 (MQTT with TLS) だけをあけました。他はいまのところ使用予定がないので閉じたままです。listen 自体しないほうがより安全ですが、サーバ内でごにょごにょすることはありそうなのでそのままにしています。
sudo ufw allow 8883
外部から、さきほど作ったユーザで接続確認を行ってとりあえずセットアップ完了です。