(Java です)

例えばクローラみたいなのを書こうと思うと、

  1. リモートからダウンロードしてくるタスク
  2. ストレージに格納するタスク
  3. メタデータを更新するタスク

みたいなのが大まかにあって、リモートからダウンロードしてくるのは2並列で、ストレージに格納するのも2並列で (ダウンロード先とストア先は別のところなので別々に並列にしたい)、メタデータ更新するのはコリジョンさせたくないから1スレッドで、みたいなことをやりたくなることがある。そういうイメージでいきます。

CompletableFuture

Java8 から CompletableFuture というのが入って、これが JavaScript の Promise みたいなやつにあたるようだ。名前が長くてウザいが我慢するしかない。

Java には ExecutorService というスレッドプールを管理していい感じに実行してくれる仕組みがあり、Executors.newXXX() というのにいくつか実装がある。

これらを組合せるとマルチスレッドで同期処理を非同期して結果を集めて同期したりみたいなのが強力に書けそう。同期にしたいのか非同期にしたいのかどっちやねん感がでてくる。

いっぱいメソッドがあるが、とりあえず CompletableFuture.runAsync() と thenRunAsync() あたりでなんとかなりそう。

コード

実際こういう感じでできる。

並列で行われる部分は当然スレッドセーフでなければならないが、このコードの場合 metaExecutor はシングルスレッドなので metaExecutor 内では考えることが減る (downloadExecutor/storeExecutor/metaExecutor それぞれは並列で動くけど)

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class Main {

    public static void main(String[] args) {
        new Main().doMain(args);
    }

    private ExecutorService downloadExecutor;
    private ExecutorService storeExecutor;
    private ExecutorService metaExecutor;

    private void heavyProcess(String name) {
        System.out.println(String.format("%s -- START", name));
        try {
            Thread.currentThread().sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(String.format("%s -- END", name));
    }

    public void doMain(String[] args) {
        downloadExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "download"));
        storeExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "store"));
        metaExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "meta"));

        final List<String> urls = Arrays.asList(
                "http://example.com/1",
                "http://example.com/2",
                "http://example.com/3",
                "http://example.com/4",
                "http://example.com/5",
                "http://example.com/6",
                "http://example.com/7",
                "http://example.com/8",
                "http://example.com/9"
        );


        Thread.currentThread().setName("Main");
        System.out.println(String.format("[%s] START", Thread.currentThread().getName()));

        final List<CompletableFuture<Void>> futures = urls.stream().map(
                url -> CompletableFuture
                        .runAsync(() -> {
                            heavyProcess(String.format("[%s] downloading for %s", Thread.currentThread().getName(), url));
                        }, downloadExecutor)
                        .thenRunAsync(() -> {
                            heavyProcess(String.format("[%s] storing for %s", Thread.currentThread().getName(), url));
                        }, storeExecutor)
                        .thenRunAsync(() -> {
                            System.out.println(String.format("[%s] updating meta for %s", Thread.currentThread().getName(), url));
                        }, metaExecutor)
        ).collect(Collectors.toList());

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenRun(() -> {
            System.out.println(String.format("[%s] FINISH", Thread.currentThread().getName()));
        });

        // .join() しなければ即座にこの関数は終わる.
        System.out.println(String.format("[%s] END OF MAIN", Thread.currentThread().getName()));
    }
}

これとは別に BlockingQueue を使ったり、各タスクが次のタスクを自力で次の Executor に投げるみたいな実装を書いてみたが、CompletableFuture のほうが簡潔に書けた。

出力

こんな感じで動く

[Main] START
[download] downloading for http://example.com/1 -- START
[download] downloading for http://example.com/2 -- START
[Main] END OF MAIN
[download] downloading for http://example.com/2 -- END
[download] downloading for http://example.com/3 -- START
[store] storing for http://example.com/2 -- START
[download] downloading for http://example.com/1 -- END
[download] downloading for http://example.com/4 -- START
[store] storing for http://example.com/1 -- START
[download] downloading for http://example.com/4 -- END
[download] downloading for http://example.com/5 -- START
[download] downloading for http://example.com/3 -- END
[download] downloading for http://example.com/6 -- START
[store] storing for http://example.com/2 -- END
[store] storing for http://example.com/4 -- START
[Main Executor] updating meta for http://example.com/2
[store] storing for http://example.com/1 -- END
[store] storing for http://example.com/3 -- START
[Main Executor] updating meta for http://example.com/1
[store] storing for http://example.com/3 -- END
[Main Executor] updating meta for http://example.com/3
[download] downloading for http://example.com/6 -- END
[download] downloading for http://example.com/7 -- START
[store] storing for http://example.com/6 -- START
[download] downloading for http://example.com/5 -- END
[download] downloading for http://example.com/8 -- START
[store] storing for http://example.com/6 -- END
[store] storing for http://example.com/5 -- START
[Main Executor] updating meta for http://example.com/6
[store] storing for http://example.com/4 -- END
[Main Executor] updating meta for http://example.com/4
[store] storing for http://example.com/5 -- END
[Main Executor] updating meta for http://example.com/5
[download] downloading for http://example.com/8 -- END
[download] downloading for http://example.com/9 -- START
[store] storing for http://example.com/8 -- START
[download] downloading for http://example.com/9 -- END
[store] storing for http://example.com/9 -- START
[download] downloading for http://example.com/7 -- END
[store] storing for http://example.com/8 -- END
[Main Executor] updating meta for http://example.com/8
[store] storing for http://example.com/7 -- START
[store] storing for http://example.com/9 -- END
[Main Executor] updating meta for http://example.com/9
[store] storing for http://example.com/7 -- END
[Main Executor] updating meta for http://example.com/7
[Main Executor] FINISH

ハマった点

知らない機能がいっぱいあるのでもっと簡潔に書ける気がします。

  1. トップ
  2. tech
  3. 数種類のタスクをタスクごとに別々の並列性ポリシー使いつつ、順次実行する
  1. トップ
  2. java
  3. 数種類のタスクをタスクごとに別々の並列性ポリシー使いつつ、順次実行する

土曜日の午後ぐらいから体調が悪くなった。日曜夜ぐらいから良くなって、朝には解熱した。

普段でも夢見が悪いが、熱がでるとさらにひどい夢を見る。なんとかしてほしい

もはや Promise がスタンダードに入り、モダンな実行環境ではポリフィルすら必要なく使えるケースが増えましたね。

かくいう自分も JSDeferred は使っておらず完全に Promise 依存に切替えております。外部ライブラリ依存なんてないほうがいい!!

JSDeferred と Promise の違い

機能的にはほぼ変わりがないので機械的に置き換えできますが、Promise は1度だけしか resolve できない点だけ違うので注意が必要。JSDeferred は値を保持しませんが、Promise は resolve した値を保持し、その後の then ではその値が返ってきます。

var resolver;
var promise = new Promise(function (resolve, reject) {
	resolver = resolve;
});

promise.then(function (r) {
	console.log(r); //=> foo
});

resolver('foo');

promise.then(function (r) {
	console.log(r); //=> foo
});

resolver('bar'); // nothing happened (invalid operation)

JSDeferred は遅延された (Deferred) な処理を表現していますが、Promise は未来の値に関する約束を表現している点で違いがでます (Promise は値なので継続(手続)のように扱うことはできない)

基本

JSDeferred() のグローバルな next() を引数なし Promise.resolve() に置き換えます。あとの next() は全部 then() に置き換えます。

next(function () {
    alert(1);
    return next(function () {
        alert(2);
    }).
    next(function () {
        alert(3);
    });
}).
next(function () {
    alert(4);
});

これを

Promise.resolve().then(function () {
    alert(1);
    return Promise.resolve().then(function () {
        alert(2);
    }).
    then(function () {
        alert(3);
    });
}).
then(function () {
    alert(4);
});

こうじゃ

parallel() は?

Promise.all(list) を使う

earlier() は?

Promise.race(list)

wait() は?

new Promise( (resolve) => {
    setTimeout(resolve, 100);
});
  1. トップ
  2. tech
  3. JSDeferred -> Promise 置き換え方法

ネットで文章書くときに一番重要なのは、ゴミみたいなことを言ってくる人をできるだけ避けること。

ネットでは常に書き手が最初から不利な状態にある。だから中立よりも負よりに考えて書かなければ、不要な謗りをうける。

「読み飛ばすことができる人」

読み飛ばしても良いと本人が思った場合その通り読み飛ばす、ないしは読み飛ばさずに内容を理解しようと努めてくれる。なので、そもそもこういう人向けには配慮の必要がない。

「読み飛ばすことができない人」

読み飛ばすということができず、ただ書いてあること全てを飲みこもうとする。こういう人に対して「ここは読む価値はありませんよ」とか「本題とは直接関係ありませんよ」ということを書く必要がある。そうしないと、こういう人は飲み込めなかった文をそのまま書き手に投げつけてくる。