Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def translate_to_rpc(self, rpcCameraMode):
return {
0: camera_pb2.UNKNOWN,
1: camera_pb2.PHOTO,
2: camera_pb2.VIDEO
}.get(self.value, None)
async def stop_photo_interval(self):
"""
Stop a running photo timelapse.
Raises
------
CameraError
If the request fails. The error contains the reason for the failure.
"""
request = camera_pb2.StopPhotoIntervalRequest()
response = await self._stub.StopPhotoInterval(request)
result = self._extract_result(response)
if result.result is not CameraResult.Result.SUCCESS:
raise CameraError(result, "stop_photo_interval()")
async def take_photo(self):
"""
Take one photo.
Raises
------
CameraError
If the request fails. The error contains the reason for the failure.
"""
request = camera_pb2.TakePhotoRequest()
response = await self._stub.TakePhoto(request)
result = self._extract_result(response)
if result.result is not CameraResult.Result.SUCCESS:
raise CameraError(result, "take_photo()")
def translate_to_rpc(self, rpcVideoStreamStatus):
return {
0: camera_pb2.VideoStreamInfo.NOT_RUNNING,
1: camera_pb2.VideoStreamInfo.IN_PROGRESS
}.get(self.value, None)
async def start_video_streaming(self):
"""
Start video streaming.
Raises
------
CameraError
If the request fails. The error contains the reason for the failure.
"""
request = camera_pb2.StartVideoStreamingRequest()
response = await self._stub.StartVideoStreaming(request)
result = self._extract_result(response)
if result.result is not CameraResult.Result.SUCCESS:
raise CameraError(result, "start_video_streaming()")
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.TakePhoto = channel.unary_unary(
'/mavsdk.rpc.camera.CameraService/TakePhoto',
request_serializer=camera__pb2.TakePhotoRequest.SerializeToString,
response_deserializer=camera__pb2.TakePhotoResponse.FromString,
)
self.StartPhotoInterval = channel.unary_unary(
'/mavsdk.rpc.camera.CameraService/StartPhotoInterval',
request_serializer=camera__pb2.StartPhotoIntervalRequest.SerializeToString,
response_deserializer=camera__pb2.StartPhotoIntervalResponse.FromString,
)
self.StopPhotoInterval = channel.unary_unary(
'/mavsdk.rpc.camera.CameraService/StopPhotoInterval',
request_serializer=camera__pb2.StopPhotoIntervalRequest.SerializeToString,
response_deserializer=camera__pb2.StopPhotoIntervalResponse.FromString,
)
self.StartVideo = channel.unary_unary(
'/mavsdk.rpc.camera.CameraService/StartVideo',
request_serializer=camera__pb2.StartVideoRequest.SerializeToString,
response_deserializer=camera__pb2.StartVideoResponse.FromString,
async def capture_info(self):
"""
Subscribe to capture info updates.
Yields
-------
capture_info : CaptureInfo
Capture info
"""
request = camera_pb2.SubscribeCaptureInfoRequest()
capture_info_stream = self._stub.SubscribeCaptureInfo(request)
try:
async for response in capture_info_stream:
yield CaptureInfo.translate_from_rpc(response.capture_info)
finally:
capture_info_stream.cancel()
async def stop_video(self):
"""
Stop a running video recording.
Raises
------
CameraError
If the request fails. The error contains the reason for the failure.
"""
request = camera_pb2.StopVideoRequest()
response = await self._stub.StopVideo(request)
result = self._extract_result(response)
if result.result is not CameraResult.Result.SUCCESS:
raise CameraError(result, "stop_video()")
def translate_to_rpc(self, rpcCameraMode):
return {
0: camera_pb2.UNKNOWN,
1: camera_pb2.PHOTO,
2: camera_pb2.VIDEO
}.get(self.value, None)