[Python] マルチスレッドでロックして排他制御する

Pythonでマルチスレッドを使用する際、複数のスレッドが同時に共有リソースにアクセスするとデータ競合が発生する可能性があります。

この問題を解決するために、Pythonの標準ライブラリであるthreadingモジュールを使用してロックを実装し、排他制御を行います。

Lockオブジェクトを生成し、acquire()メソッドでロックを取得し、release()メソッドでロックを解放することで、スレッド間の競合を防ぎます。

これにより、スレッドセーフなプログラムを実現できます。

この記事でわかること
  • マルチスレッドの基本とその利点
  • Pythonのロック機構(Lock、RLock、Condition、Semaphore)の使い方
  • デッドロックを避けるためのベストプラクティス
  • マルチスレッドを活用したWebスクレイピングやデータベースアクセスの方法
  • マルチスレッドとマルチプロセスの違いについての理解

目次から探す

マルチスレッドと排他制御の基本

マルチスレッドとは

マルチスレッドとは、1つのプロセス内で複数のスレッドを同時に実行する技術です。

これにより、CPUのリソースを効率的に利用し、プログラムの応答性を向上させることができます。

特にI/O操作や待機時間が発生する処理において、他のスレッドが実行されることで全体の処理時間を短縮できます。

排他制御の必要性

排他制御は、複数のスレッドが同時に共有リソースにアクセスする際に、データの整合性を保つために必要です。

以下のような状況で排他制御が重要になります。

スクロールできます
状況説明
共有データの更新複数のスレッドが同時にデータを更新すると、
データが不整合になる可能性があります。
リソースの競合同じリソースにアクセスするスレッドがある場合、
競合が発生し、エラーが生じることがあります。
デッドロックの回避複数のスレッドが互いにロックを待つ状態になると、
プログラムが停止することがあります。

Pythonにおけるスレッドの基本操作

Pythonでは、threadingモジュールを使用してスレッドを操作します。

基本的なスレッドの作成と実行は以下のように行います。

import threading
import time
def thread_function(name):
    print(f"スレッド {name} が開始されました。")
    time.sleep(2)
    print(f"スレッド {name} が終了しました。")
# スレッドの作成
thread = threading.Thread(target=thread_function, args=("A",))
# スレッドの開始
thread.start()
# メインスレッドの処理
print("メインスレッドの処理を実行中...")
# スレッドの終了を待つ
thread.join()
print("全てのスレッドが終了しました。")

このコードでは、スレッドを作成し、開始した後、メインスレッドが他の処理を行い、最後にスレッドの終了を待っています。

スレッド A が開始されました。
メインスレッドの処理を実行中...
スレッド A が終了しました。
全てのスレッドが終了しました。

GIL(Global Interpreter Lock)とは

GIL(グローバルインタプリタロック)は、Pythonのインタプリタが同時に複数のスレッドを実行することを制限する仕組みです。

GILにより、Pythonのスレッドは同時に1つのスレッドしか実行できず、CPUバウンドな処理においてはマルチスレッドの効果が薄れることがあります。

しかし、I/Oバウンドな処理では、GILの影響を受けにくく、マルチスレッドの利点を活かすことができます。

Pythonでのロック機構

threadingモジュールの紹介

Pythonのthreadingモジュールは、マルチスレッドプログラミングをサポートするための機能を提供します。

このモジュールには、スレッドの作成や管理、排他制御のためのロック機構が含まれています。

特に、データの整合性を保つためにロックを使用することが重要です。

以下に、threadingモジュールで利用できる主要なロック機構を紹介します。

Lockオブジェクトの使い方

Lockオブジェクトは、最も基本的なロック機構で、スレッドが共有リソースにアクセスする際に排他制御を行います。

以下は、Lockオブジェクトの使用例です。

import threading
import time
# 共有リソース
shared_counter = 0
lock = threading.Lock()
def increment_counter():
    global shared_counter
    for _ in range(100000):
        lock.acquire()  # ロックを取得
        shared_counter += 1
        lock.release()  # ロックを解放
# スレッドの作成
threads = [threading.Thread(target=increment_counter) for _ in range(2)]
# スレッドの開始
for thread in threads:
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()
print(f"最終カウンタの値: {shared_counter}")

このコードでは、2つのスレッドが同時にカウンタをインクリメントしますが、Lockを使用することでデータの整合性が保たれます。

RLock(再帰ロック)の使い方

RLockは、同じスレッドが複数回ロックを取得できる再帰的なロックです。

以下は、RLockの使用例です。

import threading
rlock = threading.RLock()
def recursive_function(n):
    if n > 0:
        rlock.acquire()  # ロックを取得
        print(f"再帰呼び出し: {n}")
        recursive_function(n - 1)
        rlock.release()  # ロックを解放
# スレッドの作成
thread = threading.Thread(target=recursive_function, args=(3,))
# スレッドの開始
thread.start()
thread.join()

このコードでは、再帰的に呼び出される関数内でRLockを使用しています。

同じスレッドがロックを複数回取得できるため、デッドロックを避けることができます。

Conditionオブジェクトの使い方

Conditionオブジェクトは、スレッド間の通知を行うための機能を提供します。

特定の条件が満たされるまでスレッドを待機させることができます。

以下は、Conditionの使用例です。

import threading
import time
condition = threading.Condition()
data_ready = False
def producer():
    global data_ready
    time.sleep(2)  # データの準備
    with condition:
        data_ready = True
        condition.notify()  # 消費者に通知
def consumer():
    with condition:
        while not data_ready:
            condition.wait()  # データが準備されるまで待機
        print("データが準備されました。")
# スレッドの作成
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
# スレッドの開始
consumer_thread.start()
producer_thread.start()
# スレッドの終了を待つ
producer_thread.join()
consumer_thread.join()

このコードでは、producerスレッドがデータを準備し、consumerスレッドがそのデータを待機します。

Conditionを使用することで、効率的なスレッド間の通信が可能になります。

SemaphoreとBoundedSemaphoreの使い方

Semaphoreは、特定の数のスレッドが同時にリソースにアクセスできるように制御するためのロック機構です。

BoundedSemaphoreは、最大数を超えないように制限されたセマフォです。

以下は、Semaphoreの使用例です。

import threading
import time
semaphore = threading.Semaphore(2)  # 同時に2つのスレッドがアクセス可能
def access_resource(thread_id):
    with semaphore:
        print(f"スレッド {thread_id} がリソースにアクセス中...")
        time.sleep(2)  # リソースの使用
        print(f"スレッド {thread_id} がリソースを解放しました。")
# スレッドの作成
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(5)]
# スレッドの開始
for thread in threads:
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()

このコードでは、最大2つのスレッドが同時にリソースにアクセスできるようにSemaphoreを使用しています。

これにより、リソースの競合を防ぎつつ、効率的にスレッドを管理できます。

ロックを使った排他制御の実装例

単純なカウンタの例

以下は、Lockを使用して複数のスレッドが同時にカウンタをインクリメントする例です。

この例では、ロックを使うことでデータの整合性を保っています。

import threading
# 共有カウンタ
counter = 0
lock = threading.Lock()
def increment_counter():
    global counter
    for _ in range(100000):
        lock.acquire()  # ロックを取得
        counter += 1
        lock.release()  # ロックを解放
# スレッドの作成
threads = [threading.Thread(target=increment_counter) for _ in range(2)]
# スレッドの開始
for thread in threads:
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()
print(f"最終カウンタの値: {counter}")

このコードを実行すると、最終的なカウンタの値は200000になります。

ロックを使用することで、同時にカウンタが更新されることを防ぎ、正しい結果を得ることができます。

共有リソースの保護

共有リソースに対するアクセスを制御するために、Lockを使用してリソースを保護する例です。

以下のコードでは、複数のスレッドが同じリストにデータを追加します。

import threading
# 共有リスト
shared_list = []
lock = threading.Lock()
def add_to_list(item):
    with lock:  # ロックを取得
        shared_list.append(item)
# スレッドの作成
threads = [threading.Thread(target=add_to_list, args=(i,)) for i in range(10)]
# スレッドの開始
for thread in threads:
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()
print(f"共有リストの内容: {shared_list}")

このコードでは、10個のスレッドが同時にshared_listにアイテムを追加します。

ロックを使用することで、リストへの同時アクセスを防ぎ、正しい結果を得ることができます。

デッドロックの回避方法

デッドロックは、複数のスレッドが互いにロックを待ち続ける状態です。

以下は、デッドロックを避けるための方法を示す例です。

import threading
import time

lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_function():
    with lock1:
        print("スレッド1がロック1を取得")
        time.sleep(1)  # 少し待機
        with lock2:
            print("スレッド1がロック2を取得")

def thread2_function():
    with lock1:
        print("スレッド2がロック1を取得")
        time.sleep(1)  # 少し待機
        with lock2:
            print("スレッド2がロック2を取得")

# スレッドの作成
thread1 = threading.Thread(target=thread1_function)
thread2 = threading.Thread(target=thread2_function)

# スレッドの開始
thread1.start()
thread2.start()

# スレッドの終了を待つ
thread1.join()
thread2.join()

このコードはデッドロックを正しく回避します。

デッドロックを避けるためには、ロックを取得する順序を統一することが重要です。

例えば、常にlock1を先に取得し、その後にlock2を取得するようにします。

タイムアウト付きロックの使用例

Lockオブジェクトには、タイムアウトを指定してロックを取得することができます。

以下は、タイムアウト付きロックの使用例です。

import threading
import time
lock = threading.Lock()
def try_lock_with_timeout():
    if lock.acquire(timeout=1):  # 1秒のタイムアウト
        try:
            print("ロックを取得しました。")
            time.sleep(2)  # ロックを保持
        finally:
            lock.release()  # ロックを解放
            print("ロックを解放しました。")
    else:
        print("ロックを取得できませんでした。")
# スレッドの作成
threads = [threading.Thread(target=try_lock_with_timeout) for _ in range(3)]
# スレッドの開始
for thread in threads:
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()

このコードでは、3つのスレッドが同時にロックを取得しようとしますが、タイムアウトを設定しているため、ロックを取得できない場合はメッセージを表示します。

これにより、スレッドが無限に待機することを防ぎます。

応用例

マルチスレッドでのWebスクレイピング

マルチスレッドを使用することで、複数のWebページを同時にスクレイピングし、処理時間を短縮できます。

以下は、requeststhreadingを使用したWebスクレイピングの例です。

import threading
import requests
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
]
def fetch_url(url):
    response = requests.get(url)
    print(f"{url} のステータスコード: {response.status_code}")
# スレッドの作成
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
# スレッドの開始
for thread in threads:
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()

このコードでは、指定したURLに対して同時にリクエストを送信し、各ページのステータスコードを表示します。

これにより、複数のページを効率的に取得できます。

データベースアクセスの同期

データベースに対する同時アクセスを制御するために、ロックを使用して排他制御を行います。

以下は、SQLiteデータベースに対するアクセスを同期する例です。

import sqlite3
import threading
# データベース接続
connection = sqlite3.connect("example.db")
lock = threading.Lock()
def insert_data(value):
    with lock:  # ロックを取得
        cursor = connection.cursor()
        cursor.execute("INSERT INTO data (value) VALUES (?)", (value,))
        connection.commit()
# スレッドの作成
threads = [threading.Thread(target=insert_data, args=(i,)) for i in range(10)]
# スレッドの開始
for thread in threads:
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()
# データベース接続を閉じる
connection.close()

このコードでは、10個のスレッドが同時にデータベースにデータを挿入しますが、ロックを使用することでデータの整合性を保っています。

ファイル操作の同期

ファイルへの同時アクセスを制御するために、ロックを使用して排他制御を行います。

以下は、複数のスレッドが同じファイルに書き込む例です。

import threading
lock = threading.Lock()
def write_to_file(filename, data):
    with lock:  # ロックを取得
        with open(filename, "a") as f:
            f.write(data + "\n")
# スレッドの作成
threads = [threading.Thread(target=write_to_file, args=("output.txt", f"データ {i}")) for i in range(5)]
# スレッドの開始
for thread in threads:
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()

このコードでは、5つのスレッドが同時にoutput.txtファイルにデータを書き込みます。

ロックを使用することで、ファイルへの同時書き込みを防ぎ、データの整合性を保っています。

マルチスレッドでの並列計算

マルチスレッドを使用して、計算を並列に実行することができます。

以下は、複数のスレッドを使用してフィボナッチ数を計算する例です。

import threading
def fibonacci(n, results, index):
    if n <= 1:
        results[index] = n
    else:
        results[index] = fibonacci(n - 1, results, index - 1) + fibonacci(n - 2, results, index - 2)
# 計算するフィボナッチ数のリスト
numbers = [5, 7, 10]
results = [0] * len(numbers)
# スレッドの作成
threads = [threading.Thread(target=fibonacci, args=(num, results, i)) for i, num in enumerate(numbers)]
# スレッドの開始
for thread in threads:
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()
print(f"フィボナッチ数: {results}")

このコードでは、3つのスレッドがそれぞれ異なるフィボナッチ数を計算します。

スレッドを使用することで、計算を並列に実行し、全体の処理時間を短縮できます。

よくある質問

GILがあるのにマルチスレッドを使う意味は?

GIL(グローバルインタプリタロック)は、Pythonのスレッドが同時に実行されることを制限しますが、I/Oバウンドな処理(例えば、ネットワーク通信やファイル操作)では、スレッドが待機している間に他のスレッドが実行されるため、マルチスレッドを使用することで全体の処理時間を短縮できます。

また、スレッドを使用することで、プログラムの応答性を向上させることができます。

デッドロックを避けるためのベストプラクティスは?

デッドロックを避けるためには、以下のベストプラクティスを考慮することが重要です。

  • ロックの取得順序を統一する: 複数のロックを使用する場合、常に同じ順序でロックを取得するようにします。
  • タイムアウトを設定する: ロックを取得する際にタイムアウトを設定し、一定時間内にロックを取得できない場合は処理を中断します。
  • ロックの使用を最小限にする: 必要な範囲でのみロックを使用し、ロックを保持する時間を短くします。

マルチスレッドとマルチプロセスの違いは?

マルチスレッドは、同一プロセス内で複数のスレッドを使用して並行処理を行う方法です。

一方、マルチプロセスは、複数のプロセスを立ち上げて並行処理を行う方法です。

マルチスレッドはメモリを共有するため、データの共有が容易ですが、GILの影響を受けます。

マルチプロセスは、各プロセスが独立したメモリ空間を持つため、GILの影響を受けずにCPUバウンドな処理を効率的に行えますが、プロセス間のデータ共有が難しくなります。

まとめ

この記事では、Pythonにおけるマルチスレッドと排他制御の基本から応用例までを解説しました。

特に、ロック機構を使用したデータの整合性の保ち方や、デッドロックを避けるための方法についても触れました。

これを機に、マルチスレッドを活用したプログラミングに挑戦してみてください。

当サイトはリンクフリーです。出典元を明記していただければ、ご自由に引用していただいて構いません。

関連カテゴリーから探す

  • URLをコピーしました!
目次から探す