2011年 02月 17日

メッセージキュー、何を使えばいいのかわからないお……

Q4M って DBI 使うとブロックしちゃうけど、非同期にするにはどうしたらいいんだろう。AnyEvent::Worker|AnyEvent::DBI (=fork) 使うのがいいのかな。スタンダードな方法がわからない

LDR Streaming API では自力で MQ 作ったというのを見たけど、これ使っとけ的なのはないのかなあ……

Memo

HTTP ベースで pub/sub なメッセージキュー

かなりざっくり書くと以下のような感じになる気がするけど、こういう HTTP ベースの内部用途の pub/sub サーバって既に良いのはあるのかな……

package Buspub;

use strict;
use warnings;
sub route ($$);
use Router::Simple;
use Tie::IxHash;
use AnyEvent::HTTP;
use HTTP::Request::Common;
use JSON::XS;
use POSIX ();

use MongoDB;
my $connection = MongoDB::Connection->new(host => 'localhost', port => 27017);
my $subscriptions = $connection->get_database('pubsub')->subscriptions;
$subscriptions->ensure_index(Tie::IxHash->new( key => 1, created => -1 ), { background => 1 });

route '/sub' => {
	action => sub {
		my ($r) = @_;
		my $key = $r->req->param('key') or return $r->json({ error => "key required"});;
		my $callback = $r->req->param('callback') or return $r->json({ error => "callback required"});;
		my $id = $subscriptions->insert({
			key      => $key,
			messages => [],
			callback => $callback,
			max      => 25,
			created  => scalar time(),
		});
		LOG("SUB:: %s with callback: %s", $key, $callback);
		$r->json({ id => "$id" });
	}
};

route '/pub' => {
	action => sub {
		my ($r) = @_;
		my $key = $r->req->param('key') or return $r->json({ error => "key required"});;
		my $message = decode_json $r->req->param('message');

		my $cursor = $subscriptions->query({ key => $key });
		my %count;
		while (my $obj = $cursor->next) {
			$count{all}++;
			my $id  = $obj->{_id} . "";
			my $uri = $obj->{callback};
			my $messages = [ @{$obj->{messages}}, $message ];
			my $req = POST $uri, [ id => $id, messages => encode_json($messages) ];

			LOG("PUB:: => %s => %s with %d messages", $key, $req->uri, scalar @$messages);
			http_request $req->method => $req->uri,
				body => $req->content,
				headers => {
					map { $_ => $req->header($_), } $req->headers->header_field_names
				},
				timeout => 20,
				sub {
					my ($body, $headers) = @_;
					LOG("PUB:: <= %s <= %s with status:%d", $key, $req->uri, $headers->{Status});
					if ($headers->{Status} =~ /^2/) {
						$subscriptions->update({ _id => $obj->{_id} }, { '$pullAll' => { messages => $obj->{messages} } });
					} elsif ($headers->{Status} =~ /^4/) {
						$subscriptions->remove({ _id => $obj->{_id} });
					} elsif ($headers->{Status} =~ /^5/) {
						if (@$messages > $obj->{max}) {
							$subscriptions->remove({ _id => $obj->{_id} });
						} else {
							$subscriptions->update({ _id => $obj->{_id} }, { '$push' => { messages => $message } });
						}
					}
				}
			;
		}
		$r->json({ key => $key, delivered => \%count });
	}
};

route '/test/callback' =>  {
	action => sub {
		my ($r) = @_;
		use Data::Dumper;
		warn Dumper $r->req->param('id') ;
		warn Dumper decode_json $r->req->param('messages') ;
		$r->res->status($r->req->param('code') || 404);
		$r->res->content_type('application/json; charset=utf8');
		$r->res->content('{}');
		$r->res->finalize;
	}
};


BEGIN {
	my $router = Router::Simple->new;
	sub route ($$) { $router->connect(@_) };

	sub run {
		my ($env) = @_;
		if ( my $handler = $router->match($env) ) {
			my $c = Buspub::Context->new($env);
			$handler->{action}->($c);
		} else {
			[ 404, [ 'Content-Type' => 'text/html' ], ['Not Found'] ];
		}
	}

	sub LOG {
		my ($message, @args) = @_;
		print sprintf("[%s] $message", POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime), @args), "\n";
	}
};

package Buspub::Request;
use parent qw(Plack::Request);

package Buspub::Response;
use parent qw(Plack::Response);

package Buspub::Context;
use JSON::XS;

sub new {
	my ($class, $env) = @_;
	bless {
		req => Buspub::Request->new($env),
		res => Buspub::Response->new(200),
	}, $class;
}

sub req { $_[0]->{req} }
sub res { $_[0]->{res} }

sub json {
	my ($self, $vars) = @_;
	my $body = JSON::XS->new->ascii(1)->encode($vars);
	$self->res->content_type('application/json; charset=utf8');
	$self->res->content($body);
	$self->res->finalize;
}

\&Buspub::run;
2011年 02月 16日

JSDeferred を Jetpack (Add-on SDK) で使う

binding を書いた (まだブランチに入れてある)

  • setTimeout/clearTimeout がないので nsITimer で実装 (これでいいのか?)
  • Deferred.postie という utility を追加

Deferred.postie(constructor, options) を使って widget/panel をつくるとメソッドが増えます。

  • post(args..., function) //=> Deferred
    • function をレシーバの content context 内で args と共に呼び、結果を取得
  • bind(selector, event, function) //=> Deferred
    • content 内の selector でマッチする要素全てで、 event が発火時 function (chrome context) を呼びだす

以下のように書けます。post に渡す関数は文字列化されて content に渡されるので、スコープを外部と共有していない。args も JSON で表現できるものしか渡せない。この点わかりにくいのでイマイチかもしれない。

完全に message のやりとりを置き換えられるわけではないけど、ちょっとしたイベント処理とかは直列で簡単に書けるようになるので、便利な場合は便利かもしれない。

const Deferred = require("jsdeferred").Deferred;
Deferred.define(this); //define 使う場合必ず引数に this を渡す必用あり

widget = Deferred.postie(widgets.Widget, {
	label      : "Foo",
	contentURL : 'http://example.com/',
	width      : 32,
	onClick    : function () {
		var self = this;

		next(function () {
			return self.
				post(1, 2, function (a, b) { // content context 
					return a + b;
				}).
				next(function (res) { // chrome context
					console.log(res);
				});
		}).
		next(function () {
			return wait(1);
		}).
		next(function () {
			return self.
				post(function (a, b) {
					throw "foo";
				}).
				next(function (res) {
					console.log(res);
				}).
				error(function (e) {
					console.log(e);
				});
		});
	},
	onMessage : function (message) {
		console.log(message);
	},
	contentScript : 'setTimeout(function () { postMessage("custom message") }, 1000)',
});

widget.post(function () {
	var d = new Deferred();
	setTimeout(function () {
		d.call(1);
	}, 1000);
	return d;
}).
next(function (a) {
	console.log("Hello! " + a);
}).
error(function (e) {
	console.log(e);
});

widget.bind("body", "click", function (e) {
	console.log("body clicked" + e);
}).
error(function (e) {
	console.log(e);
});
2011年 02月 15日

jetpack は node.js と同じように、exports.foo = 以外にも this.foo = でもいけるようになっていて欲しかった……

Twitter でマジメな話ができないので、変な議論を呼びそうだなーとか、フローに流しても仕方ないこととか、ここに書いていこうという気持ちだけど、それに対して Twitter で返信されたらどうしたらいいのだろうなあ。追記するだけだと相手が気付かないし……

OGP、仕様的にフラグメントに対応できない問題があるし、そもそも hAtom っていう仕様が既にあるのでなんでわざわざ別のを定義したのかわからない

2011年 02月 13日

gerry++

2011年 02月 12日

JavaScript の哲学

  1. 小さいものは美しい
  2. 各スニペットが一つのことをうまくやるようにせよ
  3. できる限り最小限のプロトタイプを作れ
  4. コード量よりも移植のしやすさを選べ
  5. 単純なオブジェクトにデータを格納せよ
  6. 最小限のコードで最大限のことをすることに優位性を見出せ
  7. 効率と移植性を高めるためにコールバックを利用せよ
  8. 束縛するインターフェースは作るな
  9. 全てのスニペットは限りなく独立して動くようにせよ

Mike Gancarz の UNIX哲学の改変です

2011年 02月 11日

Chrome で XHR を abort() し続けるとそのうちリクエストが送信されなくなるのはどうにかならないのか

callback だけ無効にしてサーバサイドではちゃんとレスポンス返せってこと?

いまいち発生条件がつかめてないけど、ページロードしてからの累積回数で決まっているような気がしてならない。待ってても回復しない? というか abort() は自分でやってるんだから、余計なことしないでほしい。嫌なリクエストがくるならサーバ側ではじけばいいんだから……

以下のようなコードで再現する

上記の推測は間違っていて?、要は response body を待っている状態の XHR を abort() しても、Chrome はレスポンスを待ち続けるということらしい。HTTP ヘッダだけ先に送って body をストリームさせる場合にいろいろ困る。

#!/usr/bin/env corona
# vim:ft=perl:
use strict;
use warnings;
use Coro;
use Coro::Timer qw(sleep);

my $HTML = <<'EOF';
<!DOCTYPE html>
<html>
<title>test</title>

<h1>reproducing process</h1>
<ol>
	<li>Request to remote server
	<li>After requested, abort() it
	<li>Try requesting another xhr (but this waits until aborted xhr response)
</ol>

<textarea cols="140" rows="20" id="log"></textarea>

<script>
document.getElementById('log').value = "";
function log (m) {
	document.getElementById('log').value += m + "\n";
}

var seq = 1;
function req (time, callback) {
	log('req:'+seq++);
	var xhr = new XMLHttpRequest();
	xhr.open('GET', '/api/test?' + time, true);
	xhr.onreadystatechange = function () {
		log([ xhr.readyState, xhr.responseText ]);
		if (xhr.readyState == 4) callback();
	};
	xhr.send(null);
	return xhr;
}

function req_abort (time, callback) {
	var xhr = req(time, callback);
	// setup timeout to send request to remote surely.
	setTimeout(function () {
		log('abort');
		xhr.abort();
	}, 100);
	return req;
}

req(0, function () {
	req_abort(10, function () {
		req(1, function () { log('ok') }); // success after 10sec but should success after 1sec
	});
}); // success;


</script>
EOF

sub {
	my $env = shift;
	warn $env->{REQUEST_URI};
	my $sub = {
		'/' => sub {
			[ 200, [ 'Content-Type' => 'text/html' ], [ $HTML ] ];
		},
		'/api/test' => sub {
			sub {
				my $respond = shift;
				my $writer = $respond->([ 200, [ 'Content-Type' => 'application/json' ] ]);
				async {
					sleep $env->{QUERY_STRING};
					$writer->write('{ "ok": true }');
					$writer->close;
				};
			};
		},
	}->{$env->{PATH_INFO}};

	$sub ? $sub->() : [ 404, [ 'Content-Type' => 'text/plain' ], [ '404' ] ];
};

Chrome で 50x を数回受信すると throttle が有効になってリクエストを送信できなくなるが、どういった対策をうてばいいのか?

これは XHR にもあてはまってしまう。

マジでヤバい場合以外 50x を返さないということが必要になってしまう。バックエンドが応答できない場合特定のレスポンスを 200 で返したい、とかいうのは nginx でできるのか?

あるいはそもそも、1度 50x がでたらちゃんと待てばいいんだろうか?