(Java です)
- リモートからダウンロードしてくるタスク
- ストレージに格納するタスク
- メタデータを更新するタスク
みたいなのが大まかにあって、リモートからダウンロードしてくるのは2並列で、ストレージに格納するのも2並列で (ダウンロード先とストア先は別のところなので別々に並列にしたい)、メタデータ更新するのはコリジョンさせたくないから1スレッドで、みたいなことをやりたくなることがある。そういうイメージでいきます。
Java8 から CompletableFuture というのが入って、これが JavaScript の Promise みたいなやつにあたるようだ。名前が長くてウザいが我慢するしかない。
Java には ExecutorService というスレッドプールを管理していい感じに実行してくれる仕組みがあり、Executors.newXXX() というのにいくつか実装がある。
いっぱいメソッドがあるが、とりあえず CompletableFuture.runAsync() と thenRunAsync() あたりでなんとかなりそう。
並列で行われる部分は当然スレッドセーフでなければならないが、このコードの場合 metaExecutor はシングルスレッドなので metaExecutor 内では考えることが減る (downloadExecutor/storeExecutor/metaExecutor それぞれは並列で動くけど)
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class Main {
public static void main(String[] args) {
new Main().doMain(args);
private ExecutorService downloadExecutor;
private ExecutorService storeExecutor;
private ExecutorService metaExecutor;
private void heavyProcess(String name) {
System.out.println(String.format("%s -- START", name));
try {
Thread.currentThread().sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
System.out.println(String.format("%s -- END", name));
public void doMain(String[] args) {
downloadExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "download"));
storeExecutor = Executors.newFixedThreadPool(2, r -> new Thread(r, "store"));
metaExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "meta"));
final List<String> urls = Arrays.asList(
System.out.println(String.format("[%s] START", Thread.currentThread().getName()));
final List<CompletableFuture<Void>> futures = urls.stream().map(
url -> CompletableFuture
.runAsync(() -> {
heavyProcess(String.format("[%s] downloading for %s", Thread.currentThread().getName(), url));
}, downloadExecutor)
.thenRunAsync(() -> {
heavyProcess(String.format("[%s] storing for %s", Thread.currentThread().getName(), url));
}, storeExecutor)
.thenRunAsync(() -> {
System.out.println(String.format("[%s] updating meta for %s", Thread.currentThread().getName(), url));
}, metaExecutor)
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenRun(() -> {
System.out.println(String.format("[%s] FINISH", Thread.currentThread().getName()));
// .join() しなければ即座にこの関数は終わる.
System.out.println(String.format("[%s] END OF MAIN", Thread.currentThread().getName()));
これとは別に BlockingQueue を使ったり、各タスクが次のタスクを自力で次の Executor に投げるみたいな実装を書いてみたが、CompletableFuture のほうが簡潔に書けた。
[Main] START [download] downloading for http://example.com/1 -- START [download] downloading for http://example.com/2 -- START [Main] END OF MAIN [download] downloading for http://example.com/2 -- END [download] downloading for http://example.com/3 -- START [store] storing for http://example.com/2 -- START [download] downloading for http://example.com/1 -- END [download] downloading for http://example.com/4 -- START [store] storing for http://example.com/1 -- START [download] downloading for http://example.com/4 -- END [download] downloading for http://example.com/5 -- START [download] downloading for http://example.com/3 -- END [download] downloading for http://example.com/6 -- START [store] storing for http://example.com/2 -- END [store] storing for http://example.com/4 -- START [Main Executor] updating meta for http://example.com/2 [store] storing for http://example.com/1 -- END [store] storing for http://example.com/3 -- START [Main Executor] updating meta for http://example.com/1 [store] storing for http://example.com/3 -- END [Main Executor] updating meta for http://example.com/3 [download] downloading for http://example.com/6 -- END [download] downloading for http://example.com/7 -- START [store] storing for http://example.com/6 -- START [download] downloading for http://example.com/5 -- END [download] downloading for http://example.com/8 -- START [store] storing for http://example.com/6 -- END [store] storing for http://example.com/5 -- START [Main Executor] updating meta for http://example.com/6 [store] storing for http://example.com/4 -- END [Main Executor] updating meta for http://example.com/4 [store] storing for http://example.com/5 -- END [Main Executor] updating meta for http://example.com/5 [download] downloading for http://example.com/8 -- END [download] downloading for http://example.com/9 -- START [store] storing for http://example.com/8 -- START [download] downloading for http://example.com/9 -- END [store] storing for http://example.com/9 -- START [download] downloading for http://example.com/7 -- END [store] storing for http://example.com/8 -- END [Main Executor] updating meta for http://example.com/8 [store] storing for http://example.com/7 -- START [store] storing for http://example.com/9 -- END [Main Executor] updating meta for http://example.com/9 [store] storing for http://example.com/7 -- END [Main Executor] updating meta for http://example.com/7 [Main Executor] FINISH