概要
concurrent.futures
モジュールの ThreadPoolExecutor
及び ProcessPoolExecutor
による並列実行について解説します。
マルチプロセスとマルチスレッド
実行中のプログラムを抽象化したものをプロセス (process) といいます。一方、プロセス内の処理の実行単位をスレッド (thread) といいます。プロセスは1つ以上のスレッドを持ちます。プロセスとスレッドの関係を図で表すと以下のようになります。
作成、終了、コンテキストの切り替え、プロセス間 (スレッド間) のデータの交換といった操作はプロセスよりスレッドを使用したほうが軽量です。
項目 | プロセス | スレッド |
---|---|---|
操作 | 重い | 軽い |
リソース | プロセスごとに独立 | スレッドが属するプロセスのリソースを共有 |
並列化していないプログラムの場合、単一のプロセスに単一のスレッド (メインスレッド) があり、処理が逐次実行される形になります。 並列化するには、子プロセスを複数作成するマルチプロセスか、スレッドを複数作成するマルチスレッドのどちらか、または両方のアプローチをとります。
高速化が期待できる場合
プロセスやスレッドはいずれも作成や終了の処理コストがかかるため、そのコストを上回る重たい処理でないと並列化することによって却って遅くなってしまいます。ファイル I/O やネットワーク通信を伴う時間がかかる処理を複数回行う場合などは並列化を検討しましょう。
- 並列化により高速化が期待できる例
- ディレクトリ内の複数のファイルに対してそれぞれ処理する
- 複数の Web サイトから情報を取得する
Python の並列処理をサポートするライブラリ
標準ライブラリでは、マルチプロセスやマルチスレッドによる並列化をサポートする複数のモジュールが提供されています。これらはマルチプロセス (スレッド) を利用して並列化を行う点では同じですが、その実装方針により API が異なります。
- threading: スレッドを表すクラスを提供するモジュール
- multiprocessing: プロセスを表すクラスを提供するモジュール
- concurrent.futures: マルチスレッド、マルチプロセスを Future パターン により実現するモジュール
multiprocessing
や threading
はプロセスやスレッドを直接操作します。
一方、concurrent.futures
は、プロセスやスレッドが Future パターンにより隠蔽されており、スレッドの作成、終了、タスクの割当といった面倒な処理をライブラリ側が担うことで、より簡単に並列化が行えます。
futures モジュール
concurrent.futures
モジュールでは、並列処理を行う仕組みとして、マルチスレッドによる並列化を行う ThreadPoolExecutor とマルチプロセスによる並列化を行う concurrent.futures.ProcessPoolExecutor が提供されています。どちらも Executor クラスを基底クラスとしており、API はほぼ同じです。
ThreadPoolExecutor
: 各ワーカーをメインプロセス内の独立したスレッドで実行するProcessPoolExecutor
: 各ワーカーを独立した子プロセスで実行する
ThreadPoolExecutor
1. ThreadPoolExecutor インスタンスを作成する
タスクを行うオブジェクトをワーカー (worker) といいます。ThreadPoolExecutor
のコンストラクタ引数 max_workers
でワーカー、すなわちスレッドの最大数を指定します。デフォルトは min(論理コア数 + 4, 32)
となっています。例えば、論理コアが8コアの CPU であれば、max_workers
は12になります。
2. タスクを追加する
ワーカーで実行する処理をタスク (task) といいます。submit()
に関数など呼び出し可能なオブジェクトを指定することでタスクを追加できます。この関数は、タスクを追加すると Future オブジェクトを返して直ちに終了します。
関数に位置引数や名前付き引数を渡すこともできます。
3. すべてのタスクが終了するのを待機する
通常、submit() はタスクを追加するとただちに返るため、すべてのタスクが終了するまで待機する必要があります。as_completed()
に Future オブジェクトのリストを渡すことで、すべてのタスクが完了するまでそこで処理を停止できます。
ただし、今回のように with
コンテキストを使用して ThreadPoolExecutor
を作成した場合、with を抜けるタイミングで実行中のタスクの完了を待つようになっているので、as_completed()
は不要です。
4. タスクの結果を受け取る
submit()
に渡した関数が返り値を返す場合、タスクが完了したあと、Future.result()
で結果を受け取ります。
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
ThreadPoolExecutor.map
タスクに渡す引数の一覧が用意されている場合は ThreadPoolExecutor.map()
を使って書くこともできます。
この関数はタスクの結果を取得するための Generator を返します。
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
要素数が同じ iterable を与えることで、複数の引数を渡すこともできます。
['a: 0, b: A', 'a: 1, b: B', 'a: 2, b: C', 'a: 3, b: D', 'a: 4, b: E', 'a: 5, b: F', 'a: 6, b: G', 'a: 7, b: H', 'a: 8, b: I', 'a: 9, b: J']
固定値の引数は itertools.repeat()
で同じ値を繰り返す配列にして渡します。
['a: 0, b: A', 'a: 1, b: A', 'a: 2, b: A', 'a: 3, b: A', 'a: 4, b: A', 'a: 5, b: A', 'a: 6, b: A', 'a: 7, b: A', 'a: 8, b: A', 'a: 9, b: A']
例 大量の画像のりサイズ
並列化で高速化できる例として、3000枚の画像をリサイズして保存する処理を紹介します。リサイズ処理は OpenCV を使用します。
まずは普通に for で逐次処理した場合の時間を計測します。
CPU times: user 16.4 s, sys: 277 ms, total: 16.7 s Wall time: 16.7 s
次に ThreadPoolExecutor、ThreadPoolExecutor を利用した場合の速度を計測します。
CPU times: user 27.9 s, sys: 607 ms, total: 28.5 s Wall time: 3.69 s
CPU times: user 783 ms, sys: 108 ms, total: 891 ms Wall time: 3.9 s
使用した CPU は物理コアが4個で、ThreadPoolExecutor、ProcessPoolExecutor のいずれも4倍程高速化できました。 また、ThreadPoolExecutor のほうが ProcessPoolExecutor より若干早い結果となりました。
種類 | 時間 |
---|---|
for ループ | 16.7 s |
ThreadPoolExecutor | 3.69 s |
ProcessPoolExecutor | 3.9 s |
コメント