Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
------
CalibrationError
If the request fails. The error contains the reason for the failure.
"""
request = calibration_pb2.SubscribeCalibrateMagnetometerRequest()
calibrate_magnetometer_stream = self._stub.SubscribeCalibrateMagnetometer(request)
try:
async for response in calibrate_magnetometer_stream:
result = self._extract_result(response)
success_codes = [CalibrationResult.Result.SUCCESS]
if 'IN_PROGRESS' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.IN_PROGRESS)
if 'INSTRUCTION' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.INSTRUCTION)
if result.result not in success_codes:
raise CalibrationError(result, "calibrate_magnetometer()")
if result.result is CalibrationResult.Result.SUCCESS:
calibrate_magnetometer_stream.cancel();
return
yield ProgressData.translate_from_rpc(response.progress_data)
finally:
calibrate_magnetometer_stream.cancel()
def translate_from_rpc(rpc_enum_value):
""" Parses a gRPC response """
return {
0: CalibrationResult.Result.UNKNOWN,
1: CalibrationResult.Result.SUCCESS,
2: CalibrationResult.Result.IN_PROGRESS,
3: CalibrationResult.Result.INSTRUCTION,
4: CalibrationResult.Result.FAILED,
5: CalibrationResult.Result.NO_SYSTEM,
6: CalibrationResult.Result.CONNECTION_ERROR,
7: CalibrationResult.Result.BUSY,
8: CalibrationResult.Result.COMMAND_DENIED,
9: CalibrationResult.Result.TIMEOUT,
10: CalibrationResult.Result.CANCELLED,
}.get(rpc_enum_value, None)
def translate_from_rpc(rpcCalibrationResult):
""" Translates a gRPC struct to the SDK equivalent """
return CalibrationResult(
CalibrationResult.Result.translate_from_rpc(rpcCalibrationResult.result),
rpcCalibrationResult.result_str
)
def translate_from_rpc(rpc_enum_value):
""" Parses a gRPC response """
return {
0: CalibrationResult.Result.UNKNOWN,
1: CalibrationResult.Result.SUCCESS,
2: CalibrationResult.Result.IN_PROGRESS,
3: CalibrationResult.Result.INSTRUCTION,
4: CalibrationResult.Result.FAILED,
5: CalibrationResult.Result.NO_SYSTEM,
6: CalibrationResult.Result.CONNECTION_ERROR,
7: CalibrationResult.Result.BUSY,
8: CalibrationResult.Result.COMMAND_DENIED,
9: CalibrationResult.Result.TIMEOUT,
10: CalibrationResult.Result.CANCELLED,
}.get(rpc_enum_value, None)
def translate_from_rpc(rpc_enum_value):
""" Parses a gRPC response """
return {
0: CalibrationResult.Result.UNKNOWN,
1: CalibrationResult.Result.SUCCESS,
2: CalibrationResult.Result.IN_PROGRESS,
3: CalibrationResult.Result.INSTRUCTION,
4: CalibrationResult.Result.FAILED,
5: CalibrationResult.Result.NO_SYSTEM,
6: CalibrationResult.Result.CONNECTION_ERROR,
7: CalibrationResult.Result.BUSY,
8: CalibrationResult.Result.COMMAND_DENIED,
9: CalibrationResult.Result.TIMEOUT,
10: CalibrationResult.Result.CANCELLED,
}.get(rpc_enum_value, None)
def translate_from_rpc(rpc_enum_value):
""" Parses a gRPC response """
return {
0: CalibrationResult.Result.UNKNOWN,
1: CalibrationResult.Result.SUCCESS,
2: CalibrationResult.Result.IN_PROGRESS,
3: CalibrationResult.Result.INSTRUCTION,
4: CalibrationResult.Result.FAILED,
5: CalibrationResult.Result.NO_SYSTEM,
6: CalibrationResult.Result.CONNECTION_ERROR,
7: CalibrationResult.Result.BUSY,
8: CalibrationResult.Result.COMMAND_DENIED,
9: CalibrationResult.Result.TIMEOUT,
10: CalibrationResult.Result.CANCELLED,
}.get(rpc_enum_value, None)
try:
async for response in calibrate_accelerometer_stream:
result = self._extract_result(response)
success_codes = [CalibrationResult.Result.SUCCESS]
if 'IN_PROGRESS' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.IN_PROGRESS)
if 'INSTRUCTION' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.INSTRUCTION)
if result.result not in success_codes:
raise CalibrationError(result, "calibrate_accelerometer()")
if result.result is CalibrationResult.Result.SUCCESS:
calibrate_accelerometer_stream.cancel();
return
yield ProgressData.translate_from_rpc(response.progress_data)
finally:
calibrate_accelerometer_stream.cancel()
def translate_from_rpc(rpc_enum_value):
""" Parses a gRPC response """
return {
0: CalibrationResult.Result.UNKNOWN,
1: CalibrationResult.Result.SUCCESS,
2: CalibrationResult.Result.IN_PROGRESS,
3: CalibrationResult.Result.INSTRUCTION,
4: CalibrationResult.Result.FAILED,
5: CalibrationResult.Result.NO_SYSTEM,
6: CalibrationResult.Result.CONNECTION_ERROR,
7: CalibrationResult.Result.BUSY,
8: CalibrationResult.Result.COMMAND_DENIED,
9: CalibrationResult.Result.TIMEOUT,
10: CalibrationResult.Result.CANCELLED,
}.get(rpc_enum_value, None)
def translate_from_rpc(rpc_enum_value):
""" Parses a gRPC response """
return {
0: CalibrationResult.Result.UNKNOWN,
1: CalibrationResult.Result.SUCCESS,
2: CalibrationResult.Result.IN_PROGRESS,
3: CalibrationResult.Result.INSTRUCTION,
4: CalibrationResult.Result.FAILED,
5: CalibrationResult.Result.NO_SYSTEM,
6: CalibrationResult.Result.CONNECTION_ERROR,
7: CalibrationResult.Result.BUSY,
8: CalibrationResult.Result.COMMAND_DENIED,
9: CalibrationResult.Result.TIMEOUT,
10: CalibrationResult.Result.CANCELLED,
}.get(rpc_enum_value, None)
def translate_from_rpc(rpc_enum_value):
""" Parses a gRPC response """
return {
0: CalibrationResult.Result.UNKNOWN,
1: CalibrationResult.Result.SUCCESS,
2: CalibrationResult.Result.IN_PROGRESS,
3: CalibrationResult.Result.INSTRUCTION,
4: CalibrationResult.Result.FAILED,
5: CalibrationResult.Result.NO_SYSTEM,
6: CalibrationResult.Result.CONNECTION_ERROR,
7: CalibrationResult.Result.BUSY,
8: CalibrationResult.Result.COMMAND_DENIED,
9: CalibrationResult.Result.TIMEOUT,
10: CalibrationResult.Result.CANCELLED,
}.get(rpc_enum_value, None)