(Java です)

例えばクローラみたいなのを書こうと思うと、

  1. リモートからダウンロードしてくるタスク
  2. ストレージに格納するタスク
  3. メタデータを更新するタスク

みたいなのが大まかにあって、リモートからダウンロードしてくるのは2並列で、ストレージに格納するのも2並列で (ダウンロード先とストア先は別のところなので別々に並列にしたい)、メタデータ更新するのはコリジョンさせたくないから1スレッドで、みたいなことをやりたくなることがある。そういうイメージでいきます。

CompletableFuture

Java8 から CompletableFuture というのが入って、これが JavaScript の Promise みたいなやつにあたるようだ。名前が長くてウザいが我慢するしかない。

Java には ExecutorService というスレッドプールを管理していい感じに実行してくれる仕組みがあり、Executors.newXXX() というのにいくつか実装がある。

これらを組合せるとマルチスレッドで同期処理を非同期して結果を集めて同期したりみたいなのが強力に書けそう。同期にしたいのか非同期にしたいのかどっちやねん感がでてくる。

いっぱいメソッドがあるが、とりあえず CompletableFuture.runAsync() と thenRunAsync() あたりでなんとかなりそう。

コード

実際こういう感じでできる。

並列で行われる部分は当然スレッドセーフでなければならないが、このコードの場合 metaExecutor はシングルスレッドなので metaExecutor 内では考えることが減る (downloadExecutor/storeExecutor/metaExecutor それぞれは並列で動くけど)

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class Main {

    public static void main(String[] args) {
        new Main().doMain(args);
    }

    private ExecutorService downloadExecutor;
    private ExecutorService storeExecutor;
    private ExecutorService metaExecutor;

    private void heavyProcess(String name) {
        System.out.println(String.format("%s -- START", name));
        try {
            Thread.currentThread().sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(String.format("%s -- END", name));
    }

    public void doMain(String[] args) {
        downloadExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "download"));
        storeExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "store"));
        metaExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "meta"));

        final List<String> urls = Arrays.asList(
                "http://example.com/1",
                "http://example.com/2",
                "http://example.com/3",
                "http://example.com/4",
                "http://example.com/5",
                "http://example.com/6",
                "http://example.com/7",
                "http://example.com/8",
                "http://example.com/9"
        );


        Thread.currentThread().setName("Main");
        System.out.println(String.format("[%s] START", Thread.currentThread().getName()));

        final List<CompletableFuture<Void>> futures = urls.stream().map(
                url -> CompletableFuture
                        .runAsync(() -> {
                            heavyProcess(String.format("[%s] downloading for %s", Thread.currentThread().getName(), url));
                        }, downloadExecutor)
                        .thenRunAsync(() -> {
                            heavyProcess(String.format("[%s] storing for %s", Thread.currentThread().getName(), url));
                        }, storeExecutor)
                        .thenRunAsync(() -> {
                            System.out.println(String.format("[%s] updating meta for %s", Thread.currentThread().getName(), url));
                        }, metaExecutor)
        ).collect(Collectors.toList());

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenRun(() -> {
            System.out.println(String.format("[%s] FINISH", Thread.currentThread().getName()));
        });

        // .join() しなければ即座にこの関数は終わる.
        System.out.println(String.format("[%s] END OF MAIN", Thread.currentThread().getName()));
    }
}

これとは別に BlockingQueue を使ったり、各タスクが次のタスクを自力で次の Executor に投げるみたいな実装を書いてみたが、CompletableFuture のほうが簡潔に書けた。

出力

こんな感じで動く

[Main] START
[download] downloading for http://example.com/1 -- START
[download] downloading for http://example.com/2 -- START
[Main] END OF MAIN
[download] downloading for http://example.com/2 -- END
[download] downloading for http://example.com/3 -- START
[store] storing for http://example.com/2 -- START
[download] downloading for http://example.com/1 -- END
[download] downloading for http://example.com/4 -- START
[store] storing for http://example.com/1 -- START
[download] downloading for http://example.com/4 -- END
[download] downloading for http://example.com/5 -- START
[download] downloading for http://example.com/3 -- END
[download] downloading for http://example.com/6 -- START
[store] storing for http://example.com/2 -- END
[store] storing for http://example.com/4 -- START
[Main Executor] updating meta for http://example.com/2
[store] storing for http://example.com/1 -- END
[store] storing for http://example.com/3 -- START
[Main Executor] updating meta for http://example.com/1
[store] storing for http://example.com/3 -- END
[Main Executor] updating meta for http://example.com/3
[download] downloading for http://example.com/6 -- END
[download] downloading for http://example.com/7 -- START
[store] storing for http://example.com/6 -- START
[download] downloading for http://example.com/5 -- END
[download] downloading for http://example.com/8 -- START
[store] storing for http://example.com/6 -- END
[store] storing for http://example.com/5 -- START
[Main Executor] updating meta for http://example.com/6
[store] storing for http://example.com/4 -- END
[Main Executor] updating meta for http://example.com/4
[store] storing for http://example.com/5 -- END
[Main Executor] updating meta for http://example.com/5
[download] downloading for http://example.com/8 -- END
[download] downloading for http://example.com/9 -- START
[store] storing for http://example.com/8 -- START
[download] downloading for http://example.com/9 -- END
[store] storing for http://example.com/9 -- START
[download] downloading for http://example.com/7 -- END
[store] storing for http://example.com/8 -- END
[Main Executor] updating meta for http://example.com/8
[store] storing for http://example.com/7 -- START
[store] storing for http://example.com/9 -- END
[Main Executor] updating meta for http://example.com/9
[store] storing for http://example.com/7 -- END
[Main Executor] updating meta for http://example.com/7
[Main Executor] FINISH

ハマった点

知らない機能がいっぱいあるのでもっと簡潔に書ける気がします。

  1. トップ
  2. tech
  3. 数種類のタスクをタスクごとに別々の並列性ポリシー使いつつ、順次実行する
  1. トップ
  2. java
  3. 数種類のタスクをタスクごとに別々の並列性ポリシー使いつつ、順次実行する