[Python] concurrent.futuresの使い方 – 複数処理の並列化
concurrent.futures
は、Pythonで並列処理や非同期処理を簡単に実装するためのモジュールです。
主にThreadPoolExecutor
とProcessPoolExecutor
の2つのクラスが提供されており、前者はスレッドを、後者はプロセスを使用して並列処理を行います。
submit()メソッド
でタスクを非同期に実行し、result()メソッド
で結果を取得できます。
また、map()メソッド
を使うと、複数のタスクを並列に処理できます。
concurrent.futuresとは
concurrent.futures
は、Pythonの標準ライブラリの一部で、並列処理を簡単に実装するためのモジュールです。
このモジュールは、スレッドやプロセスを使用して、複数のタスクを同時に実行することを可能にします。
特に、CPUバウンドな処理やI/Oバウンドな処理を効率的に行うために設計されています。
concurrent.futures
には、主に2つのクラスが用意されています。
1つはThreadPoolExecutor
で、スレッドを使用してタスクを並列実行します。
もう1つはProcessPoolExecutor
で、プロセスを使用してタスクを並列実行します。
これにより、開発者は複雑なスレッドやプロセス管理を意識することなく、簡単に並列処理を実装できます。
このモジュールを使用することで、プログラムのパフォーマンスを向上させ、処理時間を短縮することが可能になります。
特に、大量のデータ処理やネットワーク通信を行うアプリケーションにおいて、その効果は顕著です。
ThreadPoolExecutorの使い方
ThreadPoolExecutorの基本的な使い方
ThreadPoolExecutor
は、スレッドを使用してタスクを並列に実行するためのクラスです。
基本的な使い方は、まずインスタンスを作成し、その後タスクを追加して実行します。
以下は、ThreadPoolExecutor
の基本的な構文です。
from concurrent.futures import ThreadPoolExecutor
# スレッドプールの作成
executor = ThreadPoolExecutor(max_workers=5) # 最大5スレッド
submit()メソッドでタスクを実行する
submit()メソッド
を使用すると、タスクをスレッドプールに追加し、非同期に実行することができます。
このメソッドは、実行する関数とその引数を受け取ります。
以下は、submit()メソッド
の使用例です。
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(n) # n秒待機
return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(task, 2) # 2秒待機するタスクを追加
print(future.result()) # 結果を取得
Task 2 completed
result()メソッドで結果を取得する
submit()メソッド
で返されるFuture
オブジェクトを使用して、タスクの結果を取得できます。
result()メソッド
を呼び出すことで、タスクが完了するまで待機し、結果を取得します。
上記の例でも使用されています。
map()メソッドで複数タスクを並列実行
map()メソッド
を使用すると、複数のタスクを一度に並列実行できます。
このメソッドは、リストやイテラブルを受け取り、各要素に対して指定した関数を適用します。
以下は、map()メソッド
の使用例です。
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(n) # n秒待機
return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(task, [1, 2, 3]) # 1, 2, 3秒待機するタスクを追加
for result in results:
print(result)
Task 1 completed
Task 2 completed
Task 3 completed
with文を使ったリソース管理
with
文を使用することで、ThreadPoolExecutor
のリソースを自動的に管理できます。
with
文を使うと、スレッドプールが自動的にシャットダウンされ、リソースが解放されます。
上記の例でもwith
文を使用しています。
実行中のタスクのキャンセル方法
Future
オブジェクトのcancel()メソッド
を使用することで、実行中のタスクをキャンセルできます。
ただし、タスクがすでに実行中の場合はキャンセルできません。
以下は、タスクのキャンセルの例です。
from concurrent.futures import ThreadPoolExecutor
import time
def task(n):
time.sleep(n) # n秒待機
return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(task, 5) # 5秒待機するタスクを追加
future.cancel() # タスクをキャンセル
print(f"Task cancelled: {future.cancelled()}") # キャンセルの結果を表示
Task cancelled: TrueまたはFalse
このように、ThreadPoolExecutor
を使用することで、簡単にスレッドを利用した並列処理を実装できます。
ProcessPoolExecutorの使い方
ProcessPoolExecutorの基本的な使い方
ProcessPoolExecutor
は、プロセスを使用してタスクを並列に実行するためのクラスです。
ThreadPoolExecutor
と同様に、タスクを追加して実行することができます。
以下は、ProcessPoolExecutor
の基本的な構文です。
from concurrent.futures import ProcessPoolExecutor
# プロセスプールの作成
executor = ProcessPoolExecutor(max_workers=4) # 最大4プロセス
ThreadPoolExecutorとの違い
ThreadPoolExecutor
はスレッドを使用するのに対し、ProcessPoolExecutor
はプロセスを使用します。
スレッドは軽量で、同じメモリ空間を共有しますが、プロセスは独立したメモリ空間を持ちます。
このため、ProcessPoolExecutor
はCPUバウンドな処理に適しており、GIL(Global Interpreter Lock)の影響を受けません。
以下の表に、両者の違いをまとめます。
特徴 | ThreadPoolExecutor | ProcessPoolExecutor |
---|---|---|
使用する単位 | スレッド | プロセス |
メモリ空間 | 共有 | 独立 |
GILの影響 | 受ける | 受けない |
適した処理 | I/Oバウンドな処理 | CPUバウンドな処理 |
CPUバウンドな処理に適した使い方
ProcessPoolExecutor
は、CPUバウンドな処理に特に適しています。
例えば、大量の計算を行うタスクを並列に実行する場合に効果的です。
以下は、CPUバウンドな処理の例です。
from concurrent.futures import ProcessPoolExecutor
def compute_square(n):
return n * n
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(compute_square, range(10)) # 0から9までの平方を計算
for result in results:
print(result)
0
1
4
9
16
25
36
49
64
81
非同期処理の実行と結果の取得
ProcessPoolExecutor
でも、submit()メソッド
やmap()メソッド
を使用して非同期処理を実行できます。
Future
オブジェクトを使用して結果を取得することも可能です。
以下は、非同期処理の実行と結果の取得の例です。
from concurrent.futures import ProcessPoolExecutor
def task(n):
return n * n
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
future = executor.submit(task, 5) # 5の平方を計算するタスクを追加
print(future.result()) # 結果を取得
25
プロセス間通信の注意点
ProcessPoolExecutor
を使用する際は、プロセス間通信に注意が必要です。
プロセスは独立したメモリ空間を持つため、データの共有が難しくなります。
共有データが必要な場合は、multiprocessing
モジュールのQueue
やPipe
を使用することが推奨されます。
また、オブジェクトのシリアライズ(pickle化)が必要になるため、シリアライズ可能なオブジェクトを使用することが重要です。
このように、ProcessPoolExecutor
を使用することで、CPUバウンドな処理を効率的に並列実行することができますが、プロセス間通信に関する注意が必要です。
実践例:並列処理の活用
Webスクレイピングの並列化
Webスクレイピングでは、複数のページを同時に取得することで、処理時間を大幅に短縮できます。
ThreadPoolExecutor
を使用して、複数のURLからデータを並列に取得する例を示します。
import requests
from concurrent.futures import ThreadPoolExecutor
def fetch_url(url):
response = requests.get(url)
return response.text[:100] # 取得したHTMLの最初の100文字を返す
urls = [
'https://example.com',
'https://example.org',
'https://example.net',
]
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(fetch_url, urls)
for result in results:
print(result)
<!doctype html>... # example.comのHTMLの一部
<!doctype html>... # example.orgのHTMLの一部
<!doctype html>... # example.netのHTMLの一部
ファイルI/Oの並列処理
ファイルの読み書きも並列処理の対象です。
特に、大量のファイルを処理する場合、ThreadPoolExecutor
を使用してI/Oバウンドな処理を効率化できます。
以下は、複数のファイルを同時に読み込む例です。
import os
from concurrent.futures import ThreadPoolExecutor
def read_file(file_path):
with open(file_path, 'r') as f:
return f.read()
file_paths = ['file1.txt', 'file2.txt', 'file3.txt']
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(read_file, file_paths)
for result in results:
print(result[:100]) # 各ファイルの最初の100文字を表示
ファイル1の内容の最初の100文字
ファイル2の内容の最初の100文字
ファイル3の内容の最初の100文字
大量データの並列処理
大量のデータを処理する場合、ProcessPoolExecutor
を使用してCPUバウンドな処理を並列化することが効果的です。
以下は、リスト内の数値を平方する例です。
from concurrent.futures import ProcessPoolExecutor
def compute_square(n):
return n * n
data = list(range(10000)) # 0から9999までのリスト
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(compute_square, data)
for result in results:
if result % 100000 == 0: # 10万ごとに結果を表示
print(result)
0
1000000
4000000
9000000
16000000
25000000
36000000
49000000
64000000
81000000
APIリクエストの並列化
APIリクエストを並列に実行することで、レスポンスを迅速に取得できます。
以下は、複数のAPIエンドポイントに対してリクエストを送信する例です。
import requests
from concurrent.futures import ThreadPoolExecutor
def fetch_api_data(endpoint):
response = requests.get(endpoint)
return response.json() # JSON形式で返す
endpoints = [
'https://api.example.com/data1',
'https://api.example.com/data2',
'https://api.example.com/data3',
]
with ThreadPoolExecutor(max_workers=3) as executor:
results = executor.map(fetch_api_data, endpoints)
for result in results:
print(result) # 各APIのレスポンスを表示
{'key1': 'value1', ...} # API1のレスポンス
{'key2': 'value2', ...} # API2のレスポンス
{'key3': 'value3', ...} # API3のレスポンス
画像処理の並列化
画像処理も並列化することで、処理時間を短縮できます。
以下は、複数の画像を同時にリサイズする例です。
from PIL import Image
from concurrent.futures import ProcessPoolExecutor
def resize_image(image_path):
with Image.open(image_path) as img:
img = img.resize((100, 100)) # 100x100にリサイズ
img.save(f'resized_{image_path}') # 保存
image_paths = ['image1.jpg', 'image2.jpg', 'image3.jpg']
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
executor.map(resize_image, image_paths)
このように、concurrent.futures
を活用することで、さまざまな処理を並列化し、効率的に実行することができます。
エラーハンドリングとデバッグ
タスクの例外処理
concurrent.futures
を使用する際、タスクが例外を発生させることがあります。
Future
オブジェクトのresult()メソッド
を呼び出すと、タスク内で発生した例外が再スローされます。
これにより、例外をキャッチして適切に処理することができます。
以下は、タスクの例外処理の例です。
from concurrent.futures import ThreadPoolExecutor
def task(n):
if n == 0:
raise ValueError("n must not be zero")
return 10 / n
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, n) for n in [1, 2, 0, 4]]
for future in futures:
try:
result = future.result()
print(result)
except Exception as e:
print(f"Error: {e}")
10.0
5.0
Error: n must not be zero
2.5
as_completed()でのエラーハンドリング
as_completed()メソッド
を使用すると、タスクが完了するたびに結果を取得できます。
このメソッドを使うことで、タスクの結果を順次処理し、例外が発生した場合も適切にハンドリングできます。
以下は、as_completed()
を使用した例です。
from concurrent.futures import ThreadPoolExecutor, as_completed
def task(n):
if n == 0:
raise ValueError("n must not be zero")
return 10 / n
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(task, n): n for n in [1, 2, 0, 4]}
for future in as_completed(futures):
n = futures[future]
try:
result = future.result()
print(f"Result for {n}: {result}")
except Exception as e:
print(f"Error for {n}: {e}")
Result for 1: 10.0
Result for 2: 5.0
Error for 0: n must not be zero
Result for 4: 2.5
タスクのタイムアウト処理
タスクが指定した時間内に完了しない場合、タイムアウトを設定して処理を中断することができます。
result(timeout)メソッド
を使用することで、タイムアウトを設定できます。
以下は、タイムアウト処理の例です。
from concurrent.futures import ThreadPoolExecutor
import time
def long_running_task():
time.sleep(5) # 5秒待機
return "Task completed"
with ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(long_running_task)
try:
result = future.result(timeout=2) # 2秒のタイムアウトを設定
print(result)
except Exception as e:
print(f"Error: {e}")
Error: Future finished with an error: TimeoutError()
デバッグ時の注意点
concurrent.futures
を使用する際のデバッグにはいくつかの注意点があります。
以下に、デバッグ時に考慮すべきポイントを示します。
- 例外の再スロー: タスク内で発生した例外は、
Future
オブジェクトのresult()メソッド
を呼び出すことで再スローされます。
これを利用して、例外を適切にキャッチし、ログを出力することが重要です。
- タスクの状態確認:
Future
オブジェクトのdone()メソッド
やcancelled()メソッド
を使用して、タスクの状態を確認できます。
これにより、タスクが正常に完了したかどうかを確認できます。
- ロギングの活用: タスク内での処理状況やエラーをログに記録することで、後から問題を追跡しやすくなります。
Pythonのlogging
モジュールを活用することをお勧めします。
- デバッグツールの利用: IDEのデバッガや
pdb
モジュールを使用して、タスクの実行状況を逐次確認することができます。
特に、並列処理ではタスクの実行順序が不明確になるため、デバッガが役立ちます。
これらのポイントを考慮することで、concurrent.futures
を使用したプログラムのデバッグが容易になります。
応用例:concurrent.futuresの高度な使い方
タスクの優先度を設定する
concurrent.futures
自体にはタスクの優先度を直接設定する機能はありませんが、タスクをキューに追加する際に優先度を考慮することで、擬似的に優先度を管理できます。
queue.PriorityQueue
を使用して、優先度に基づいてタスクを処理する例を示します。
from concurrent.futures import ThreadPoolExecutor
import queue
import time
def task(n):
time.sleep(n) # n秒待機
return f"Task {n} completed"
# 優先度付きキューの作成
priority_queue = queue.PriorityQueue()
priority_queue.put((2, 1)) # 優先度2のタスク
priority_queue.put((1, 2)) # 優先度1のタスク
with ThreadPoolExecutor(max_workers=2) as executor:
futures = {executor.submit(task, n): priority for priority, n in priority_queue.queue}
for future in futures:
print(future.result())
Task 2 completed
Task 1 completed
タスクの依存関係を管理する
タスクの依存関係を管理するためには、タスクが完了するまで次のタスクを実行しないようにする必要があります。
Future
オブジェクトを使用して、依存関係を管理する例を示します。
from concurrent.futures import ThreadPoolExecutor
def task_a():
return "Task A completed"
def task_b(dep):
return f"Task B depends on: {dep}"
with ThreadPoolExecutor(max_workers=2) as executor:
future_a = executor.submit(task_a)
future_b = executor.submit(task_b, future_a.result()) # task_aの結果に依存
print(future_b.result())
Task B depends on: Task A completed
並列処理のパフォーマンスチューニング
並列処理のパフォーマンスを向上させるためには、以下のポイントを考慮することが重要です。
- ワーカー数の調整:
max_workers
の値を適切に設定することで、CPUやI/Oの負荷に応じた最適なスレッド数を選択できます。 - タスクの粒度: タスクの粒度を適切に設定することで、オーバーヘッドを減少させ、効率的な並列処理が可能になります。
- I/OバウンドとCPUバウンドのバランス: I/Oバウンドな処理とCPUバウンドな処理を適切に分け、必要に応じて
ThreadPoolExecutor
とProcessPoolExecutor
を使い分けることが重要です。
非同期処理と同期処理の組み合わせ
非同期処理と同期処理を組み合わせることで、より柔軟なプログラムを作成できます。
以下は、非同期処理を使用してデータを取得し、その後に同期処理を行う例です。
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_data(url):
response = requests.get(url)
return response.json()
def process_data(data):
# データの処理
return f"Processed {len(data)} items"
urls = ['https://api.example.com/data1', 'https://api.example.com/data2']
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(fetch_data, url) for url in urls]
results = [future.result() for future in futures] # 非同期処理の結果を取得
for data in results:
print(process_data(data)) # 同期処理でデータを処理
並列処理の進捗状況をモニタリングする
タスクの進捗状況をモニタリングするためには、Future
オブジェクトのdone()メソッド
やcancelled()メソッド
を使用して、タスクの状態を確認できます。
以下は、タスクの進捗状況を表示する例です。
from concurrent.futures import ThreadPoolExecutor
import time
def long_running_task(n):
time.sleep(n) # n秒待機
return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(long_running_task, n): n for n in [3, 2, 1]}
while futures:
for future in list(futures):
if future.done():
print(f"Completed: {future.result()}")
del futures[future] # 完了したタスクを削除
time.sleep(0.5) # 状態確認の間隔
Completed: Task 1 completed
Completed: Task 2 completed
Completed: Task 3 completed
これらの応用例を通じて、concurrent.futures
を活用した高度な並列処理の実装が可能になります。
タスクの優先度や依存関係を管理し、パフォーマンスを最適化することで、より効率的なプログラムを作成できます。
まとめ
この記事では、Pythonのconcurrent.futures
モジュールを活用した並列処理の基本から応用までを詳しく解説しました。
特に、ThreadPoolExecutor
とProcessPoolExecutor
の使い方や、それぞれの特性に応じた適切な選択方法、さらにはエラーハンドリングやデバッグのテクニックについても触れました。
これらの知識を活用することで、効率的なプログラムを作成し、処理時間を短縮することが可能になります。
ぜひ、実際のプロジェクトにおいてこれらのテクニックを試し、並列処理の効果を実感してみてください。