Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
done = asyncio.Event()
# Register AsyncOperationCompletedHandler callback that triggers the above asyncio.Event.
op.Completed = AsyncOperationCompletedHandler[return_type](
lambda x, y: loop.call_soon_threadsafe(done.set)
)
# Wait for callback.
await done.wait()
if op.Status == AsyncStatus.Completed:
return op.GetResults()
elif op.Status == AsyncStatus.Error:
# Exception occurred. Wrap it in BleakDotNetTaskError
# to make it catchable.
raise BleakDotNetTaskError(op.ErrorCode.ToString())
else:
# TODO: Handle IsCancelled.
raise BleakDotNetTaskError("IAsyncOperation Status: {0}".format(op.Status))
# Register AsyncOperationCompletedHandler callback that triggers the above asyncio.Event.
op.Completed = AsyncOperationCompletedHandler[return_type](
lambda x, y: loop.call_soon_threadsafe(done.set)
)
# Wait for callback.
await done.wait()
if op.Status == AsyncStatus.Completed:
return op.GetResults()
elif op.Status == AsyncStatus.Error:
# Exception occurred. Wrap it in BleakDotNetTaskError
# to make it catchable.
raise BleakDotNetTaskError(op.ErrorCode.ToString())
else:
# TODO: Handle IsCancelled.
raise BleakDotNetTaskError("IAsyncOperation Status: {0}".format(op.Status))
if services_result.Status != GattCommunicationStatus.Success:
raise BleakDotNetTaskError("Could not get GATT services.")
# TODO: Check if fetching yeilds failures...
for service in services_result.Services:
characteristics_result = await wrap_IAsyncOperation(
IAsyncOperation[GattCharacteristicsResult](
service.GetCharacteristicsAsync()
),
return_type=GattCharacteristicsResult,
loop=self.loop,
)
self.services.add_service(BleakGATTServiceDotNet(service))
if characteristics_result.Status != GattCommunicationStatus.Success:
raise BleakDotNetTaskError(
"Could not get GATT characteristics for {0}.".format(service)
)
for characteristic in characteristics_result.Characteristics:
descriptors_result = await wrap_IAsyncOperation(
IAsyncOperation[GattDescriptorsResult](
characteristic.GetDescriptorsAsync()
),
return_type=GattDescriptorsResult,
loop=self.loop,
)
self.services.add_characteristic(
BleakGATTCharacteristicDotNet(characteristic)
)
if descriptors_result.Status != GattCommunicationStatus.Success:
raise BleakDotNetTaskError(
"Could not get GATT descriptors for {0}.".format(
def result(self):
# TODO: Handle IsCancelled.
if self.task.IsFaulted:
# Exception occurred. Wrap it in BleakDotNetTaskError
# to make it catchable.
raise BleakDotNetTaskError(self.task.Exception.ToString())
return self.task.Result
def result(self):
if self.operation.Status == AsyncStatus.Completed:
return self.operation.GetResults()
elif self.operation.Status == AsyncStatus.Error:
# Exception occurred. Wrap it in BleakDotNetTaskError
# to make it catchable.
raise BleakDotNetTaskError(self.operation.ErrorCode.ToString())
else:
# TODO: Handle IsCancelled.
raise BleakDotNetTaskError(
"IAsyncOperation Status: {0}".format(self.operation.Status)
)
"""
# Return the Service Collection.
if self._services_resolved:
return self.services
else:
logger.debug("Get Services...")
services_result = await wrap_IAsyncOperation(
IAsyncOperation[GattDeviceServicesResult](
self._requester.GetGattServicesAsync()
),
return_type=GattDeviceServicesResult,
loop=self.loop,
)
if services_result.Status != GattCommunicationStatus.Success:
raise BleakDotNetTaskError("Could not get GATT services.")
# TODO: Check if fetching yeilds failures...
for service in services_result.Services:
characteristics_result = await wrap_IAsyncOperation(
IAsyncOperation[GattCharacteristicsResult](
service.GetCharacteristicsAsync()
),
return_type=GattCharacteristicsResult,
loop=self.loop,
)
self.services.add_service(BleakGATTServiceDotNet(service))
if characteristics_result.Status != GattCommunicationStatus.Success:
raise BleakDotNetTaskError(
"Could not get GATT characteristics for {0}.".format(service)
)
for characteristic in characteristics_result.Characteristics: