Category java.

前のエントリでいい感じで書けそうみたいなことを書いたが、例外処理について言及していなかった。

CompletableFuture は Promise 同様エラー処理用にcompleteExceptionally(Throwable ex) というメソッドを持っている。これを呼ぶと後続の exceptionally() にエラー処理が移る (そしてリカバリできる) 。

では普通の、時間のかかり、なおかつ検査例外を出す同期メソッドを CompletableFuture として実行する場合例外はどうすればいいだろうか?

runAsync などの中では CompletableFuture インスタンスは見えないので、catch して明示的に completeExceptionally() することができない。

いくつか方法がありそうだけど、よくわかってない

CompletableFuture を明示的に扱うラッパメソッドを作る

package com.company;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
public class Main {
public static void main(String[] args) {
new Main().doMain(args);
}
private static <T> CompletableFuture<T> async(Consumer<CompletableFuture<T>> function) {
final CompletableFuture<T> future = new CompletableFuture<>();
function.accept(future);
return future;
}
public void doMain(String[] args) {
final CompletableFuture<Void> future1 = async(f -> {
try {
f.complete("Foobar");
} catch (Exception e) {
f.completeExceptionally(e);
}
}).thenAccept(x -> {
System.out.println(x);
});
final CompletableFuture<Void> future2 = async(f -> {
try {
throw new RuntimeException("exception");
} catch (Exception e) {
f.completeExceptionally(e);
}
}).exceptionally(ex -> {
System.out.println("exception!!");
return ex;
}).thenAccept(x -> {
System.out.println(x);
});
CompletableFuture.allOf(future1, future2).thenRun(() -> {
System.out.println("All done");
});
}
}
view raw Main.java hosted with ❤ by GitHub

こういう感じで、JS でいうところの new Promise( (resolve, reject) => {}) に相当するメソッドを作って、明示的に同期コードでの終了時/例外時の処理を行う。

JS の流れだとわかりやすいけど、ラッパメソッドとかいちいち書きたくはないし、せっかくマルチスレッドによって同期的に書ける処理を非同期っぽくなおすのはダサい。

全部 RuntimeException にしてしまう

runAsync 内で全例外をキャッチして非検査例外にしてしまう。

この場合投げられた例外は java.util.concurrent.CompletionException (これも非検査例外) にさらにラップされて exceptionally に渡される。

全例外 catch して completeExceptionally するのも実質非検査への変換なのでこれでもよさそう。実質はラッパメソッドでキャッチして completeExceptionally するのと変わりない。

その他

検査例外をうまく生かしつつ処理するというのが難しそう。各ステージで検査例外は全て処理するというのが想定されているのだろうか? とりあえず全部 catch して uncheck にするというのをやっていると、当然検査例外をまともに処理しようという気にはならなくなる。

検索例外を投げるメソッドは、直接メソッド参照のスタイルで runAsync(this::FooBar) 等と渡せない。

  1. トップ
  2. tech
  3. CompletableFuture と例外
  1. トップ
  2. java
  3. CompletableFuture と例外

(Java です)

例えばクローラみたいなのを書こうと思うと、

  1. リモートからダウンロードしてくるタスク
  2. ストレージに格納するタスク
  3. メタデータを更新するタスク

みたいなのが大まかにあって、リモートからダウンロードしてくるのは2並列で、ストレージに格納するのも2並列で (ダウンロード先とストア先は別のところなので別々に並列にしたい)、メタデータ更新するのはコリジョンさせたくないから1スレッドで、みたいなことをやりたくなることがある。そういうイメージでいきます。

CompletableFuture

Java8 から CompletableFuture というのが入って、これが JavaScript の Promise みたいなやつにあたるようだ。名前が長くてウザいが我慢するしかない。

Java には ExecutorService というスレッドプールを管理していい感じに実行してくれる仕組みがあり、Executors.newXXX() というのにいくつか実装がある。

これらを組合せるとマルチスレッドで同期処理を非同期して結果を集めて同期したりみたいなのが強力に書けそう。同期にしたいのか非同期にしたいのかどっちやねん感がでてくる。

いっぱいメソッドがあるが、とりあえず CompletableFuture.runAsync() と thenRunAsync() あたりでなんとかなりそう。

コード

実際こういう感じでできる。

並列で行われる部分は当然スレッドセーフでなければならないが、このコードの場合 metaExecutor はシングルスレッドなので metaExecutor 内では考えることが減る (downloadExecutor/storeExecutor/metaExecutor それぞれは並列で動くけど)

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class Main {

    public static void main(String[] args) {
        new Main().doMain(args);
    }

    private ExecutorService downloadExecutor;
    private ExecutorService storeExecutor;
    private ExecutorService metaExecutor;

    private void heavyProcess(String name) {
        System.out.println(String.format("%s -- START", name));
        try {
            Thread.currentThread().sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(String.format("%s -- END", name));
    }

    public void doMain(String[] args) {
        downloadExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "download"));
        storeExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "store"));
        metaExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "meta"));

        final List<String> urls = Arrays.asList(
                "http://example.com/1",
                "http://example.com/2",
                "http://example.com/3",
                "http://example.com/4",
                "http://example.com/5",
                "http://example.com/6",
                "http://example.com/7",
                "http://example.com/8",
                "http://example.com/9"
        );


        Thread.currentThread().setName("Main");
        System.out.println(String.format("[%s] START", Thread.currentThread().getName()));

        final List<CompletableFuture<Void>> futures = urls.stream().map(
                url -> CompletableFuture
                        .runAsync(() -> {
                            heavyProcess(String.format("[%s] downloading for %s", Thread.currentThread().getName(), url));
                        }, downloadExecutor)
                        .thenRunAsync(() -> {
                            heavyProcess(String.format("[%s] storing for %s", Thread.currentThread().getName(), url));
                        }, storeExecutor)
                        .thenRunAsync(() -> {
                            System.out.println(String.format("[%s] updating meta for %s", Thread.currentThread().getName(), url));
                        }, metaExecutor)
        ).collect(Collectors.toList());

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenRun(() -> {
            System.out.println(String.format("[%s] FINISH", Thread.currentThread().getName()));
        });

        // .join() しなければ即座にこの関数は終わる.
        System.out.println(String.format("[%s] END OF MAIN", Thread.currentThread().getName()));
    }
}

これとは別に BlockingQueue を使ったり、各タスクが次のタスクを自力で次の Executor に投げるみたいな実装を書いてみたが、CompletableFuture のほうが簡潔に書けた。

出力

こんな感じで動く

[Main] START
[download] downloading for http://example.com/1 -- START
[download] downloading for http://example.com/2 -- START
[Main] END OF MAIN
[download] downloading for http://example.com/2 -- END
[download] downloading for http://example.com/3 -- START
[store] storing for http://example.com/2 -- START
[download] downloading for http://example.com/1 -- END
[download] downloading for http://example.com/4 -- START
[store] storing for http://example.com/1 -- START
[download] downloading for http://example.com/4 -- END
[download] downloading for http://example.com/5 -- START
[download] downloading for http://example.com/3 -- END
[download] downloading for http://example.com/6 -- START
[store] storing for http://example.com/2 -- END
[store] storing for http://example.com/4 -- START
[Main Executor] updating meta for http://example.com/2
[store] storing for http://example.com/1 -- END
[store] storing for http://example.com/3 -- START
[Main Executor] updating meta for http://example.com/1
[store] storing for http://example.com/3 -- END
[Main Executor] updating meta for http://example.com/3
[download] downloading for http://example.com/6 -- END
[download] downloading for http://example.com/7 -- START
[store] storing for http://example.com/6 -- START
[download] downloading for http://example.com/5 -- END
[download] downloading for http://example.com/8 -- START
[store] storing for http://example.com/6 -- END
[store] storing for http://example.com/5 -- START
[Main Executor] updating meta for http://example.com/6
[store] storing for http://example.com/4 -- END
[Main Executor] updating meta for http://example.com/4
[store] storing for http://example.com/5 -- END
[Main Executor] updating meta for http://example.com/5
[download] downloading for http://example.com/8 -- END
[download] downloading for http://example.com/9 -- START
[store] storing for http://example.com/8 -- START
[download] downloading for http://example.com/9 -- END
[store] storing for http://example.com/9 -- START
[download] downloading for http://example.com/7 -- END
[store] storing for http://example.com/8 -- END
[Main Executor] updating meta for http://example.com/8
[store] storing for http://example.com/7 -- START
[store] storing for http://example.com/9 -- END
[Main Executor] updating meta for http://example.com/9
[store] storing for http://example.com/7 -- END
[Main Executor] updating meta for http://example.com/7
[Main Executor] FINISH

ハマった点

知らない機能がいっぱいあるのでもっと簡潔に書ける気がします。

  1. トップ
  2. tech
  3. 数種類のタスクをタスクごとに別々の並列性ポリシー使いつつ、順次実行する
  1. トップ
  2. java
  3. 数種類のタスクをタスクごとに別々の並列性ポリシー使いつつ、順次実行する