RabbitMQ + MQTT で Pub/Sub サーバを立てることにしました。

いろいろなセンサーのグラフ化にあたって GrowthForecast へ直接 POST を行っていましたが、やはり一旦メッセージブローカー的なものをいれたほうがよさそうだという感じになってきました。

なぜメッセージブローカーが必要か

センサーデータを複数のプログラムから使いたい場合、特にほぼリアルタイムで情報を得たいようなケースだと、直接各アプリケーションに投げるのではなくて、センサーはある一箇所に値を投げることだけを考え、アプリケーションはある一箇所からデータを受けとることだけを考えるように分離したくなります。

例えば今まではセンサーデータをアプリケーションであるグラフサービスに直接投げていましたが、これだとセンサーデータをさらに別のデバイスから読みだして表示するといった場合に、本来の用途ではないグラフサービス側のAPIに問合せたりする必要があって不便です。

MQTT を選択

MQTT はキューがない (最後の値だけ保存する/Retain) Pub/Sub のメッセージ配信プロトコルで、組込み系だとそこそこメジャーなようです。ググってみるとクライアント実装はそこそこ充実しています。

サーバ実装がいまいちコレというのがない気がするのですが、RabbitMQ のプラグインに MQTT プロトコルサポートがあるので、これを利用するのが比較的よさそうでした。

これ系のプロトコルは MQTT 以外にもいろいろあって、RabbitMQ の本来の用途である AMQP も競合プロトコルになります。AMQP より MQTT が好まれるのはプロトコルのシンプルさのためですが、機能的には AMQP が勝ります。

RabbitMQ のインストール

ひとまず Ubuntu のパッケージをそのままつかうことにしました。

sudo apt-get install rabbitmq-server

Rabbit MQ は Erlang で書かれているので、Erlang 関係のパッケージが大量にはいります。

インストール直後から起動していて、rabbitmqctl status でステータスが見れます (root 権限が必要です)。デフォルトではクラスタリング用のポート25672と、AMQP 用のポート 5672 が listen されていました。

sudo rabbitmqctl status

続いて mqtt プラグインを有効にしておきます。

sudo rabbitmq-plugins enable rabbitmq_mqtt

このコマンドは自動的に設定が反映され、status を見ると mqtt を 1883 で listen していることがわかります。

RabbitMQ の設定

適当に見てみると /etc/rabbitmq/rabbitmq-env.conf というのが最初からありますが、これは環境変数設定ファイルなのでとりあえずそのままにしておきます。

ログとして /var/log/rabbitmq/$node@$host.log というファイルがあり、これの冒頭に

node           : rabbit@stfuawsc
home dir       : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config (not found)
cookie hash    : 
log            : /var/log/rabbitmq/rabbit@stfuawsc.log
sasl log       : /var/log/rabbitmq/rabbit@stfuawsc-sasl.log
database dir   : /var/lib/rabbitmq/mnesia/rabbit@stfuawsc

というログが出ています。この通り設定ファイルは /etc/rabbitmq/rabbitmq.config になりますが、まだないので作る必要があります。

サンプル設定ファイルがあるのでとりあえずこれをコピペしてつくるのがよさそうです。

# とりあえず眺める 
zless /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz

# コピペからはじめる
sudo sh -c 'zcat /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz > /etc/rabbitmq/rabbitmq.config'

Erlang の設定ファイル形式なのでちょっと読みにくいです。

  • % から行末まではコメント
  • シングルクオートはアトム (Ruby でいうところのシンボルと同様)
  • ダブルクオートはその文字列の数値のリスト
  • 括弧付きダブルクオートはバイナリ (Bit Syntax)
  • 余計なカンマがあると怒られる (ケツカンマ問題がある)

設定方針

ここでは方針として以下のようにします。

  • MQTT を使う
  • TLS を使う

TLS の設定では取得済みの Let's Encrypt の証明書をそのまま流用します。ただ、Let's Encrypt がつくる /etc/letsencrypt/live 以下が root 以外読めないようになっている一方、RabbitMQ は証明書ファイルを rabbitmq ユーザで読もうとするようで eacces がでます。

しかたないので以下のようにしてアクセスを許可するように変えてしまいました。なにかもっとスマートに解決したほうがいいと思うのですが、思いつきませんでした。

sudo chmod 0755 /etc/letsencrypt/{live,archive}

設定

以下のようにしてみました。

%% -*- mode: erlang -*-
%% ----------------------------------------------------------------------------
%% RabbitMQ Sample Configuration File.
%%
%% See http://www.rabbitmq.com/configure.html for details.
%% ----------------------------------------------------------------------------
[
 {rabbit,
  [
   {ssl_listeners, [5671]},
   {handshake_timeout, 10000},
   {log_levels, [{connection, info}, {channel, info}]},

   {ssl_options, [{cacertfile,           "/etc/letsencrypt/live/cho45.stfuawsc.com/fullchain.pem"},
                  {certfile,             "/etc/letsencrypt/live/cho45.stfuawsc.com/cert.pem"},
                  {keyfile,              "/etc/letsencrypt/live/cho45.stfuawsc.com/privkey.pem"},
                  {verify,               verify_peer},
                  {fail_if_no_peer_cert, false}]},

   {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
   {auth_backends, [rabbit_auth_backend_internal]},

   {ssl_handshake_timeout, 5000}
  ]},

 {rabbitmq_mqtt,
  [
   {allow_anonymous, false},
   {vhost, <<"/">>},
   {exchange, <<"amq.topic">>},
   {subscription_ttl, 1800000},
   {prefetch, 10},

   {tcp_listeners, [1883]},
   {ssl_listeners, [8883]}
  ]}
].

ユーザの追加

もともとある guest/guest は localhost からだけ接続が許されているのでそのままにして、外部から使う用のユーザを追加します。

# guest しかない
$ sudo rabbitmqctl list_users
Listing users ...
guest   [administrator]

# ユーザ追加
$ sudo rabbitmqctl add_user tsun pass
Creating user "tsun" ...

# 確認
$ sudo rabbitmqctl list_users        
Listing users ...
guest   [administrator]
tsun    []

# パーミッションを追加 conf write read の順で指定する。これは正規表現。ここでは全権
$  sudo rabbitmqctl set_permissions tsun  ".*" ".*" ".*"
Setting permissions for user "tsun" in vhost "/" ...

繋いでみる

クライアントに rubygems の mqtt を使ってみます

gem install mqtt
#!/usr/bin/env ruby

require 'mqtt'
require 'thread'

# MQTT::Client seems not thread safe
mutex = Mutex.new

pub_thread = Thread.start do
	mutex.lock
	MQTT::Client.connect(
		host: '127.0.0.1',
		port: 8883,
		ssl: true,
		username: 'guest',
		password: 'guest',
	) do |client|
		mutex.unlock
		p [:pub, client]


		10.times do
			sleep 1
			p :publish
			client.publish("test", "this is test", false)
		end
	end
end

sub_thread = Thread.start do
	mutex.lock
	MQTT::Client.connect(
		host: '127.0.0.1',
		port: 8883,
		ssl: true,
		username: 'tsun',
		password: 'dere',
	) do |client|
		mutex.unlock
		p [:sub, client]
		client.subscribe("test")
		client.get do |topic,message|
			p [ topic, message ]
		end
	end
end

pub_thread.join

こんな感じのコードで動いていることを確認できるはずです。ただし guest を指定しているので、同一ホストで動かす必要があります。

ポートを開ける

基本の動作確認ができたので、パブリックにアクセス可能にするためポートをあけます。ufw で 8883 (MQTT with TLS) だけをあけました。他はいまのところ使用予定がないので閉じたままです。listen 自体しないほうがより安全ですが、サーバ内でごにょごにょすることはありそうなのでそのままにしています。

 sudo ufw allow 8883

外部から、さきほど作ったユーザで接続確認を行ってとりあえずセットアップ完了です。

ref

  1. トップ
  2. tech
  3. センサーデータ用に RabbitMQ + MQTT をセットアップする

寝室に置いてみたいので ESP8266 (ESP-WROOM-02) で動かして GrowthForecast にポストするようにしてみた。

MH-Z19 を PWM 経由で読んでいる loop 関数だけ抜きだすと以下のような感じ。とりあえず割込みは使ってない。

void loop() {
	ArduinoOTA.handle();

	static uint32_t prevTime = 0;
	static uint8_t lastState = 0;
	static uint32_t	th;
	static uint32_t	tl;
	int state = digitalRead(PWM_INPUT);
	if (lastState == state) {
		// nothing to do
	} else {
		lastState = state;
		uint32_t now = millis();
		if (!prevTime) {
			prevTime = now;
			return;
		}
		uint32_t interval = 0;
		if (prevTime <= now) {
			interval = now - prevTime;
		} else {
			interval = 0xffffffff - prevTime + now + 1;
		}
		prevTime = now;
		if (state == 1) {
			tl = interval;
			if (tl && th) {
				uint32_t cycle = tl + th;
				if ((uint16_t)(1004 * 0.95) < cycle && cycle < (uint16_t)(1004 + 1.05)) {
					uint16_t ppm = 5000.0 * ((float)(th - 2) / (float)(cycle - 4));
					Serial.printf("%d ppm (cycle %d / th: %d, tl: %d)\n", ppm, cycle, th, tl);
					gf.post("/home/sensor/co2_1", ppm);

					// reset count
					th = 0; tl = 0;
					prevTime = 0;
				} else {
					// error
					Serial.println("error");
				}
			}
		} else
		if (state == 0) {
			th = interval;
		}
	}
}
  1. トップ
  2. tech
  3. ESP8266 Arduino で CO2 センサー MH-Z19 を読む

Aliexpress で歪みゲージ (ロードセル 1kg) と、HX711 のモジュールを買ったので試してみました。

歪みゲージのつかいかた

歪ませる必要があるので、このように上下に板をつけて挟みこみます。このとき、多少すきまが必要なのでワッシャーなどをかまします。

今回使った歪みゲージのスペックは以下の通りです。(コピペ) また、M4 と M5 のネジが切ってありました。

Rated Load: 1Kg
Rated Output: 1.0mV/V±0.15mV/V
Zero Output: ±0.1mV/V
Creep: 0.03%F.S./30min
Input End: Red+ (power), Black-(power)
Output End: Green+(signal), White-(signal)
Recommended operating voltage: 3 ~ 12 VDC
Maximum operating voltage: 15 VDC
Input Impedance: 1115±10%Ω
Output Impedance: 1000±10%Ω
Protection class: IP65
Total Size: approx. 3.15 x0.50 x 0.50 inch
Cable: 0.8 x 20 cm (diameter x length)
Material: Aluminum Alloy
Weight: 30g

HX711 で読んでみる (Arduino)

手に入れたのは↑のようなモジュールです。適当に配線して読みます。

以下のライブラリを使ってみました。

README の通りにスケールを設定する必要があります。

#include <Arduino.h>
// https://github.com/bogde/HX711
#include "HX711.h"

const int DT_PIN = 2;
const int SCK_PIN = 3;

HX711 scale;

void setup() {
	Serial.begin(9600);
	Serial.println("start");
	scale.begin(DT_PIN, SCK_PIN);

	Serial.print("read:");
	Serial.println(scale.read());

	scale.set_scale();
	scale.tare();

	Serial.print("calibrating...");
	delay(5000);
	Serial.println(scale.get_units(10));

	scale.set_scale(-1536.00);
	scale.tare();

	Serial.print("read (calibrated):");
	Serial.println(scale.get_units(10));
}


void loop() {
	Serial.println(scale.get_units(10), 1);

	scale.power_down();
	delay(500);
	scale.power_up();
}

このようなコードを書いて、

  1. 何ものせずに起動
  2. calibrating... が表示されたらすかさず1円玉を1枚載せる (1g)
  3. 値を読む
  4. scale.set_scale(...); に値を埋めこむ

という方法をとりました。

どんな感じか

こんな感じで値が読めました。1円玉を1つずつ増やして5枚まで載せてみました。

ref.

  1. トップ
  2. tech
  3. 歪みゲージ(ロードセル)と HX711 を使って重量計測する (Arduino)

同級生との飲み会で若干ハメをはずして飲みすぎて久しぶりに激しい二日酔いになってしまった。二日酔いになるたびに二日酔いをすみやかに解消する方法を調べてるけど、結局ないってことはわかっている。

必要なのは時間と水分(分解に必要)なので、スポーツドリンクをこまめに飲みつつ、梅干し純を舐めて、身体をあたためつつ布団で寝ていた。

アサヒグループ食品 梅ぼし純 24粒×10個 - アサヒグループ食品

アサヒグループ食品

5.0 / 5.0

梅干し純は気持ち悪いのが軽減されるうえに塩分が得られる。スポーツドリンクにも塩は入っているけど、少なめになっているらしいので、ちょっと追加するのがちょうどいいのではないかという狙い。

MH-Z19 の個体差とキャリブレーションの必要性 | tech - 氾濫原 というのを書きましたが、とりあえず 400ppm 環境は比較的簡単に作れるのだから、それでキャリブレーションしてみることにしました。

その過程でこのセンサの「ゼロキャリブレーション」について誤解していたことがわかりました。 MH-Z19 という格安 CO2 センサを読んでみた | tech - 氾濫原 に追記してありますが、ここでいう「ゼロ」は 400ppm 環境のことであるらしく、0ppm の環境は必要ありませんでした。また「スパンキャリブレーション」は最低でも1000ppm以上、2000ppm以上推奨とのことでした。MH-Z19 のドキュメントではなく MH-Z19B のドキュメントのほうが詳しく書いてあり、こちらを参考にしました。

スパンキャリブレーションはそのような特定環境をつくるのが難しいので、今回は「ゼロキャリブレーション」だけを行ってグラフにしばらく投稿してみました。

ゼロキャリブレーションの方法

窓をあけて換気扇をつけ、周辺環境をできるだけ400ppmに近付けます。最低でも30分ぐらいは換気します。室内であっても十分に換気ができていれば400ppm前後にはできるので、気をつけるのは息を吹きかけないようにする (自分が風上にいないこと) ぐらいです。人間の呼気中の CO2 は 4% (40000ppm) ぐらいなので、普通に影響します。

センサはシリアル経由で繋ぎました。起動に数分時間がかかるので放っておきます。

以下のスクリプトを実行します。

https://github.com/cho45/ruby-mh-z19/blob/master/examples/cal.rb

短いのでコピペすると

#!/usr/bin/env ruby

$LOAD_PATH << "#{File.dirname(__FILE__)}/../lib"

require 'mh-z19'

co2 = MH_Z19::Serial.new(ENV['PORT'])
# wait sensor startup
loop do
	detail =  co2.read_concentration_detail
	p detail
	case detail[:status]
	when 0
		p "booting"
	when 1
		p "startup"
	when 64
		break
	end
	sleep 1
end

co2.calibrate_zero_point

sleep 3

p co2.read_concentration_detail

こんな感じになります。この MH-Z19 はドキュメントに書いてないのですが、ステータスビットや認識している温度も返すようで、read_concentration_detail だとそれらもとれるような実装にしてあります。

グラフ

例によって1日ぐらい放置してみました。スパンキャリブレーションしていないので、やはり最大値に差があります。とはいえ 100〜200ppm 以内には入ってる感じなので、実用的にはとりあえず十分かもしれません。いずれにせよ何もしない場合よりは遥かにマシなので、必ずやったほうがよさそうです。

  1. トップ
  2. tech
  3. MH-Z19 のゼロキャリブレーションをしてみる