[Python] bleakライブラリの使い方 – BLE通信を行う方法
bleak
はPythonでBluetooth Low Energy(BLE)通信を行うためのライブラリです。
BLEデバイスに接続し、データの読み書きや通知の受信が可能です。
基本的な使い方として、まずBleakClient
を使用してデバイスに接続します。
接続後、read_gatt_char
で特定のキャラクタリスティックからデータを読み取り、write_gatt_char
でデータを書き込むことができます。
また、start_notify
を使って通知を受け取ることも可能です。
bleakライブラリとは
bleakは、PythonでBluetooth Low Energy(BLE)デバイスと通信するためのライブラリです。
このライブラリは、非同期プログラミングをサポートしており、BLEデバイスとの接続やデータの送受信を効率的に行うことができます。
bleakは、Windows、macOS、Linuxの各プラットフォームで動作し、クロスプラットフォームなアプリケーションの開発を可能にします。
bleakを使用することで、BLEデバイスのスキャン、接続、キャラクタリスティックの読み書き、通知の受信など、BLE通信に必要な機能を簡単に実装できます。
特に、非同期処理を活用することで、複数のデバイスとの同時通信や、UIの応答性を保ちながらのデータ処理が可能です。
これにより、IoTデバイスやセンサーとの連携がスムーズに行えるため、さまざまなアプリケーションでの利用が期待されています。
BLE通信の基本
BLEとは何か
Bluetooth Low Energy(BLE)は、低消費電力で短距離の無線通信を実現するための技術です。
従来のBluetoothに比べて、データ転送速度は遅いものの、消費電力が非常に低いため、バッテリー駆動のデバイスに最適です。
BLEは、IoTデバイスやウェアラブルデバイス、スマートホーム機器など、さまざまな分野で広く利用されています。
BLEの基本的な概念(デバイス、サービス、キャラクタリスティック)
BLE通信の基本的な構成要素は以下の通りです。
概念 | 説明 |
---|---|
デバイス | BLE通信を行う機器。センサーやスマートフォン、ウェアラブルデバイスなど。 |
サービス | デバイスが提供する機能の集合。特定の機能を持つキャラクタリスティックを含む。 |
キャラクタリスティック | サービス内のデータの単位。データの読み書きや通知を行うためのインターフェース。 |
これらの要素を組み合わせることで、BLEデバイスは特定の機能を提供し、他のデバイスと通信します。
BLE通信の流れ
BLE通信は、以下のような流れで行われます。
- デバイスのスキャン: BLEデバイスが周囲のデバイスをスキャンし、接続可能なデバイスを探します。
- 接続: スキャンしたデバイスの中から、接続したいデバイスを選択し、接続を行います。
- サービスの探索: 接続後、デバイスが提供するサービスを探索し、利用可能なキャラクタリスティックを確認します。
- データの読み書き: 必要に応じて、キャラクタリスティックからデータを読み取ったり、データを書き込んだりします。
- 通知の受信: デバイスからの通知を受信し、リアルタイムでデータの変化を把握します。
- 切断: 通信が終了したら、接続を切断します。
この流れを通じて、BLEデバイス同士が効率的にデータをやり取りすることができます。
bleakを使ったBLEデバイスのスキャン
デバイススキャンの基本
BLEデバイスのスキャンは、周囲に存在するBLEデバイスを検出するためのプロセスです。
bleakライブラリを使用すると、簡単にデバイスのスキャンを行うことができます。
スキャンを開始すると、周囲のBLEデバイスが検出され、その情報が取得されます。
スキャンは非同期で行われるため、他の処理をブロックすることなく実行できます。
スキャン結果の取得方法
以下は、bleakを使用してBLEデバイスをスキャンし、結果を取得するサンプルコードです。
import asyncio
from bleak import BleakScanner
async def scan_devices():
devices = await BleakScanner.discover()
for device in devices:
print(f"デバイス名: {device.name}, アドレス: {device.address}")
# スキャンを実行
asyncio.run(scan_devices())
このコードを実行すると、周囲のBLEデバイスの名前とアドレスが表示されます。
デバイス名: Device1, アドレス: 00:11:22:33:44:55
デバイス名: Device2, アドレス: 66:77:88:99:AA:BB
フィルタリングによる特定デバイスの検索
スキャン結果から特定のデバイスをフィルタリングすることも可能です。
例えば、特定のデバイス名やアドレスを持つデバイスのみを表示することができます。
以下は、デバイス名でフィルタリングするサンプルコードです。
import asyncio
from bleak import BleakScanner
async def scan_specific_device(target_name):
devices = await BleakScanner.discover()
for device in devices:
if device.name == target_name:
print(f"特定デバイス: {device.name}, アドレス: {device.address}")
# 特定のデバイス名を指定してスキャン
asyncio.run(scan_specific_device("Device1"))
このコードを実行すると、指定したデバイス名を持つデバイスのみが表示されます。
特定デバイス: Device1, アドレス: 00:11:22:33:44:55
このように、bleakを使用することで、BLEデバイスのスキャンと特定デバイスの検索が簡単に行えます。
BLEデバイスへの接続
BleakClientの使い方
bleakライブラリでは、BLEデバイスに接続するためにBleakClientクラス
を使用します。
このクラスを利用することで、デバイスとの通信を簡単に行うことができます。
BleakClient
は非同期で動作し、接続、データの読み書き、通知の受信などの機能を提供します。
以下は、BleakClient
を使用するための基本的なサンプルコードです。
import asyncio
from bleak import BleakClient
async def connect_to_device(address):
async with BleakClient(address) as client:
print(f"{client.address} に接続しました。")
# ここでデータの読み書きや通知の設定を行うことができます。
# デバイスのアドレスを指定して接続
asyncio.run(connect_to_device("00:11:22:33:44:55"))
このコードを実行すると、指定したアドレスのデバイスに接続されます。
デバイスへの接続と切断
BleakClient
を使用してデバイスに接続する際、async with
構文を使用することで、接続が自動的に管理されます。
接続が終了すると、自動的に切断されます。
手動で接続と切断を行いたい場合は、以下のように記述できます。
import asyncio
from bleak import BleakClient
async def manual_connect(address):
client = BleakClient(address)
await client.connect()
print(f"{client.address} に接続しました。")
# データの読み書きや通知の設定を行うことができます。
await client.disconnect()
print(f"{client.address} から切断しました。")
# デバイスのアドレスを指定して接続
asyncio.run(manual_connect("00:11:22:33:44:55"))
このコードでは、接続後に手動で切断を行っています。
接続状態の確認方法
接続状態を確認するには、BleakClient
のis_connected
プロパティを使用します。
このプロパティは、デバイスが接続されているかどうかを真偽値で返します。
以下は、接続状態を確認するサンプルコードです。
import asyncio
from bleak import BleakClient
async def check_connection(address):
async with BleakClient(address) as client:
if client.is_connected:
print(f"{client.address} は接続されています。")
else:
print(f"{client.address} は接続されていません。")
# デバイスのアドレスを指定して接続状態を確認
asyncio.run(check_connection("00:11:22:33:44:55"))
このコードを実行すると、指定したデバイスの接続状態が表示されます。
接続状態を確認することで、デバイスとの通信が正常に行われているかを把握することができます。
キャラクタリスティックの読み書き
キャラクタリスティックとは
キャラクタリスティックは、BLEデバイスが提供するデータの単位であり、特定の機能や情報を表します。
各キャラクタリスティックは、UUID(ユニーク識別子)によって識別され、データの読み取り、書き込み、通知の受信が可能です。
キャラクタリスティックは、特定のサービスに関連付けられており、デバイスの機能を利用するためのインターフェースとなります。
キャラクタリスティックのUUIDの取得方法
キャラクタリスティックのUUIDは、デバイスのサービスを探索することで取得できます。
以下は、接続したデバイスからサービスとキャラクタリスティックのUUIDを取得するサンプルコードです。
import asyncio
from bleak import BleakClient
async def get_characteristics(address):
async with BleakClient(address) as client:
services = await client.get_services()
for service in services:
print(f"サービスUUID: {service.uuid}")
for characteristic in service.characteristics:
print(f" キャラクタリスティックUUID: {characteristic.uuid}")
# デバイスのアドレスを指定してキャラクタリスティックのUUIDを取得
asyncio.run(get_characteristics("00:11:22:33:44:55"))
このコードを実行すると、接続したデバイスのサービスとそのキャラクタリスティックのUUIDが表示されます。
キャラクタリスティックの読み取り(read_gatt_char)
キャラクタリスティックからデータを読み取るには、read_gatt_charメソッド
を使用します。
以下は、特定のキャラクタリスティックからデータを読み取るサンプルコードです。
import asyncio
from bleak import BleakClient
async def read_characteristic(address, characteristic_uuid):
async with BleakClient(address) as client:
data = await client.read_gatt_char(characteristic_uuid)
print(f"キャラクタリスティック {characteristic_uuid} のデータ: {data}")
# デバイスのアドレスとキャラクタリスティックのUUIDを指定してデータを読み取る
asyncio.run(read_characteristic("00:11:22:33:44:55", "00002a37-0000-1000-8000-00805f9b34fb"))
このコードを実行すると、指定したキャラクタリスティックからデータが読み取られ、その内容が表示されます。
キャラクタリスティックへの書き込み(write_gatt_char)
キャラクタリスティックにデータを書き込むには、write_gatt_charメソッド
を使用します。
以下は、特定のキャラクタリスティックにデータを書き込むサンプルコードです。
import asyncio
from bleak import BleakClient
async def write_characteristic(address, characteristic_uuid, data):
async with BleakClient(address) as client:
await client.write_gatt_char(characteristic_uuid, data)
print(f"キャラクタリスティック {characteristic_uuid} にデータを書き込みました。")
# デバイスのアドレス、キャラクタリスティックのUUID、書き込むデータを指定
data_to_write = bytearray([0x01]) # 例: 1バイトのデータ
asyncio.run(write_characteristic("00:11:22:33:44:55", "00002a37-0000-1000-8000-00805f9b34fb", data_to_write))
このコードを実行すると、指定したキャラクタリスティックにデータが書き込まれたことが確認できます。
キャラクタリスティックを通じてデバイスの状態を変更したり、設定を行ったりすることが可能です。
通知の受信
通知機能の概要
BLEデバイスは、特定のキャラクタリスティックに対して通知機能を提供することができます。
通知は、デバイスが新しいデータを持っていることをホストに知らせるためのメカニズムです。
これにより、ホストはデバイスからのデータをリアルタイムで受信することができ、ユーザーに即座に情報を提供することが可能になります。
通知は、デバイスが自発的にデータを送信するため、ポーリング方式に比べて効率的です。
通知の有効化(start_notify)
通知を受信するためには、まず対象のキャラクタリスティックに対して通知を有効化する必要があります。
bleakライブラリでは、start_notifyメソッド
を使用して通知を有効化します。
以下は、通知を有効化するサンプルコードです。
import asyncio
from bleak import BleakClient
def notification_handler(sender, data):
print(f"通知を受信しました。デバイス: {sender}, データ: {data}")
async def enable_notifications(address, characteristic_uuid):
async with BleakClient(address) as client:
await client.start_notify(characteristic_uuid, notification_handler)
print(f"{characteristic_uuid} の通知を有効化しました。")
# 通知を受信するために一定時間待機
await asyncio.sleep(10) # 10秒間通知を受信
# デバイスのアドレスとキャラクタリスティックのUUIDを指定して通知を有効化
asyncio.run(enable_notifications("00:11:22:33:44:55", "00002a37-0000-1000-8000-00805f9b34fb"))
このコードを実行すると、指定したキャラクタリスティックの通知が有効化され、通知が受信されるたびにnotification_handler関数
が呼び出されます。
通知の無効化(stop_notify)
通知を無効化するには、stop_notifyメソッド
を使用します。
これにより、指定したキャラクタリスティックからの通知を停止することができます。
以下は、通知を無効化するサンプルコードです。
import asyncio
from bleak import BleakClient
async def disable_notifications(address, characteristic_uuid):
async with BleakClient(address) as client:
await client.stop_notify(characteristic_uuid)
print(f"{characteristic_uuid} の通知を無効化しました。")
# デバイスのアドレスとキャラクタリスティックのUUIDを指定して通知を無効化
asyncio.run(disable_notifications("00:11:22:33:44:55", "00002a37-0000-1000-8000-00805f9b34fb"))
このコードを実行すると、指定したキャラクタリスティックからの通知が無効化されます。
通知データの処理方法
通知を受信した際のデータ処理は、通知ハンドラ内で行います。
受信したデータは、バイト列として渡されるため、必要に応じてデータをデコードしたり、解析したりすることができます。
以下は、通知データを処理する際の例です。
def notification_handler(sender, data):
# データをデコードして表示
decoded_data = data.decode('utf-8') # UTF-8でデコード
print(f"通知を受信しました。デバイス: {sender}, データ: {decoded_data}")
このように、通知ハンドラ内で受信したデータを適切に処理することで、リアルタイムでデバイスの状態を把握したり、ユーザーに情報を提供したりすることができます。
通知機能を活用することで、BLEデバイスとのインタラクションがよりスムーズになります。
エラーハンドリング
BLE通信を行う際には、さまざまなエラーが発生する可能性があります。
これらのエラーに適切に対処することで、アプリケーションの安定性を向上させることができます。
以下では、接続エラー、読み書きエラー、タイムアウトエラーの対処法について説明します。
接続エラーの対処法
接続エラーは、BLEデバイスに接続できない場合に発生します。
これには、デバイスがオフラインである、アドレスが間違っている、または接続が拒否された場合などが含まれます。
接続エラーを処理するためには、例外処理を使用します。
以下は、接続エラーを処理するサンプルコードです。
import asyncio
from bleak import BleakClient, BleakError
async def connect_with_error_handling(address):
try:
async with BleakClient(address) as client:
print(f"{client.address} に接続しました。")
except BleakError as e:
print(f"接続エラー: {e}")
# デバイスのアドレスを指定して接続
asyncio.run(connect_with_error_handling("00:11:22:33:44:55"))
このコードでは、接続時にエラーが発生した場合にエラーメッセージを表示します。
読み書きエラーの対処法
読み書きエラーは、キャラクタリスティックからのデータの読み取りや書き込みが失敗した場合に発生します。
これには、デバイスが応答しない、または不正なデータが送信された場合などが含まれます。
読み書きエラーを処理するためにも、例外処理を使用します。
以下は、読み書きエラーを処理するサンプルコードです。
import asyncio
from bleak import BleakClient, BleakError
async def read_with_error_handling(address, characteristic_uuid):
try:
async with BleakClient(address) as client:
data = await client.read_gatt_char(characteristic_uuid)
print(f"データ: {data}")
except BleakError as e:
print(f"読み取りエラー: {e}")
# デバイスのアドレスとキャラクタリスティックのUUIDを指定してデータを読み取る
asyncio.run(read_with_error_handling("00:11:22:33:44:55", "00002a37-0000-1000-8000-00805f9b34fb"))
このコードでは、読み取り時にエラーが発生した場合にエラーメッセージを表示します。
タイムアウトエラーの対処法
タイムアウトエラーは、接続やデータの読み書きが指定した時間内に完了しなかった場合に発生します。
タイムアウトエラーを処理するためには、適切なタイムアウト設定を行い、例外処理を使用します。
以下は、タイムアウトエラーを処理するサンプルコードです。
import asyncio
from bleak import BleakClient, BleakError
async def connect_with_timeout(address):
try:
async with BleakClient(address, timeout=5.0) as client: # 5秒のタイムアウトを設定
print(f"{client.address} に接続しました。")
except BleakError as e:
print(f"タイムアウトエラー: {e}")
# デバイスのアドレスを指定して接続
asyncio.run(connect_with_timeout("00:11:22:33:44:55"))
このコードでは、接続時に5秒のタイムアウトを設定し、タイムアウトが発生した場合にエラーメッセージを表示します。
これらのエラーハンドリングを適切に実装することで、BLE通信の信頼性を向上させ、ユーザーにとって快適な体験を提供することができます。
応用例
複数デバイスへの同時接続
bleakライブラリを使用すると、複数のBLEデバイスに同時に接続することが可能です。
非同期処理を活用することで、各デバイスとの通信を効率的に行うことができます。
以下は、複数のデバイスに同時接続するサンプルコードです。
import asyncio
from bleak import BleakClient
async def connect_to_device(address):
async with BleakClient(address) as client:
print(f"{client.address} に接続しました。")
await asyncio.sleep(5) # 5秒間接続を維持
async def connect_multiple_devices(addresses):
tasks = [connect_to_device(address) for address in addresses]
await asyncio.gather(*tasks)
# 接続するデバイスのアドレスを指定
device_addresses = ["00:11:22:33:44:55", "66:77:88:99:AA:BB"]
asyncio.run(connect_multiple_devices(device_addresses))
このコードを実行すると、指定した複数のデバイスに同時に接続されます。
センサーからのデータ取得
BLEデバイスに接続してセンサーからデータを取得することも可能です。
以下は、センサーからのデータを定期的に取得するサンプルコードです。
import asyncio
from bleak import BleakClient
async def read_sensor_data(address, characteristic_uuid):
async with BleakClient(address) as client:
while True:
data = await client.read_gatt_char(characteristic_uuid)
print(f"センサーデータ: {data}")
await asyncio.sleep(2) # 2秒ごとにデータを取得
# デバイスのアドレスとキャラクタリスティックのUUIDを指定
asyncio.run(read_sensor_data("00:11:22:33:44:55", "00002a37-0000-1000-8000-00805f9b34fb"))
このコードを実行すると、指定したキャラクタリスティックからセンサーデータが2秒ごとに取得され、表示されます。
BLEデバイスの制御(例:LEDのオン/オフ)
BLEデバイスを制御することも可能です。
例えば、LEDをオン/オフするためのコマンドを送信することができます。
以下は、LEDを制御するサンプルコードです。
import asyncio
from bleak import BleakClient
async def control_led(address, characteristic_uuid, state):
async with BleakClient(address) as client:
data = bytearray([1]) if state == "on" else bytearray([0])
await client.write_gatt_char(characteristic_uuid, data)
print(f"LEDを{state}にしました。")
# デバイスのアドレスとキャラクタリスティックのUUIDを指定
asyncio.run(control_led("00:11:22:33:44:55", "00002a37-0000-1000-8000-00805f9b34fb", "on"))
このコードを実行すると、指定したキャラクタリスティックに対してLEDをオンにするコマンドが送信されます。
スマートフォンとのBLE通信
スマートフォンとBLE通信を行うことで、アプリケーションの機能を拡張することができます。
例えば、スマートフォンからデータを受信したり、スマートフォンにデータを送信したりすることが可能です。
以下は、スマートフォンとのBLE通信を行うサンプルコードです。
import asyncio
from bleak import BleakClient
async def communicate_with_smartphone(address, characteristic_uuid):
async with BleakClient(address) as client:
# スマートフォンからのデータを受信
data = await client.read_gatt_char(characteristic_uuid)
print(f"スマートフォンからのデータ: {data}")
# スマートフォンにデータを送信
response_data = bytearray([0x01]) # 例: 1バイトのデータ
await client.write_gatt_char(characteristic_uuid, response_data)
print("スマートフォンにデータを送信しました。")
# デバイスのアドレスとキャラクタリスティックのUUIDを指定
asyncio.run(communicate_with_smartphone("00:11:22:33:44:55", "00002a37-0000-1000-8000-00805f9b34fb"))
このコードを実行すると、スマートフォンとの間でデータの受信と送信が行われます。
BLE通信を活用することで、さまざまなアプリケーションを実現することができます。
まとめ
この記事では、Pythonのbleakライブラリを使用してBLE通信を行う方法について詳しく解説しました。
具体的には、BLEデバイスのスキャン、接続、キャラクタリスティックの読み書き、通知の受信、エラーハンドリング、さらには応用例として複数デバイスへの同時接続やセンサーからのデータ取得など、多岐にわたる内容を取り上げました。
これらの知識を活用することで、BLEデバイスとのインタラクションをよりスムーズに行うことができるでしょう。
今後は、実際にbleakライブラリを使って自分のプロジェクトに取り入れ、BLEデバイスとの通信を試みてみてください。
新たなアイデアやアプリケーションの開発に役立つことでしょう。