Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def set_return_to_launch_after_mission(self, enable):
"""
Set whether to trigger Return-to-Launch (RTL) after the mission is complete.
This will only take effect for the next mission upload, meaning that
the mission may have to be uploaded again.
Parameters
----------
enable : bool
If true, trigger an RTL at the end of the mission
"""
request = mission_pb2.SetReturnToLaunchAfterMissionRequest()
request.enable = enable
response = await self._stub.SetReturnToLaunchAfterMission(request)
async def cancel_mission_download(self):
"""
Cancel an ongoing mission download.
"""
request = mission_pb2.CancelMissionDownloadRequest()
response = await self._stub.CancelMissionDownload(request)
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.UploadMission = channel.unary_unary(
'/mavsdk.rpc.mission.MissionService/UploadMission',
request_serializer=mission__pb2.UploadMissionRequest.SerializeToString,
response_deserializer=mission__pb2.UploadMissionResponse.FromString,
)
self.CancelMissionUpload = channel.unary_unary(
'/mavsdk.rpc.mission.MissionService/CancelMissionUpload',
request_serializer=mission__pb2.CancelMissionUploadRequest.SerializeToString,
response_deserializer=mission__pb2.CancelMissionUploadResponse.FromString,
)
self.DownloadMission = channel.unary_unary(
'/mavsdk.rpc.mission.MissionService/DownloadMission',
request_serializer=mission__pb2.DownloadMissionRequest.SerializeToString,
response_deserializer=mission__pb2.DownloadMissionResponse.FromString,
)
self.CancelMissionDownload = channel.unary_unary(
'/mavsdk.rpc.mission.MissionService/CancelMissionDownload',
request_serializer=mission__pb2.CancelMissionDownloadRequest.SerializeToString,
response_deserializer=mission__pb2.CancelMissionDownloadResponse.FromString,
async def get_return_to_launch_after_mission(self):
"""
Get whether to trigger Return-to-Launch (RTL) after mission is complete.
Before getting this option, it needs to be set, or a mission
needs to be downloaded.
Returns
-------
enable : bool
If true, trigger an RTL at the end of the mission
"""
request = mission_pb2.GetReturnToLaunchAfterMissionRequest()
response = await self._stub.GetReturnToLaunchAfterMission(request)
return response.enable
Parameters
----------
mission_items : [MissionItem]
List of mission items
Raises
------
MissionError
If the request fails. The error contains the reason for the failure.
"""
request = mission_pb2.UploadMissionRequest()
rpc_elems_list = []
for elem in mission_items:
rpc_elem = mission_pb2.MissionItem()
elem.translate_to_rpc(rpc_elem)
rpc_elems_list.append(rpc_elem)
request.mission_items.extend(rpc_elems_list)
response = await self._stub.UploadMission(request)
result = self._extract_result(response)
if result.result is not MissionResult.Result.SUCCESS:
raise MissionError(result, "upload_mission()", mission_items)
async def pause_mission(self):
"""
Pause the mission.
Pausing the mission puts the vehicle into
[HOLD mode](https://docs.px4.io/en/flight_modes/hold.html).
A multicopter should just hover at the spot while a fixedwing vehicle should loiter
around the location where it paused.
Raises
------
MissionError
If the request fails. The error contains the reason for the failure.
"""
request = mission_pb2.PauseMissionRequest()
response = await self._stub.PauseMission(request)
result = self._extract_result(response)
if result.result is not MissionResult.Result.SUCCESS:
raise MissionError(result, "pause_mission()")
The mission items are uploaded to a drone. Once uploaded the mission can be started and
executed even if the connection is lost.
Parameters
----------
mission_items : [MissionItem]
List of mission items
Raises
------
MissionError
If the request fails. The error contains the reason for the failure.
"""
request = mission_pb2.UploadMissionRequest()
rpc_elems_list = []
for elem in mission_items:
rpc_elem = mission_pb2.MissionItem()
elem.translate_to_rpc(rpc_elem)
rpc_elems_list.append(rpc_elem)
request.mission_items.extend(rpc_elems_list)
response = await self._stub.UploadMission(request)
result = self._extract_result(response)
if result.result is not MissionResult.Result.SUCCESS:
raise MissionError(result, "upload_mission()", mission_items)
async def clear_mission(self):
"""
Clear the mission saved on the vehicle.
Raises
------
MissionError
If the request fails. The error contains the reason for the failure.
"""
request = mission_pb2.ClearMissionRequest()
response = await self._stub.ClearMission(request)
result = self._extract_result(response)
if result.result is not MissionResult.Result.SUCCESS:
raise MissionError(result, "clear_mission()")
async def is_mission_finished(self):
"""
Check if the mission has been finished.
Returns
-------
is_finished : bool
True if the mission is finished and the last mission item has been reached
"""
request = mission_pb2.IsMissionFinishedRequest()
response = await self._stub.IsMissionFinished(request)
return response.is_finished