This repository has been archived by the owner on Nov 3, 2020. It is now read-only.
/
low_level.py
106 lines (86 loc) · 4.34 KB
/
low_level.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Example of low level interaction with a BLE UART device that has an RX and TX
# characteristic for receiving and sending data. This doesn't use any service
# implementation and instead just manipulates the services and characteristics
# on a device. See the uart_service.py example for a simpler UART service
# example that uses a high level service implementation.
# Author: Tony DiCola
import logging
import time
import uuid
import Adafruit_BluefruitLE
# Enable debug output.
#logging.basicConfig(level=logging.DEBUG)
# Define service and characteristic UUIDs used by the UART service.
UART_SERVICE_UUID = uuid.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')
TX_CHAR_UUID = uuid.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E')
RX_CHAR_UUID = uuid.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E')
# Get the BLE provider for the current platform.
ble = Adafruit_BluefruitLE.get_provider()
# Main function implements the program logic so it can run in a background
# thread. Most platforms require the main thread to handle GUI events and other
# asyncronous events like BLE actions. All of the threading logic is taken care
# of automatically though and you just need to provide a main function that uses
# the BLE provider.
def main():
# Clear any cached data because both bluez and CoreBluetooth have issues with
# caching data and it going stale.
ble.clear_cached_data()
# Get the first available BLE network adapter and make sure it's powered on.
adapter = ble.get_default_adapter()
adapter.power_on()
print('Using adapter: {0}'.format(adapter.name))
# Disconnect any currently connected UART devices. Good for cleaning up and
# starting from a fresh state.
print('Disconnecting any connected UART devices...')
ble.disconnect_devices([UART_SERVICE_UUID])
# Scan for UART devices.
print('Searching for UART device...')
try:
adapter.start_scan()
# Search for the first UART device found (will time out after 60 seconds
# but you can specify an optional timeout_sec parameter to change it).
device = ble.find_device(service_uuids=[UART_SERVICE_UUID])
if device is None:
raise RuntimeError('Failed to find UART device!')
finally:
# Make sure scanning is stopped before exiting.
adapter.stop_scan()
print('Connecting to device...')
device.connect() # Will time out after 60 seconds, specify timeout_sec parameter
# to change the timeout.
# Once connected do everything else in a try/finally to make sure the device
# is disconnected when done.
try:
# Wait for service discovery to complete for at least the specified
# service and characteristic UUID lists. Will time out after 60 seconds
# (specify timeout_sec parameter to override).
print('Discovering services...')
device.discover([UART_SERVICE_UUID], [TX_CHAR_UUID, RX_CHAR_UUID])
# Find the UART service and its characteristics.
uart = device.find_service(UART_SERVICE_UUID)
rx = uart.find_characteristic(RX_CHAR_UUID)
tx = uart.find_characteristic(TX_CHAR_UUID)
# Write a string to the TX characteristic.
print('Sending message to device...')
tx.write_value('Hello world!\r\n')
# Function to receive RX characteristic changes. Note that this will
# be called on a different thread so be careful to make sure state that
# the function changes is thread safe. Use queue or other thread-safe
# primitives to send data to other threads.
def received(data):
print('Received: {0}'.format(data))
# Turn on notification of RX characteristics using the callback above.
print('Subscribing to RX characteristic changes...')
rx.start_notify(received)
# Now just wait for 30 seconds to receive data.
print('Waiting 60 seconds to receive data from the device...')
time.sleep(60)
finally:
# Make sure device is disconnected on exit.
device.disconnect()
# Initialize the BLE system. MUST be called before other BLE calls!
ble.initialize()
# Start the mainloop to process BLE events, and run the provided function in
# a background thread. When the provided main function stops running, returns
# an integer status code, or throws an error the program will exit.
ble.run_mainloop_with(main)