【Python】マルチスレッドでロックして排他制御する

Pythonでマルチスレッドプログラミングを行う際、複数のスレッドが同時に共有リソースにアクセスすることで競合状態が発生することがあります。

この記事では、threadingモジュールを使ってスレッドを作成し、ロックを使って競合状態を防ぐ方法について解説します。

また、デッドロックの回避方法や高度な排他制御のテクニックについても紹介します。

初心者の方でも理解しやすいように、サンプルコードとその実行結果を交えながら説明していきますので、ぜひ参考にしてください。

目次から探す

Pythonでのマルチスレッドの実装

Pythonでマルチスレッドを実装するためには、標準ライブラリのthreadingモジュールを使用します。

このモジュールを使うことで、複数のスレッドを簡単に作成し、並行して実行することができます。

以下では、threadingモジュールの基本的な使い方について解説します。

threadingモジュールの紹介

threadingモジュールは、Pythonでスレッドを扱うための標準ライブラリです。

このモジュールを使うことで、スレッドの作成、管理、同期を行うことができます。

threadingモジュールには、スレッドを作成するためのThreadクラスや、スレッド間の排他制御を行うためのLockクラスなどが含まれています。

基本的なスレッドの作成と実行

スレッドを作成して実行するためには、まずThreadクラスをインポートし、スレッドのターゲットとなる関数を定義します。

その後、Threadクラスのインスタンスを作成し、startメソッドを呼び出すことでスレッドを開始します。

スレッドの作成方法

スレッドを作成するためには、Threadクラスのインスタンスを生成します。

以下に、スレッドのターゲットとなる関数と、スレッドの作成方法を示します。

import threading
# スレッドのターゲットとなる関数
def print_numbers():
    for i in range(5):
        print(i)
# Threadクラスのインスタンスを作成
thread = threading.Thread(target=print_numbers)

上記のコードでは、print_numbers関数をターゲットとしてスレッドを作成しています。

スレッドの開始と終了

スレッドを開始するためには、Threadクラスstartメソッドを呼び出します。

スレッドが終了するのを待つためには、joinメソッドを使用します。

以下に、スレッドの開始と終了の例を示します。

import threading
# スレッドのターゲットとなる関数
def print_numbers():
    for i in range(5):
        print(i)
# Threadクラスのインスタンスを作成
thread = threading.Thread(target=print_numbers)
# スレッドを開始
thread.start()
# スレッドの終了を待つ
thread.join()
print("スレッドが終了しました")

上記のコードでは、print_numbers関数をターゲットとするスレッドを作成し、startメソッドでスレッドを開始しています。

joinメソッドを呼び出すことで、スレッドが終了するのを待ち、スレッドが終了した後に「スレッドが終了しました」というメッセージを表示します。

このようにして、threadingモジュールを使って簡単にスレッドを作成し、実行することができます。

次のセクションでは、スレッド間の競合状態を防ぐための排他制御について解説します。

排他制御の必要性

競合状態とは

マルチスレッドプログラミングにおいて、複数のスレッドが同時に共有リソースにアクセスすることがあります。

このとき、スレッド間でリソースの状態が競合し、予期しない動作やデータの不整合が発生することを「競合状態」と呼びます。

例えば、以下のようなシンプルな例を考えてみましょう。

2つのスレッドが同時に1つの変数をインクリメントする場合、競合状態が発生する可能性があります。

import threading
# 共有リソース
counter = 0
def increment():
    global counter
    for _ in range(100000):
        counter += 1
# スレッドの作成
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)
# スレッドの開始
thread1.start()
thread2.start()
# スレッドの終了を待つ
thread1.join()
thread2.join()
print(f"最終的なカウンターの値: {counter}")

このコードでは、counter変数を2つのスレッドが同時にインクリメントしています。

しかし、実行するたびに異なる結果が得られることがあります。

これは、スレッドが同時にcounter変数にアクセスし、競合状態が発生しているためです。

排他制御の重要性

競合状態を防ぐためには、スレッドが共有リソースにアクセスする際に「排他制御」を行う必要があります。

排他制御とは、あるスレッドがリソースにアクセスしている間、他のスレッドがそのリソースにアクセスできないようにする仕組みです。

Pythonでは、threadingモジュールのLockオブジェクトを使用して排他制御を実現できます。

Lockオブジェクトを使うことで、あるスレッドがリソースにアクセスしている間、他のスレッドがそのリソースにアクセスするのを防ぐことができます。

以下は、先ほどの例にLockオブジェクトを追加して競合状態を防ぐ方法です。

import threading
# 共有リソース
counter = 0
lock = threading.Lock()
def increment():
    global counter
    for _ in range(100000):
        # ロックを取得
        lock.acquire()
        try:
            counter += 1
        finally:
            # ロックを解放
            lock.release()
# スレッドの作成
thread1 = threading.Thread(target=increment)
thread2 = threading.Thread(target=increment)
# スレッドの開始
thread1.start()
thread2.start()
# スレッドの終了を待つ
thread1.join()
thread2.join()
print(f"最終的なカウンターの値: {counter}")

このコードでは、increment関数内でlock.acquire()を呼び出してロックを取得し、counter変数にアクセスする前にロックを取得しています。

counter変数へのアクセスが終わったら、lock.release()を呼び出してロックを解放します。

これにより、競合状態を防ぎ、正しい結果が得られるようになります。

排他制御は、マルチスレッドプログラミングにおいて非常に重要な概念です。

適切に排他制御を行うことで、スレッド間の競合状態を防ぎ、プログラムの正確性と安定性を確保することができます。

ロックを使った排他制御

ロックの基本概念

マルチスレッドプログラミングにおいて、複数のスレッドが同時に共有リソースにアクセスすることが原因で発生する競合状態を防ぐために、ロック(Lock)を使用します。

ロックは、あるスレッドがリソースを使用している間、他のスレッドがそのリソースにアクセスできないようにする仕組みです。

これにより、データの整合性を保つことができます。

threading.Lockの使い方

Pythonでは、threadingモジュールを使用してロックを実装します。

threading.Lockクラスを使うことで、簡単にロックを作成し、排他制御を行うことができます。

ロックの取得と解放

ロックの基本的な操作は、ロックの取得(acquire)と解放(release)です。

以下に、ロックの取得と解放の基本的な使い方を示します。

import threading
# ロックオブジェクトの作成
lock = threading.Lock()
def critical_section():
    # ロックの取得
    lock.acquire()
    try:
        # クリティカルセクション(共有リソースへのアクセス)
        print("クリティカルセクションに入ります")
    finally:
        # ロックの解放
        lock.release()
# スレッドの作成
thread1 = threading.Thread(target=critical_section)
thread2 = threading.Thread(target=critical_section)
# スレッドの開始
thread1.start()
thread2.start()
# スレッドの終了を待つ
thread1.join()
thread2.join()

この例では、lock.acquire()でロックを取得し、lock.release()でロックを解放しています。

tryブロックを使用することで、クリティカルセクション内で例外が発生しても必ずロックが解放されるようにしています。

withステートメントを使ったロックの管理

Pythonでは、withステートメントを使用してロックを管理することができます。

withステートメントを使うことで、ロックの取得と解放を自動的に行うことができ、コードがより簡潔になります。

import threading
# ロックオブジェクトの作成
lock = threading.Lock()
def critical_section():
    # withステートメントを使ったロックの取得と解放
    with lock:
        # クリティカルセクション(共有リソースへのアクセス)
        print("クリティカルセクションに入ります")
# スレッドの作成
thread1 = threading.Thread(target=critical_section)
thread2 = threading.Thread(target=critical_section)
# スレッドの開始
thread1.start()
thread2.start()
# スレッドの終了を待つ
thread1.join()
thread2.join()

この例では、with lockを使用することで、lock.acquire()lock.release()を明示的に呼び出す必要がなくなります。

withステートメントのブロックを抜けるときに自動的にロックが解放されるため、コードがより安全で読みやすくなります。

以上が、ロックを使った排他制御の基本的な使い方です。

次に、ロックを使った実装例を見ていきましょう。

ロックの実装例

競合状態が発生するコード例

まず、競合状態が発生するコードの例を見てみましょう。

以下のコードは、複数のスレッドが同時に共有リソース(ここではカウンタ)にアクセスすることで競合状態が発生する例です。

import threading
# 共有リソース
counter = 0
def increment_counter():
    global counter
    for _ in range(100000):
        counter += 1
# スレッドの作成
threads = []
for _ in range(10):
    thread = threading.Thread(target=increment_counter)
    threads.append(thread)
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()
print(f"最終的なカウンタの値: {counter}")

このコードでは、10個のスレッドが同時に counter をインクリメントしています。

しかし、スレッドが同時に counter にアクセスするため、最終的なカウンタの値は期待通りの100万(10スレッド × 100000回)にはなりません。

これは、競合状態が発生しているためです。

ロックを使った競合状態の解決

競合状態を解決するためには、スレッドが共有リソースにアクセスする際にロックを使用して排他制御を行う必要があります。

以下のコードでは、threading.Lock を使用して競合状態を解決しています。

ロックを使ったコードの修正

import threading
# 共有リソース
counter = 0
# ロックオブジェクトの作成
lock = threading.Lock()
def increment_counter():
    global counter
    for _ in range(100000):
        # ロックを取得
        lock.acquire()
        try:
            counter += 1
        finally:
            # ロックを解放
            lock.release()
# スレッドの作成
threads = []
for _ in range(10):
    thread = threading.Thread(target=increment_counter)
    threads.append(thread)
    thread.start()
# スレッドの終了を待つ
for thread in threads:
    thread.join()
print(f"最終的なカウンタの値: {counter}")

この修正されたコードでは、counter にアクセスする前にロックを取得し、アクセスが終わったらロックを解放しています。

これにより、同時に複数のスレッドが counter にアクセスすることが防がれ、競合状態が解消されます。

修正後のコードの動作確認

修正後のコードを実行すると、最終的なカウンタの値は期待通りの100万になります。

最終的なカウンタの値: 1000000

このように、ロックを使用することで競合状態を防ぎ、スレッドセーフなコードを実現することができます。

ロックを適切に使用することで、複数のスレッドが同時に共有リソースにアクセスする際の問題を解決することができます。

デッドロックとその回避方法

デッドロックの概念

デッドロックとは、複数のスレッドが互いに相手のリソースを待ち続ける状態のことを指します。

この状態になると、スレッドは永久に待ち続けることになり、プログラムが停止してしまいます。

デッドロックは、特に複雑なマルチスレッドプログラムで発生しやすく、注意が必要です。

デッドロックの発生条件

デッドロックが発生するためには、以下の4つの条件がすべて満たされる必要があります。

  1. 相互排他: リソースは排他的に使用される。

つまり、一度に一つのスレッドしかリソースを使用できない。

  1. 保持と待機: スレッドがすでにリソースを保持している状態で、他のリソースを待機する。
  2. 非奪取: リソースは強制的に奪取されない。

リソースは自発的に解放されるまで保持される。

  1. 循環待機: スレッドの循環チェーンが存在し、各スレッドが次のスレッドが保持するリソースを待機する。

デッドロックを回避する方法

デッドロックを回避するためには、上記の条件のいずれかを破る必要があります。

以下に、具体的な回避方法を紹介します。

タイムアウトを設定する

タイムアウトを設定することで、スレッドが一定時間内にリソースを取得できなかった場合に待機を中断することができます。

Pythonのthreading.Lockオブジェクトには、acquireメソッドにタイムアウトを設定するオプションがあります。

import threading
import time
lock1 = threading.Lock()
lock2 = threading.Lock()
def thread1():
    while True:
        if lock1.acquire(timeout=1):
            print("Thread 1 acquired lock1")
            time.sleep(0.5)
            if lock2.acquire(timeout=1):
                print("Thread 1 acquired lock2")
                lock2.release()
            lock1.release()
            break
        else:
            print("Thread 1 could not acquire lock1, retrying...")
def thread2():
    while True:
        if lock2.acquire(timeout=1):
            print("Thread 2 acquired lock2")
            time.sleep(0.5)
            if lock1.acquire(timeout=1):
                print("Thread 2 acquired lock1")
                lock1.release()
            lock2.release()
            break
        else:
            print("Thread 2 could not acquire lock2, retrying...")
t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t1.start()
t2.start()
t1.join()
t2.join()

この例では、各スレッドがリソースを取得できなかった場合にリトライするようにしています。

これにより、デッドロックの発生を防ぎます。

順序を決めてロックを取得する

デッドロックを回避するもう一つの方法は、ロックを取得する順序を決めることです。

すべてのスレッドが同じ順序でロックを取得するようにすれば、循環待機の条件を破ることができます。

import threading
import time
lock1 = threading.Lock()
lock2 = threading.Lock()
def thread1():
    with lock1:
        print("Thread 1 acquired lock1")
        time.sleep(0.5)
        with lock2:
            print("Thread 1 acquired lock2")
def thread2():
    with lock1:
        print("Thread 2 acquired lock1")
        time.sleep(0.5)
        with lock2:
            print("Thread 2 acquired lock2")
t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t1.start()
t2.start()
t1.join()
t2.join()

この例では、両方のスレッドが同じ順序でロックを取得するようにしています。

これにより、デッドロックの発生を防ぐことができます。

デッドロックはマルチスレッドプログラミングにおいて避けるべき重要な問題です。

タイムアウトの設定やロックの取得順序を工夫することで、デッドロックを回避し、安定したプログラムを作成することができます。

高度な排他制御

RLock(再帰ロック)の紹介

RLock(再帰ロック)は、同じスレッドが複数回ロックを取得できるようにするためのロックです。

通常のロック(Lock)では、同じスレッドが再度ロックを取得しようとするとデッドロックが発生しますが、RLockを使用することでこれを防ぐことができます。

RLockの使用例

以下は、RLockを使用した簡単な例です。

import threading
rlock = threading.RLock()
def recursive_function(n):
    rlock.acquire()
    try:
        print(f"Thread {threading.current_thread().name} acquired lock, n={n}")
        if n > 0:
            recursive_function(n - 1)
    finally:
        print(f"Thread {threading.current_thread().name} releasing lock, n={n}")
        rlock.release()
thread = threading.Thread(target=recursive_function, args=(3,))
thread.start()
thread.join()

このコードでは、recursive_functionが再帰的に呼び出されるたびに同じスレッドがロックを取得し、最終的にすべてのロックを解放します。

Conditionオブジェクトの利用

Conditionオブジェクトは、スレッド間の通信を容易にするための同期プリミティブです。

特定の条件が満たされるまでスレッドを待機させることができます。

Conditionオブジェクトは、内部的にロックを使用しており、waitnotifynotify_allメソッドを提供します。

Conditionの使用例

以下は、Conditionオブジェクトを使用した生産者-消費者問題の例です。

import threading
import time
condition = threading.Condition()
queue = []
def producer():
    global queue
    for i in range(5):
        time.sleep(1)
        condition.acquire()
        queue.append(i)
        print(f"Produced {i}")
        condition.notify()
        condition.release()
def consumer():
    global queue
    for i in range(5):
        condition.acquire()
        while not queue:
            condition.wait()
        item = queue.pop(0)
        print(f"Consumed {item}")
        condition.release()
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

このコードでは、producerスレッドがアイテムを生成し、consumerスレッドがアイテムを消費します。

Conditionオブジェクトを使用して、消費者がアイテムを待機し、生成者がアイテムを生成したときに通知します。

SemaphoreとBoundedSemaphoreの利用

Semaphoreは、特定のリソースに対するアクセスを制限するための同期プリミティブです。

Semaphoreはカウンタを持ち、カウンタがゼロになるとスレッドは待機します。

BoundedSemaphoreは、Semaphoreと同様ですが、カウンタが初期値を超えないように制限されます。

Semaphoreの使用例

以下は、Semaphoreを使用した例です。

import threading
import time
semaphore = threading.Semaphore(2)
def worker(num):
    semaphore.acquire()
    print(f"Worker {num} acquired semaphore")
    time.sleep(2)
    print(f"Worker {num} releasing semaphore")
    semaphore.release()
threads = []
for i in range(4):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()

このコードでは、4つのスレッドが同時に実行されますが、Semaphoreによって同時に2つのスレッドしかリソースにアクセスできません。

BoundedSemaphoreの使用例

以下は、BoundedSemaphoreを使用した例です。

import threading
import time
bounded_semaphore = threading.BoundedSemaphore(2)
def worker(num):
    bounded_semaphore.acquire()
    print(f"Worker {num} acquired bounded semaphore")
    time.sleep(2)
    print(f"Worker {num} releasing bounded semaphore")
    bounded_semaphore.release()
threads = []
for i in range(4):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()

このコードは、Semaphoreの例と同様に動作しますが、BoundedSemaphoreを使用することで、カウンタが初期値を超えないように制限されます。

これにより、リソースの過剰な使用を防ぐことができます。

以上が、Pythonでの高度な排他制御の方法です。

これらの同期プリミティブを適切に使用することで、スレッド間の競合を防ぎ、安全で効率的なマルチスレッドプログラムを作成することができます。

目次から探す