スレッド

[Python] concurrent.futuresの使い方 – 複数処理の並列化

concurrent.futuresは、Pythonで並列処理や非同期処理を簡単に実装するためのモジュールです。

主にThreadPoolExecutorProcessPoolExecutorの2つのクラスが提供されており、前者はスレッドを、後者はプロセスを使用して並列処理を行います。

submit()メソッドでタスクを非同期に実行し、result()メソッドで結果を取得できます。

また、map()メソッドを使うと、複数のタスクを並列に処理できます。

concurrent.futuresとは

concurrent.futuresは、Pythonの標準ライブラリの一部で、並列処理を簡単に実装するためのモジュールです。

このモジュールは、スレッドやプロセスを使用して、複数のタスクを同時に実行することを可能にします。

特に、CPUバウンドな処理やI/Oバウンドな処理を効率的に行うために設計されています。

concurrent.futuresには、主に2つのクラスが用意されています。

1つはThreadPoolExecutorで、スレッドを使用してタスクを並列実行します。

もう1つはProcessPoolExecutorで、プロセスを使用してタスクを並列実行します。

これにより、開発者は複雑なスレッドやプロセス管理を意識することなく、簡単に並列処理を実装できます。

このモジュールを使用することで、プログラムのパフォーマンスを向上させ、処理時間を短縮することが可能になります。

特に、大量のデータ処理やネットワーク通信を行うアプリケーションにおいて、その効果は顕著です。

ThreadPoolExecutorの使い方

ThreadPoolExecutorの基本的な使い方

ThreadPoolExecutorは、スレッドを使用してタスクを並列に実行するためのクラスです。

基本的な使い方は、まずインスタンスを作成し、その後タスクを追加して実行します。

以下は、ThreadPoolExecutorの基本的な構文です。

from concurrent.futures import ThreadPoolExecutor
# スレッドプールの作成
executor = ThreadPoolExecutor(max_workers=5)  # 最大5スレッド

submit()メソッドでタスクを実行する

submit()メソッドを使用すると、タスクをスレッドプールに追加し、非同期に実行することができます。

このメソッドは、実行する関数とその引数を受け取ります。

以下は、submit()メソッドの使用例です。

from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
    time.sleep(n)  # n秒待機
    return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, 2)  # 2秒待機するタスクを追加
    print(future.result())  # 結果を取得
Task 2 completed

result()メソッドで結果を取得する

submit()メソッドで返されるFutureオブジェクトを使用して、タスクの結果を取得できます。

result()メソッドを呼び出すことで、タスクが完了するまで待機し、結果を取得します。

上記の例でも使用されています。

map()メソッドで複数タスクを並列実行

map()メソッドを使用すると、複数のタスクを一度に並列実行できます。

このメソッドは、リストやイテラブルを受け取り、各要素に対して指定した関数を適用します。

以下は、map()メソッドの使用例です。

from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
    time.sleep(n)  # n秒待機
    return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(task, [1, 2, 3])  # 1, 2, 3秒待機するタスクを追加
    for result in results:
        print(result)
Task 1 completed
Task 2 completed
Task 3 completed

with文を使ったリソース管理

with文を使用することで、ThreadPoolExecutorのリソースを自動的に管理できます。

with文を使うと、スレッドプールが自動的にシャットダウンされ、リソースが解放されます。

上記の例でもwith文を使用しています。

実行中のタスクのキャンセル方法

Futureオブジェクトのcancel()メソッドを使用することで、実行中のタスクをキャンセルできます。

ただし、タスクがすでに実行中の場合はキャンセルできません。

以下は、タスクのキャンセルの例です。

from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
    time.sleep(n)  # n秒待機
    return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, 5)  # 5秒待機するタスクを追加
    future.cancel()  # タスクをキャンセル
    print(f"Task cancelled: {future.cancelled()}")  # キャンセルの結果を表示
Task cancelled: TrueまたはFalse

このように、ThreadPoolExecutorを使用することで、簡単にスレッドを利用した並列処理を実装できます。

ProcessPoolExecutorの使い方

ProcessPoolExecutorの基本的な使い方

ProcessPoolExecutorは、プロセスを使用してタスクを並列に実行するためのクラスです。

ThreadPoolExecutorと同様に、タスクを追加して実行することができます。

以下は、ProcessPoolExecutorの基本的な構文です。

from concurrent.futures import ProcessPoolExecutor
# プロセスプールの作成
executor = ProcessPoolExecutor(max_workers=4)  # 最大4プロセス

ThreadPoolExecutorとの違い

ThreadPoolExecutorはスレッドを使用するのに対し、ProcessPoolExecutorはプロセスを使用します。

スレッドは軽量で、同じメモリ空間を共有しますが、プロセスは独立したメモリ空間を持ちます。

このため、ProcessPoolExecutorはCPUバウンドな処理に適しており、GIL(Global Interpreter Lock)の影響を受けません。

以下の表に、両者の違いをまとめます。

特徴ThreadPoolExecutorProcessPoolExecutor
使用する単位スレッドプロセス
メモリ空間共有独立
GILの影響受ける受けない
適した処理I/Oバウンドな処理CPUバウンドな処理

CPUバウンドな処理に適した使い方

ProcessPoolExecutorは、CPUバウンドな処理に特に適しています。

例えば、大量の計算を行うタスクを並列に実行する場合に効果的です。

以下は、CPUバウンドな処理の例です。

from concurrent.futures import ProcessPoolExecutor

def compute_square(n):
    return n * n

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(compute_square, range(10))  # 0から9までの平方を計算
        for result in results:
            print(result)
0
1
4
9
16
25
36
49
64
81

非同期処理の実行と結果の取得

ProcessPoolExecutorでも、submit()メソッドmap()メソッドを使用して非同期処理を実行できます。

Futureオブジェクトを使用して結果を取得することも可能です。

以下は、非同期処理の実行と結果の取得の例です。

from concurrent.futures import ProcessPoolExecutor
def task(n):
    return n * n
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        future = executor.submit(task, 5)  # 5の平方を計算するタスクを追加
        print(future.result())  # 結果を取得
25

プロセス間通信の注意点

ProcessPoolExecutorを使用する際は、プロセス間通信に注意が必要です。

プロセスは独立したメモリ空間を持つため、データの共有が難しくなります。

共有データが必要な場合は、multiprocessingモジュールのQueuePipeを使用することが推奨されます。

また、オブジェクトのシリアライズ(pickle化)が必要になるため、シリアライズ可能なオブジェクトを使用することが重要です。

このように、ProcessPoolExecutorを使用することで、CPUバウンドな処理を効率的に並列実行することができますが、プロセス間通信に関する注意が必要です。

実践例:並列処理の活用

Webスクレイピングの並列化

Webスクレイピングでは、複数のページを同時に取得することで、処理時間を大幅に短縮できます。

ThreadPoolExecutorを使用して、複数のURLからデータを並列に取得する例を示します。

import requests
from concurrent.futures import ThreadPoolExecutor
def fetch_url(url):
    response = requests.get(url)
    return response.text[:100]  # 取得したHTMLの最初の100文字を返す
urls = [
    'https://example.com',
    'https://example.org',
    'https://example.net',
]
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(fetch_url, urls)
    for result in results:
        print(result)
<!doctype html>...  # example.comのHTMLの一部
<!doctype html>...  # example.orgのHTMLの一部
<!doctype html>...  # example.netのHTMLの一部

ファイルI/Oの並列処理

ファイルの読み書きも並列処理の対象です。

特に、大量のファイルを処理する場合、ThreadPoolExecutorを使用してI/Oバウンドな処理を効率化できます。

以下は、複数のファイルを同時に読み込む例です。

import os
from concurrent.futures import ThreadPoolExecutor
def read_file(file_path):
    with open(file_path, 'r') as f:
        return f.read()
file_paths = ['file1.txt', 'file2.txt', 'file3.txt']
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(read_file, file_paths)
    for result in results:
        print(result[:100])  # 各ファイルの最初の100文字を表示
ファイル1の内容の最初の100文字
ファイル2の内容の最初の100文字
ファイル3の内容の最初の100文字

大量データの並列処理

大量のデータを処理する場合、ProcessPoolExecutorを使用してCPUバウンドな処理を並列化することが効果的です。

以下は、リスト内の数値を平方する例です。

from concurrent.futures import ProcessPoolExecutor
def compute_square(n):
    return n * n
data = list(range(10000))  # 0から9999までのリスト
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(compute_square, data)
        for result in results:
            if result % 100000 == 0:  # 10万ごとに結果を表示
                print(result)
0
1000000
4000000
9000000
16000000
25000000
36000000
49000000
64000000
81000000

APIリクエストの並列化

APIリクエストを並列に実行することで、レスポンスを迅速に取得できます。

以下は、複数のAPIエンドポイントに対してリクエストを送信する例です。

import requests
from concurrent.futures import ThreadPoolExecutor
def fetch_api_data(endpoint):
    response = requests.get(endpoint)
    return response.json()  # JSON形式で返す
endpoints = [
    'https://api.example.com/data1',
    'https://api.example.com/data2',
    'https://api.example.com/data3',
]
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(fetch_api_data, endpoints)
    for result in results:
        print(result)  # 各APIのレスポンスを表示
{'key1': 'value1', ...}  # API1のレスポンス
{'key2': 'value2', ...}  # API2のレスポンス
{'key3': 'value3', ...}  # API3のレスポンス

画像処理の並列化

画像処理も並列化することで、処理時間を短縮できます。

以下は、複数の画像を同時にリサイズする例です。

from PIL import Image
from concurrent.futures import ProcessPoolExecutor
def resize_image(image_path):
    with Image.open(image_path) as img:
        img = img.resize((100, 100))  # 100x100にリサイズ
        img.save(f'resized_{image_path}')  # 保存
image_paths = ['image1.jpg', 'image2.jpg', 'image3.jpg']
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        executor.map(resize_image, image_paths)

このように、concurrent.futuresを活用することで、さまざまな処理を並列化し、効率的に実行することができます。

エラーハンドリングとデバッグ

タスクの例外処理

concurrent.futuresを使用する際、タスクが例外を発生させることがあります。

Futureオブジェクトのresult()メソッドを呼び出すと、タスク内で発生した例外が再スローされます。

これにより、例外をキャッチして適切に処理することができます。

以下は、タスクの例外処理の例です。

from concurrent.futures import ThreadPoolExecutor
def task(n):
    if n == 0:
        raise ValueError("n must not be zero")
    return 10 / n
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, n) for n in [1, 2, 0, 4]]
    for future in futures:
        try:
            result = future.result()
            print(result)
        except Exception as e:
            print(f"Error: {e}")
10.0
5.0
Error: n must not be zero
2.5

as_completed()でのエラーハンドリング

as_completed()メソッドを使用すると、タスクが完了するたびに結果を取得できます。

このメソッドを使うことで、タスクの結果を順次処理し、例外が発生した場合も適切にハンドリングできます。

以下は、as_completed()を使用した例です。

from concurrent.futures import ThreadPoolExecutor, as_completed
def task(n):
    if n == 0:
        raise ValueError("n must not be zero")
    return 10 / n
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(task, n): n for n in [1, 2, 0, 4]}
    for future in as_completed(futures):
        n = futures[future]
        try:
            result = future.result()
            print(f"Result for {n}: {result}")
        except Exception as e:
            print(f"Error for {n}: {e}")
Result for 1: 10.0
Result for 2: 5.0
Error for 0: n must not be zero
Result for 4: 2.5

タスクのタイムアウト処理

タスクが指定した時間内に完了しない場合、タイムアウトを設定して処理を中断することができます。

result(timeout)メソッドを使用することで、タイムアウトを設定できます。

以下は、タイムアウト処理の例です。

from concurrent.futures import ThreadPoolExecutor
import time
def long_running_task():
    time.sleep(5)  # 5秒待機
    return "Task completed"
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(long_running_task)
    try:
        result = future.result(timeout=2)  # 2秒のタイムアウトを設定
        print(result)
    except Exception as e:
        print(f"Error: {e}")
Error: Future finished with an error: TimeoutError()

デバッグ時の注意点

concurrent.futuresを使用する際のデバッグにはいくつかの注意点があります。

以下に、デバッグ時に考慮すべきポイントを示します。

  • 例外の再スロー: タスク内で発生した例外は、Futureオブジェクトのresult()メソッドを呼び出すことで再スローされます。

これを利用して、例外を適切にキャッチし、ログを出力することが重要です。

  • タスクの状態確認: Futureオブジェクトのdone()メソッドcancelled()メソッドを使用して、タスクの状態を確認できます。

これにより、タスクが正常に完了したかどうかを確認できます。

  • ロギングの活用: タスク内での処理状況やエラーをログに記録することで、後から問題を追跡しやすくなります。

Pythonのloggingモジュールを活用することをお勧めします。

  • デバッグツールの利用: IDEのデバッガやpdbモジュールを使用して、タスクの実行状況を逐次確認することができます。

特に、並列処理ではタスクの実行順序が不明確になるため、デバッガが役立ちます。

これらのポイントを考慮することで、concurrent.futuresを使用したプログラムのデバッグが容易になります。

応用例:concurrent.futuresの高度な使い方

タスクの優先度を設定する

concurrent.futures自体にはタスクの優先度を直接設定する機能はありませんが、タスクをキューに追加する際に優先度を考慮することで、擬似的に優先度を管理できます。

queue.PriorityQueueを使用して、優先度に基づいてタスクを処理する例を示します。

from concurrent.futures import ThreadPoolExecutor
import queue
import time
def task(n):
    time.sleep(n)  # n秒待機
    return f"Task {n} completed"
# 優先度付きキューの作成
priority_queue = queue.PriorityQueue()
priority_queue.put((2, 1))  # 優先度2のタスク
priority_queue.put((1, 2))  # 優先度1のタスク
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(task, n): priority for priority, n in priority_queue.queue}
    for future in futures:
        print(future.result())
Task 2 completed
Task 1 completed

タスクの依存関係を管理する

タスクの依存関係を管理するためには、タスクが完了するまで次のタスクを実行しないようにする必要があります。

Futureオブジェクトを使用して、依存関係を管理する例を示します。

from concurrent.futures import ThreadPoolExecutor
def task_a():
    return "Task A completed"
def task_b(dep):
    return f"Task B depends on: {dep}"
with ThreadPoolExecutor(max_workers=2) as executor:
    future_a = executor.submit(task_a)
    future_b = executor.submit(task_b, future_a.result())  # task_aの結果に依存
    print(future_b.result())
Task B depends on: Task A completed

並列処理のパフォーマンスチューニング

並列処理のパフォーマンスを向上させるためには、以下のポイントを考慮することが重要です。

  • ワーカー数の調整: max_workersの値を適切に設定することで、CPUやI/Oの負荷に応じた最適なスレッド数を選択できます。
  • タスクの粒度: タスクの粒度を適切に設定することで、オーバーヘッドを減少させ、効率的な並列処理が可能になります。
  • I/OバウンドとCPUバウンドのバランス: I/Oバウンドな処理とCPUバウンドな処理を適切に分け、必要に応じてThreadPoolExecutorProcessPoolExecutorを使い分けることが重要です。

非同期処理と同期処理の組み合わせ

非同期処理と同期処理を組み合わせることで、より柔軟なプログラムを作成できます。

以下は、非同期処理を使用してデータを取得し、その後に同期処理を行う例です。

from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_data(url):
    response = requests.get(url)
    return response.json()
def process_data(data):
    # データの処理
    return f"Processed {len(data)} items"
urls = ['https://api.example.com/data1', 'https://api.example.com/data2']
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(fetch_data, url) for url in urls]
    results = [future.result() for future in futures]  # 非同期処理の結果を取得
    for data in results:
        print(process_data(data))  # 同期処理でデータを処理

並列処理の進捗状況をモニタリングする

タスクの進捗状況をモニタリングするためには、Futureオブジェクトのdone()メソッドcancelled()メソッドを使用して、タスクの状態を確認できます。

以下は、タスクの進捗状況を表示する例です。

from concurrent.futures import ThreadPoolExecutor
import time
def long_running_task(n):
    time.sleep(n)  # n秒待機
    return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(long_running_task, n): n for n in [3, 2, 1]}
    while futures:
        for future in list(futures):
            if future.done():
                print(f"Completed: {future.result()}")
                del futures[future]  # 完了したタスクを削除
        time.sleep(0.5)  # 状態確認の間隔
Completed: Task 1 completed
Completed: Task 2 completed
Completed: Task 3 completed

これらの応用例を通じて、concurrent.futuresを活用した高度な並列処理の実装が可能になります。

タスクの優先度や依存関係を管理し、パフォーマンスを最適化することで、より効率的なプログラムを作成できます。

まとめ

この記事では、Pythonのconcurrent.futuresモジュールを活用した並列処理の基本から応用までを詳しく解説しました。

特に、ThreadPoolExecutorProcessPoolExecutorの使い方や、それぞれの特性に応じた適切な選択方法、さらにはエラーハンドリングやデバッグのテクニックについても触れました。

これらの知識を活用することで、効率的なプログラムを作成し、処理時間を短縮することが可能になります。

ぜひ、実際のプロジェクトにおいてこれらのテクニックを試し、並列処理の効果を実感してみてください。

関連記事

Back to top button