メッセージキュー、何を使えばいいのかわからないお……
Q4M って DBI 使うとブロックしちゃうけど、非同期にするにはどうしたらいいんだろう。AnyEvent::Worker|AnyEvent::DBI (=fork) 使うのがいいのかな。スタンダードな方法がわからない
LDR Streaming API では自力で MQ 作ったというのを見たけど、これ使っとけ的なのはないのかなあ……
Memo
Q4M って DBI 使うとブロックしちゃうけど、非同期にするにはどうしたらいいんだろう。AnyEvent::Worker|AnyEvent::DBI (=fork) 使うのがいいのかな。スタンダードな方法がわからない
LDR Streaming API では自力で MQ 作ったというのを見たけど、これ使っとけ的なのはないのかなあ……
Memo
かなりざっくり書くと以下のような感じになる気がするけど、こういう HTTP ベースの内部用途の pub/sub サーバって既に良いのはあるのかな……
package Buspub;
use strict;
use warnings;
sub route ($$);
use Router::Simple;
use Tie::IxHash;
use AnyEvent::HTTP;
use HTTP::Request::Common;
use JSON::XS;
use POSIX ();
use MongoDB;
my $connection = MongoDB::Connection->new(host => 'localhost', port => 27017);
my $subscriptions = $connection->get_database('pubsub')->subscriptions;
$subscriptions->ensure_index(Tie::IxHash->new( key => 1, created => -1 ), { background => 1 });
route '/sub' => {
action => sub {
my ($r) = @_;
my $key = $r->req->param('key') or return $r->json({ error => "key required"});;
my $callback = $r->req->param('callback') or return $r->json({ error => "callback required"});;
my $id = $subscriptions->insert({
key => $key,
messages => [],
callback => $callback,
max => 25,
created => scalar time(),
});
LOG("SUB:: %s with callback: %s", $key, $callback);
$r->json({ id => "$id" });
}
};
route '/pub' => {
action => sub {
my ($r) = @_;
my $key = $r->req->param('key') or return $r->json({ error => "key required"});;
my $message = decode_json $r->req->param('message');
my $cursor = $subscriptions->query({ key => $key });
my %count;
while (my $obj = $cursor->next) {
$count{all}++;
my $id = $obj->{_id} . "";
my $uri = $obj->{callback};
my $messages = [ @{$obj->{messages}}, $message ];
my $req = POST $uri, [ id => $id, messages => encode_json($messages) ];
LOG("PUB:: => %s => %s with %d messages", $key, $req->uri, scalar @$messages);
http_request $req->method => $req->uri,
body => $req->content,
headers => {
map { $_ => $req->header($_), } $req->headers->header_field_names
},
timeout => 20,
sub {
my ($body, $headers) = @_;
LOG("PUB:: <= %s <= %s with status:%d", $key, $req->uri, $headers->{Status});
if ($headers->{Status} =~ /^2/) {
$subscriptions->update({ _id => $obj->{_id} }, { '$pullAll' => { messages => $obj->{messages} } });
} elsif ($headers->{Status} =~ /^4/) {
$subscriptions->remove({ _id => $obj->{_id} });
} elsif ($headers->{Status} =~ /^5/) {
if (@$messages > $obj->{max}) {
$subscriptions->remove({ _id => $obj->{_id} });
} else {
$subscriptions->update({ _id => $obj->{_id} }, { '$push' => { messages => $message } });
}
}
}
;
}
$r->json({ key => $key, delivered => \%count });
}
};
route '/test/callback' => {
action => sub {
my ($r) = @_;
use Data::Dumper;
warn Dumper $r->req->param('id') ;
warn Dumper decode_json $r->req->param('messages') ;
$r->res->status($r->req->param('code') || 404);
$r->res->content_type('application/json; charset=utf8');
$r->res->content('{}');
$r->res->finalize;
}
};
BEGIN {
my $router = Router::Simple->new;
sub route ($$) { $router->connect(@_) };
sub run {
my ($env) = @_;
if ( my $handler = $router->match($env) ) {
my $c = Buspub::Context->new($env);
$handler->{action}->($c);
} else {
[ 404, [ 'Content-Type' => 'text/html' ], ['Not Found'] ];
}
}
sub LOG {
my ($message, @args) = @_;
print sprintf("[%s] $message", POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime), @args), "\n";
}
};
package Buspub::Request;
use parent qw(Plack::Request);
package Buspub::Response;
use parent qw(Plack::Response);
package Buspub::Context;
use JSON::XS;
sub new {
my ($class, $env) = @_;
bless {
req => Buspub::Request->new($env),
res => Buspub::Response->new(200),
}, $class;
}
sub req { $_[0]->{req} }
sub res { $_[0]->{res} }
sub json {
my ($self, $vars) = @_;
my $body = JSON::XS->new->ascii(1)->encode($vars);
$self->res->content_type('application/json; charset=utf8');
$self->res->content($body);
$self->res->finalize;
}
\&Buspub::run;
binding を書いた (まだブランチに入れてある)
Deferred.postie(constructor, options) を使って widget/panel をつくるとメソッドが増えます。
以下のように書けます。post に渡す関数は文字列化されて content に渡されるので、スコープを外部と共有していない。args も JSON で表現できるものしか渡せない。この点わかりにくいのでイマイチかもしれない。
完全に message のやりとりを置き換えられるわけではないけど、ちょっとしたイベント処理とかは直列で簡単に書けるようになるので、便利な場合は便利かもしれない。
const Deferred = require("jsdeferred").Deferred;
Deferred.define(this); //define 使う場合必ず引数に this を渡す必用あり
widget = Deferred.postie(widgets.Widget, {
label : "Foo",
contentURL : 'http://example.com/',
width : 32,
onClick : function () {
var self = this;
next(function () {
return self.
post(1, 2, function (a, b) { // content context
return a + b;
}).
next(function (res) { // chrome context
console.log(res);
});
}).
next(function () {
return wait(1);
}).
next(function () {
return self.
post(function (a, b) {
throw "foo";
}).
next(function (res) {
console.log(res);
}).
error(function (e) {
console.log(e);
});
});
},
onMessage : function (message) {
console.log(message);
},
contentScript : 'setTimeout(function () { postMessage("custom message") }, 1000)',
});
widget.post(function () {
var d = new Deferred();
setTimeout(function () {
d.call(1);
}, 1000);
return d;
}).
next(function (a) {
console.log("Hello! " + a);
}).
error(function (e) {
console.log(e);
});
widget.bind("body", "click", function (e) {
console.log("body clicked" + e);
}).
error(function (e) {
console.log(e);
});
http://twitter.com/piro_or/status/37337771559489536:twitter:detail
http://twitter.com/cho45/status/37354831702134785:twitter:detail
http://twitter.com/piro_or/status/37355297706221569:twitter:detail
http://twitter.com/cho45/status/37355553843838976:twitter:detail
http://twitter.com/piro_or/status/37357301442682880:twitter:detail
http://twitter.com/cho45/status/37357988255633408:twitter:detail
var exports = { foo : ... }; と書けという話らしいです?
Twitter でマジメな話ができないので、変な議論を呼びそうだなーとか、フローに流しても仕方ないこととか、ここに書いていこうという気持ちだけど、それに対して Twitter で返信されたらどうしたらいいのだろうなあ。追記するだけだと相手が気付かないし……
callback だけ無効にしてサーバサイドではちゃんとレスポンス返せってこと?
いまいち発生条件がつかめてないけど、ページロードしてからの累積回数で決まっているような気がしてならない。待ってても回復しない? というか abort() は自分でやってるんだから、余計なことしないでほしい。嫌なリクエストがくるならサーバ側ではじけばいいんだから……
以下のようなコードで再現する
上記の推測は間違っていて?、要は response body を待っている状態の XHR を abort() しても、Chrome はレスポンスを待ち続けるということらしい。HTTP ヘッダだけ先に送って body をストリームさせる場合にいろいろ困る。
#!/usr/bin/env corona
# vim:ft=perl:
use strict;
use warnings;
use Coro;
use Coro::Timer qw(sleep);
my $HTML = <<'EOF';
<!DOCTYPE html>
<html>
<title>test</title>
<h1>reproducing process</h1>
<ol>
<li>Request to remote server
<li>After requested, abort() it
<li>Try requesting another xhr (but this waits until aborted xhr response)
</ol>
<textarea cols="140" rows="20" id="log"></textarea>
<script>
document.getElementById('log').value = "";
function log (m) {
document.getElementById('log').value += m + "\n";
}
var seq = 1;
function req (time, callback) {
log('req:'+seq++);
var xhr = new XMLHttpRequest();
xhr.open('GET', '/api/test?' + time, true);
xhr.onreadystatechange = function () {
log([ xhr.readyState, xhr.responseText ]);
if (xhr.readyState == 4) callback();
};
xhr.send(null);
return xhr;
}
function req_abort (time, callback) {
var xhr = req(time, callback);
// setup timeout to send request to remote surely.
setTimeout(function () {
log('abort');
xhr.abort();
}, 100);
return req;
}
req(0, function () {
req_abort(10, function () {
req(1, function () { log('ok') }); // success after 10sec but should success after 1sec
});
}); // success;
</script>
EOF
sub {
my $env = shift;
warn $env->{REQUEST_URI};
my $sub = {
'/' => sub {
[ 200, [ 'Content-Type' => 'text/html' ], [ $HTML ] ];
},
'/api/test' => sub {
sub {
my $respond = shift;
my $writer = $respond->([ 200, [ 'Content-Type' => 'application/json' ] ]);
async {
sleep $env->{QUERY_STRING};
$writer->write('{ "ok": true }');
$writer->close;
};
};
},
}->{$env->{PATH_INFO}};
$sub ? $sub->() : [ 404, [ 'Content-Type' => 'text/plain' ], [ '404' ] ];
};
これは XHR にもあてはまってしまう。
マジでヤバい場合以外 50x を返さないということが必要になってしまう。バックエンドが応答できない場合特定のレスポンスを 200 で返したい、とかいうのは nginx でできるのか?
あるいはそもそも、1度 50x がでたらちゃんと待てばいいんだろうか?