前のエントリでいい感じで書けそうみたいなことを書いたが、例外処理について言及していなかった。

CompletableFuture は Promise 同様エラー処理用にcompleteExceptionally(Throwable ex) というメソッドを持っている。これを呼ぶと後続の exceptionally() にエラー処理が移る (そしてリカバリできる) 。

では普通の、時間のかかり、なおかつ検査例外を出す同期メソッドを CompletableFuture として実行する場合例外はどうすればいいだろうか?

runAsync などの中では CompletableFuture インスタンスは見えないので、catch して明示的に completeExceptionally() することができない。

いくつか方法がありそうだけど、よくわかってない

CompletableFuture を明示的に扱うラッパメソッドを作る

package com.company;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
public class Main {
public static void main(String[] args) {
new Main().doMain(args);
}
private static <T> CompletableFuture<T> async(Consumer<CompletableFuture<T>> function) {
final CompletableFuture<T> future = new CompletableFuture<>();
function.accept(future);
return future;
}
public void doMain(String[] args) {
final CompletableFuture<Void> future1 = async(f -> {
try {
f.complete("Foobar");
} catch (Exception e) {
f.completeExceptionally(e);
}
}).thenAccept(x -> {
System.out.println(x);
});
final CompletableFuture<Void> future2 = async(f -> {
try {
throw new RuntimeException("exception");
} catch (Exception e) {
f.completeExceptionally(e);
}
}).exceptionally(ex -> {
System.out.println("exception!!");
return ex;
}).thenAccept(x -> {
System.out.println(x);
});
CompletableFuture.allOf(future1, future2).thenRun(() -> {
System.out.println("All done");
});
}
}
view raw Main.java hosted with ❤ by GitHub

こういう感じで、JS でいうところの new Promise( (resolve, reject) => {}) に相当するメソッドを作って、明示的に同期コードでの終了時/例外時の処理を行う。

JS の流れだとわかりやすいけど、ラッパメソッドとかいちいち書きたくはないし、せっかくマルチスレッドによって同期的に書ける処理を非同期っぽくなおすのはダサい。

全部 RuntimeException にしてしまう

runAsync 内で全例外をキャッチして非検査例外にしてしまう。

この場合投げられた例外は java.util.concurrent.CompletionException (これも非検査例外) にさらにラップされて exceptionally に渡される。

全例外 catch して completeExceptionally するのも実質非検査への変換なのでこれでもよさそう。実質はラッパメソッドでキャッチして completeExceptionally するのと変わりない。

その他

検査例外をうまく生かしつつ処理するというのが難しそう。各ステージで検査例外は全て処理するというのが想定されているのだろうか? とりあえず全部 catch して uncheck にするというのをやっていると、当然検査例外をまともに処理しようという気にはならなくなる。

検索例外を投げるメソッドは、直接メソッド参照のスタイルで runAsync(this::FooBar) 等と渡せない。

  1. トップ
  2. tech
  3. CompletableFuture と例外
  1. トップ
  2. java
  3. CompletableFuture と例外

(実質的な) 非接続状態のことをハイインピーダンス (Hi-Z) 状態 (Tri-state) と言う。久しぶりに触ると、どうすれば Hi-Z になるんだっけ??となるのでメモしておく

Arduino 風にいうと、

  • DDxn は pinMode(XX, INPUT | OUTPUT);
  • PORTxn は digitalWrite(XX, LOW | HIGH);
  • PUD (in MCUCR) はプルアップの有効化を決めるフラグだけど普通は有効にしとくので気にしない (Pull-up Disable フラグ、1でdisableされる)

プルアップ有効のケースだと、pinMode が INPUT かつ digitalWrite で LOW したピンはハイインピーダンス

  1. トップ
  2. tech
  3. Arduino / AVR でコード上からピンを非接続状態とする

続きをかきました [tech] Quick Charge 2.0 電源から 9V をとる (任意の電圧をとる) | Mon, Dec 14. 2015 - 氾濫原

仕様はちょっと前に調べて、先日対応バッテリーがきたので試してみました。

[Qualcomm認証済み]Aukey モバイルバッテリー 大容量 10400mAh スマホ充電器 [Quick Charge 2.0対応] 急速充電可能 (シルバー)PB-T1 -

3.0 / 5.0

回路図

12V を出す場合必要なのは D+ 0.6V / D- 0.6V なので、非常に簡単な構成。

VBUS には QC時に 5V〜12V の電圧がかかる。直接 Arduino の VIN に繋いでいるが、Arduino 側で 5V レギュレータが入っているので問題ない (ref. https://www.arduino.cc/en/uploads/Main/ArduinoNano30Schematic.pdf )。当然 5V ピンに直接繋いではならない (燃える)

コード

#include <Arduino.h>
#include <avr/sleep.h>

const int D_PLUS  = 11;
const int D_MINUS = 12;

/**
 * Arduino nano has 5V regulated power, so required voltages are generated with:
 * 3.3V: 4.7kΩ / 9.1kΩ
 * 0.6V: 1.1kΩ / 150Ω
 *
 */

void setup() {
	// wait until stable connection
	delay(1000);

	// reset line
	pinMode(D_PLUS, OUTPUT);
	pinMode(D_MINUS, OUTPUT);
	digitalWrite(D_PLUS, LOW);
	digitalWrite(D_MINUS, LOW);
	delay(100);

	// D+ and D- to 0.6V for 1.25s
	digitalWrite(D_PLUS, HIGH);
	digitalWrite(D_MINUS, HIGH);
	delay(1500);

	// D- to 0V for 1ms
	digitalWrite(D_MINUS, LOW);
	delay(2);

	// Set voltage
	digitalWrite(D_MINUS, HIGH);

	set_sleep_mode(SLEEP_MODE_PWR_DOWN);
	sleep_mode();
}

void loop() {
}

結果

12V とれたよ〜 (赤が VBUS、黄は D-)

備考

9V に対応する場合 D+ を途中から 0.6V -> 3.3V とする必要があるので、もう少し回路とコードが必要。

  1. トップ
  2. tech
  3. Quick Charge 2.0 電源から 12V とる

(Java です)

例えばクローラみたいなのを書こうと思うと、

  1. リモートからダウンロードしてくるタスク
  2. ストレージに格納するタスク
  3. メタデータを更新するタスク

みたいなのが大まかにあって、リモートからダウンロードしてくるのは2並列で、ストレージに格納するのも2並列で (ダウンロード先とストア先は別のところなので別々に並列にしたい)、メタデータ更新するのはコリジョンさせたくないから1スレッドで、みたいなことをやりたくなることがある。そういうイメージでいきます。

CompletableFuture

Java8 から CompletableFuture というのが入って、これが JavaScript の Promise みたいなやつにあたるようだ。名前が長くてウザいが我慢するしかない。

Java には ExecutorService というスレッドプールを管理していい感じに実行してくれる仕組みがあり、Executors.newXXX() というのにいくつか実装がある。

これらを組合せるとマルチスレッドで同期処理を非同期して結果を集めて同期したりみたいなのが強力に書けそう。同期にしたいのか非同期にしたいのかどっちやねん感がでてくる。

いっぱいメソッドがあるが、とりあえず CompletableFuture.runAsync() と thenRunAsync() あたりでなんとかなりそう。

コード

実際こういう感じでできる。

並列で行われる部分は当然スレッドセーフでなければならないが、このコードの場合 metaExecutor はシングルスレッドなので metaExecutor 内では考えることが減る (downloadExecutor/storeExecutor/metaExecutor それぞれは並列で動くけど)

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class Main {

    public static void main(String[] args) {
        new Main().doMain(args);
    }

    private ExecutorService downloadExecutor;
    private ExecutorService storeExecutor;
    private ExecutorService metaExecutor;

    private void heavyProcess(String name) {
        System.out.println(String.format("%s -- START", name));
        try {
            Thread.currentThread().sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(String.format("%s -- END", name));
    }

    public void doMain(String[] args) {
        downloadExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "download"));
        storeExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "store"));
        metaExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "meta"));

        final List<String> urls = Arrays.asList(
                "http://example.com/1",
                "http://example.com/2",
                "http://example.com/3",
                "http://example.com/4",
                "http://example.com/5",
                "http://example.com/6",
                "http://example.com/7",
                "http://example.com/8",
                "http://example.com/9"
        );


        Thread.currentThread().setName("Main");
        System.out.println(String.format("[%s] START", Thread.currentThread().getName()));

        final List<CompletableFuture<Void>> futures = urls.stream().map(
                url -> CompletableFuture
                        .runAsync(() -> {
                            heavyProcess(String.format("[%s] downloading for %s", Thread.currentThread().getName(), url));
                        }, downloadExecutor)
                        .thenRunAsync(() -> {
                            heavyProcess(String.format("[%s] storing for %s", Thread.currentThread().getName(), url));
                        }, storeExecutor)
                        .thenRunAsync(() -> {
                            System.out.println(String.format("[%s] updating meta for %s", Thread.currentThread().getName(), url));
                        }, metaExecutor)
        ).collect(Collectors.toList());

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenRun(() -> {
            System.out.println(String.format("[%s] FINISH", Thread.currentThread().getName()));
        });

        // .join() しなければ即座にこの関数は終わる.
        System.out.println(String.format("[%s] END OF MAIN", Thread.currentThread().getName()));
    }
}

これとは別に BlockingQueue を使ったり、各タスクが次のタスクを自力で次の Executor に投げるみたいな実装を書いてみたが、CompletableFuture のほうが簡潔に書けた。

出力

こんな感じで動く

[Main] START
[download] downloading for http://example.com/1 -- START
[download] downloading for http://example.com/2 -- START
[Main] END OF MAIN
[download] downloading for http://example.com/2 -- END
[download] downloading for http://example.com/3 -- START
[store] storing for http://example.com/2 -- START
[download] downloading for http://example.com/1 -- END
[download] downloading for http://example.com/4 -- START
[store] storing for http://example.com/1 -- START
[download] downloading for http://example.com/4 -- END
[download] downloading for http://example.com/5 -- START
[download] downloading for http://example.com/3 -- END
[download] downloading for http://example.com/6 -- START
[store] storing for http://example.com/2 -- END
[store] storing for http://example.com/4 -- START
[Main Executor] updating meta for http://example.com/2
[store] storing for http://example.com/1 -- END
[store] storing for http://example.com/3 -- START
[Main Executor] updating meta for http://example.com/1
[store] storing for http://example.com/3 -- END
[Main Executor] updating meta for http://example.com/3
[download] downloading for http://example.com/6 -- END
[download] downloading for http://example.com/7 -- START
[store] storing for http://example.com/6 -- START
[download] downloading for http://example.com/5 -- END
[download] downloading for http://example.com/8 -- START
[store] storing for http://example.com/6 -- END
[store] storing for http://example.com/5 -- START
[Main Executor] updating meta for http://example.com/6
[store] storing for http://example.com/4 -- END
[Main Executor] updating meta for http://example.com/4
[store] storing for http://example.com/5 -- END
[Main Executor] updating meta for http://example.com/5
[download] downloading for http://example.com/8 -- END
[download] downloading for http://example.com/9 -- START
[store] storing for http://example.com/8 -- START
[download] downloading for http://example.com/9 -- END
[store] storing for http://example.com/9 -- START
[download] downloading for http://example.com/7 -- END
[store] storing for http://example.com/8 -- END
[Main Executor] updating meta for http://example.com/8
[store] storing for http://example.com/7 -- START
[store] storing for http://example.com/9 -- END
[Main Executor] updating meta for http://example.com/9
[store] storing for http://example.com/7 -- END
[Main Executor] updating meta for http://example.com/7
[Main Executor] FINISH

ハマった点

知らない機能がいっぱいあるのでもっと簡潔に書ける気がします。

  1. トップ
  2. tech
  3. 数種類のタスクをタスクごとに別々の並列性ポリシー使いつつ、順次実行する
  1. トップ
  2. java
  3. 数種類のタスクをタスクごとに別々の並列性ポリシー使いつつ、順次実行する

土曜日の午後ぐらいから体調が悪くなった。日曜夜ぐらいから良くなって、朝には解熱した。

普段でも夢見が悪いが、熱がでるとさらにひどい夢を見る。なんとかしてほしい

もはや Promise がスタンダードに入り、モダンな実行環境ではポリフィルすら必要なく使えるケースが増えましたね。

かくいう自分も JSDeferred は使っておらず完全に Promise 依存に切替えております。外部ライブラリ依存なんてないほうがいい!!

JSDeferred と Promise の違い

機能的にはほぼ変わりがないので機械的に置き換えできますが、Promise は1度だけしか resolve できない点だけ違うので注意が必要。JSDeferred は値を保持しませんが、Promise は resolve した値を保持し、その後の then ではその値が返ってきます。

var resolver;
var promise = new Promise(function (resolve, reject) {
	resolver = resolve;
});

promise.then(function (r) {
	console.log(r); //=> foo
});

resolver('foo');

promise.then(function (r) {
	console.log(r); //=> foo
});

resolver('bar'); // nothing happened (invalid operation)

JSDeferred は遅延された (Deferred) な処理を表現していますが、Promise は未来の値に関する約束を表現している点で違いがでます (Promise は値なので継続(手続)のように扱うことはできない)

基本

JSDeferred() のグローバルな next() を引数なし Promise.resolve() に置き換えます。あとの next() は全部 then() に置き換えます。

next(function () {
    alert(1);
    return next(function () {
        alert(2);
    }).
    next(function () {
        alert(3);
    });
}).
next(function () {
    alert(4);
});

これを

Promise.resolve().then(function () {
    alert(1);
    return Promise.resolve().then(function () {
        alert(2);
    }).
    then(function () {
        alert(3);
    });
}).
then(function () {
    alert(4);
});

こうじゃ

parallel() は?

Promise.all(list) を使う

earlier() は?

Promise.race(list)

wait() は?

new Promise( (resolve) => {
    setTimeout(resolve, 100);
});
  1. トップ
  2. tech
  3. JSDeferred -> Promise 置き換え方法