Warning: Undefined variable $position in /home/pystyles/pystyle.info/public_html/wp/wp-content/themes/lionblog/functions.php on line 4897

Python – concurrent.futures を使った並列化の方法について

Python – concurrent.futures を使った並列化の方法について

概要

concurrent.futures モジュールの ThreadPoolExecutor 及び ProcessPoolExecutor による並列実行について解説します。

Advertisement

マルチプロセスとマルチスレッド

実行中のプログラムを抽象化したものをプロセス (process) といいます。一方、プロセス内の処理の実行単位をスレッド (thread) といいます。プロセスは1つ以上のスレッドを持ちます。プロセスとスレッドの関係を図で表すと以下のようになります。

プロセスとスレッド

作成、終了、コンテキストの切り替え、プロセス間 (スレッド間) のデータの交換といった操作はプロセスよりスレッドを使用したほうが軽量です。

項目 プロセス スレッド
操作 重い 軽い
リソース プロセスごとに独立 スレッドが属するプロセスのリソースを共有

並列化していないプログラムの場合、単一のプロセスに単一のスレッド (メインスレッド) があり、処理が逐次実行される形になります。 並列化するには、子プロセスを複数作成するマルチプロセスか、スレッドを複数作成するマルチスレッドのどちらか、または両方のアプローチをとります。

高速化が期待できる場合

プロセスやスレッドはいずれも作成や終了の処理コストがかかるため、そのコストを上回る重たい処理でないと並列化することによって却って遅くなってしまいます。ファイル I/O やネットワーク通信を伴う時間がかかる処理を複数回行う場合などは並列化を検討しましょう。

  • 並列化により高速化が期待できる例
    • ディレクトリ内の複数のファイルに対してそれぞれ処理する
    • 複数の Web サイトから情報を取得する

Python の並列処理をサポートするライブラリ

標準ライブラリでは、マルチプロセスやマルチスレッドによる並列化をサポートする複数のモジュールが提供されています。これらはマルチプロセス (スレッド) を利用して並列化を行う点では同じですが、その実装方針により API が異なります。

  • threading: スレッドを表すクラスを提供するモジュール
  • multiprocessing: プロセスを表すクラスを提供するモジュール
  • concurrent.futures: マルチスレッド、マルチプロセスを Future パターン により実現するモジュール

multiprocessingthreading はプロセスやスレッドを直接操作します。 一方、concurrent.futures は、プロセスやスレッドが Future パターンにより隠蔽されており、スレッドの作成、終了、タスクの割当といった面倒な処理をライブラリ側が担うことで、より簡単に並列化が行えます。

futures モジュール

concurrent.futures モジュールでは、並列処理を行う仕組みとして、マルチスレッドによる並列化を行う ThreadPoolExecutor とマルチプロセスによる並列化を行う concurrent.futures.ProcessPoolExecutor が提供されています。どちらも Executor クラスを基底クラスとしており、API はほぼ同じです。

  • ThreadPoolExecutor: 各ワーカーをメインプロセス内の独立したスレッドで実行する
  • ProcessPoolExecutor: 各ワーカーを独立した子プロセスで実行する

ThreadPoolExecutor

1. ThreadPoolExecutor インスタンスを作成する

タスクを行うオブジェクトをワーカー (worker) といいます。ThreadPoolExecutor のコンストラクタ引数 max_workers でワーカー、すなわちスレッドの最大数を指定します。デフォルトは min(論理コア数 + 4, 32) となっています。例えば、論理コアが8コアの CPU であれば、max_workers は12になります。

2. タスクを追加する

ワーカーで実行する処理をタスク (task) といいます。submit() に関数など呼び出し可能なオブジェクトを指定することでタスクを追加できます。この関数は、タスクを追加すると Future オブジェクトを返して直ちに終了します。

関数に位置引数や名前付き引数を渡すこともできます。

def func(a, b, hoge):
    time.sleep(5)
    return a + b

submit(func, 1, 2, hoge="test")

3. すべてのタスクが終了するのを待機する

通常、submit() はタスクを追加するとただちに返るため、すべてのタスクが終了するまで待機する必要があります。as_completed() に Future オブジェクトのリストを渡すことで、すべてのタスクが完了するまでそこで処理を停止できます。 ただし、今回のように with コンテキストを使用して ThreadPoolExecutor を作成した場合、with を抜けるタイミングで実行中のタスクの完了を待つようになっているので、as_completed() は不要です。

4. タスクの結果を受け取る

submit() に渡した関数が返り値を返す場合、タスクが完了したあと、Future.result() で結果を受け取ります。

In [1]:
import time
from concurrent import futures


def func(i):
    time.sleep(2)
    return i * 2


future_list = []
with futures.ThreadPoolExecutor() as executor:
    for i in range(10):
        # タスクを追加する。
        future = executor.submit(func, i)
        # Future オブジェクトを記録する。
        future_list.append(future)

print([x.result() for x in future_list])
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

ThreadPoolExecutor.map

タスクに渡す引数の一覧が用意されている場合は ThreadPoolExecutor.map() を使って書くこともできます。 この関数はタスクの結果を取得するための Generator を返します。

In [2]:
from concurrent import futures


def func(i):
    time.sleep(2)
    return i * 2


with futures.ThreadPoolExecutor() as executor:
    rets = executor.map(func, range(10))

print([x for x in rets])
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

要素数が同じ iterable を与えることで、複数の引数を渡すこともできます。

In [3]:
from concurrent import futures


def func(a, b):
    return f"a: {a}, b: {b}"


with futures.ThreadPoolExecutor() as executor:
    rets = executor.map(func, range(10), "ABCDEFGHIJK")

print([x for x in rets])
['a: 0, b: A', 'a: 1, b: B', 'a: 2, b: C', 'a: 3, b: D', 'a: 4, b: E', 'a: 5, b: F', 'a: 6, b: G', 'a: 7, b: H', 'a: 8, b: I', 'a: 9, b: J']

固定値の引数は itertools.repeat() で同じ値を繰り返す配列にして渡します。

In [4]:
import itertools


def func(a, b):
    return f"a: {a}, b: {b}"


with futures.ThreadPoolExecutor() as executor:
    rets = executor.map(func, range(10), itertools.repeat("A"))

print([x for x in rets])
['a: 0, b: A', 'a: 1, b: A', 'a: 2, b: A', 'a: 3, b: A', 'a: 4, b: A', 'a: 5, b: A', 'a: 6, b: A', 'a: 7, b: A', 'a: 8, b: A', 'a: 9, b: A']

例 大量の画像のりサイズ

並列化で高速化できる例として、3000枚の画像をリサイズして保存する処理を紹介します。リサイズ処理は OpenCV を使用します。

In [5]:
from pathlib import Path

import cv2

img_dir = Path("/data/samples")
img_paths = list(img_dir.glob("*.jpg"))[:3000]

output_dir = Path("output")
output_dir.mkdir(exist_ok=True)


def resize(img_path):
    """1枚の画像をリサイズする関数
    """
    save_path = output_dir / img_path.name

    img = cv2.imread(str(img_path))
    img = cv2.resize(img, (256, 256))
    cv2.imwrite(str(save_path), img)

まずは普通に for で逐次処理した場合の時間を計測します。

In [6]:
%%time

for img_path in img_paths:
    resize(img_path)
CPU times: user 16.4 s, sys: 277 ms, total: 16.7 s
Wall time: 16.7 s

次に ThreadPoolExecutor、ThreadPoolExecutor を利用した場合の速度を計測します。

In [7]:
%%time

with futures.ThreadPoolExecutor() as executor:
    executor.map(resize, img_paths)
CPU times: user 27.9 s, sys: 607 ms, total: 28.5 s
Wall time: 3.69 s
In [8]:
%%time

with futures.ProcessPoolExecutor() as executor:
    executor.map(resize, img_paths)
CPU times: user 783 ms, sys: 108 ms, total: 891 ms
Wall time: 3.9 s

使用した CPU は物理コアが4個で、ThreadPoolExecutor、ProcessPoolExecutor のいずれも4倍程高速化できました。 また、ThreadPoolExecutor のほうが ProcessPoolExecutor より若干早い結果となりました。

種類 時間
for ループ 16.7 s
ThreadPoolExecutor 3.69 s
ProcessPoolExecutor 3.9 s

参考文献