Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def cancel(self):
"""
Cancel ongoing calibration process.
"""
request = calibration_pb2.CancelRequest()
response = await self._stub.Cancel(request)
response_serializer=calibration__pb2.CalibrateGyroResponse.SerializeToString,
),
'SubscribeCalibrateAccelerometer': grpc.unary_stream_rpc_method_handler(
servicer.SubscribeCalibrateAccelerometer,
request_deserializer=calibration__pb2.SubscribeCalibrateAccelerometerRequest.FromString,
response_serializer=calibration__pb2.CalibrateAccelerometerResponse.SerializeToString,
),
'SubscribeCalibrateMagnetometer': grpc.unary_stream_rpc_method_handler(
servicer.SubscribeCalibrateMagnetometer,
request_deserializer=calibration__pb2.SubscribeCalibrateMagnetometerRequest.FromString,
response_serializer=calibration__pb2.CalibrateMagnetometerResponse.SerializeToString,
),
'SubscribeCalibrateGimbalAccelerometer': grpc.unary_stream_rpc_method_handler(
servicer.SubscribeCalibrateGimbalAccelerometer,
request_deserializer=calibration__pb2.SubscribeCalibrateGimbalAccelerometerRequest.FromString,
response_serializer=calibration__pb2.CalibrateGimbalAccelerometerResponse.SerializeToString,
),
'Cancel': grpc.unary_unary_rpc_method_handler(
servicer.Cancel,
request_deserializer=calibration__pb2.CancelRequest.FromString,
response_serializer=calibration__pb2.CancelResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'mavsdk.rpc.calibration.CalibrationService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
def translate_to_rpc(self, rpcResult):
return {
0: calibration_pb2.CalibrationResult.UNKNOWN,
1: calibration_pb2.CalibrationResult.SUCCESS,
2: calibration_pb2.CalibrationResult.IN_PROGRESS,
3: calibration_pb2.CalibrationResult.INSTRUCTION,
4: calibration_pb2.CalibrationResult.FAILED,
5: calibration_pb2.CalibrationResult.NO_SYSTEM,
6: calibration_pb2.CalibrationResult.CONNECTION_ERROR,
7: calibration_pb2.CalibrationResult.BUSY,
8: calibration_pb2.CalibrationResult.COMMAND_DENIED,
9: calibration_pb2.CalibrationResult.TIMEOUT,
10: calibration_pb2.CalibrationResult.CANCELLED
}.get(self.value, None)
async def calibrate_gimbal_accelerometer(self):
"""
Perform gimbal accelerometer calibration.
Yields
-------
progress_data : ProgressData
Progress data
Raises
------
CalibrationError
If the request fails. The error contains the reason for the failure.
"""
request = calibration_pb2.SubscribeCalibrateGimbalAccelerometerRequest()
calibrate_gimbal_accelerometer_stream = self._stub.SubscribeCalibrateGimbalAccelerometer(request)
try:
async for response in calibrate_gimbal_accelerometer_stream:
result = self._extract_result(response)
success_codes = [CalibrationResult.Result.SUCCESS]
if 'IN_PROGRESS' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.IN_PROGRESS)
if 'INSTRUCTION' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.INSTRUCTION)
if result.result not in success_codes:
raise CalibrationError(result, "calibrate_gimbal_accelerometer()")
async def calibrate_accelerometer(self):
"""
Perform accelerometer calibration.
Yields
-------
progress_data : ProgressData
Progress data
Raises
------
CalibrationError
If the request fails. The error contains the reason for the failure.
"""
request = calibration_pb2.SubscribeCalibrateAccelerometerRequest()
calibrate_accelerometer_stream = self._stub.SubscribeCalibrateAccelerometer(request)
try:
async for response in calibrate_accelerometer_stream:
result = self._extract_result(response)
success_codes = [CalibrationResult.Result.SUCCESS]
if 'IN_PROGRESS' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.IN_PROGRESS)
if 'INSTRUCTION' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.INSTRUCTION)
if result.result not in success_codes:
raise CalibrationError(result, "calibrate_accelerometer()")
async def calibrate_magnetometer(self):
"""
Perform magnetometer caliration.
Yields
-------
progress_data : ProgressData
Progress data
Raises
------
CalibrationError
If the request fails. The error contains the reason for the failure.
"""
request = calibration_pb2.SubscribeCalibrateMagnetometerRequest()
calibrate_magnetometer_stream = self._stub.SubscribeCalibrateMagnetometer(request)
try:
async for response in calibrate_magnetometer_stream:
result = self._extract_result(response)
success_codes = [CalibrationResult.Result.SUCCESS]
if 'IN_PROGRESS' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.IN_PROGRESS)
if 'INSTRUCTION' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.INSTRUCTION)
if result.result not in success_codes:
raise CalibrationError(result, "calibrate_magnetometer()")
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.SubscribeCalibrateGyro = channel.unary_stream(
'/mavsdk.rpc.calibration.CalibrationService/SubscribeCalibrateGyro',
request_serializer=calibration__pb2.SubscribeCalibrateGyroRequest.SerializeToString,
response_deserializer=calibration__pb2.CalibrateGyroResponse.FromString,
)
self.SubscribeCalibrateAccelerometer = channel.unary_stream(
'/mavsdk.rpc.calibration.CalibrationService/SubscribeCalibrateAccelerometer',
request_serializer=calibration__pb2.SubscribeCalibrateAccelerometerRequest.SerializeToString,
response_deserializer=calibration__pb2.CalibrateAccelerometerResponse.FromString,
)
self.SubscribeCalibrateMagnetometer = channel.unary_stream(
'/mavsdk.rpc.calibration.CalibrationService/SubscribeCalibrateMagnetometer',
request_serializer=calibration__pb2.SubscribeCalibrateMagnetometerRequest.SerializeToString,
response_deserializer=calibration__pb2.CalibrateMagnetometerResponse.FromString,
)
self.SubscribeCalibrateGimbalAccelerometer = channel.unary_stream(
'/mavsdk.rpc.calibration.CalibrationService/SubscribeCalibrateGimbalAccelerometer',
request_serializer=calibration__pb2.SubscribeCalibrateGimbalAccelerometerRequest.SerializeToString,
response_deserializer=calibration__pb2.CalibrateGimbalAccelerometerResponse.FromString,
def translate_to_rpc(self, rpcResult):
return {
0: calibration_pb2.CalibrationResult.UNKNOWN,
1: calibration_pb2.CalibrationResult.SUCCESS,
2: calibration_pb2.CalibrationResult.IN_PROGRESS,
3: calibration_pb2.CalibrationResult.INSTRUCTION,
4: calibration_pb2.CalibrationResult.FAILED,
5: calibration_pb2.CalibrationResult.NO_SYSTEM,
6: calibration_pb2.CalibrationResult.CONNECTION_ERROR,
7: calibration_pb2.CalibrationResult.BUSY,
8: calibration_pb2.CalibrationResult.COMMAND_DENIED,
9: calibration_pb2.CalibrationResult.TIMEOUT,
10: calibration_pb2.CalibrationResult.CANCELLED
}.get(self.value, None)
async def calibrate_gyro(self):
"""
Perform gyro calibration.
Yields
-------
progress_data : ProgressData
Progress data
Raises
------
CalibrationError
If the request fails. The error contains the reason for the failure.
"""
request = calibration_pb2.SubscribeCalibrateGyroRequest()
calibrate_gyro_stream = self._stub.SubscribeCalibrateGyro(request)
try:
async for response in calibrate_gyro_stream:
result = self._extract_result(response)
success_codes = [CalibrationResult.Result.SUCCESS]
if 'IN_PROGRESS' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.IN_PROGRESS)
if 'INSTRUCTION' in [return_code.name for return_code in CalibrationResult.Result]:
success_codes.append(CalibrationResult.Result.INSTRUCTION)
if result.result not in success_codes:
raise CalibrationError(result, "calibrate_gyro()")