Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def battery(self):
    """
    Stream 'battery' updates from the telemetry stub.

    A SubscribeBattery stream is opened on ``self._stub`` and the
    ``battery`` field of every response is converted with
    ``Battery.translate_from_rpc``. The stream is cancelled when the
    generator stops, however iteration ended.

    Yields
    -------
    battery : Battery
         The next 'battery' state
    """
    subscribe_request = telemetry_pb2.SubscribeBatteryRequest()
    stream = self._stub.SubscribeBattery(subscribe_request)

    try:
        async for update in stream:
            rpc_battery = update.battery
            yield Battery.translate_from_rpc(rpc_battery)
    finally:
        # Runs on normal exhaustion, on aclose(), and on errors alike.
        stream.cancel()
async def armed(self):
    """
    Stream 'armed' updates from the telemetry stub.

    A SubscribeArmed stream is opened on ``self._stub`` and the
    ``is_armed`` field of every response is yielded unchanged. The
    stream is cancelled when the generator stops, however iteration
    ended.

    Yields
    -------
    is_armed : bool
         The next 'armed' state
    """
    subscribe_request = telemetry_pb2.SubscribeArmedRequest()
    stream = self._stub.SubscribeArmed(subscribe_request)

    try:
        async for update in stream:
            armed_flag = update.is_armed
            yield armed_flag
    finally:
        # Runs on normal exhaustion, on aclose(), and on errors alike.
        stream.cancel()
async def rc_status(self):
    """
    Stream 'RC status' updates from the telemetry stub.

    A SubscribeRcStatus stream is opened on ``self._stub`` and the
    ``rc_status`` field of every response is converted with
    ``RcStatus.translate_from_rpc``. The stream is cancelled when the
    generator stops, however iteration ended.

    Yields
    -------
    rc_status : RcStatus
         The next RC status
    """
    subscribe_request = telemetry_pb2.SubscribeRcStatusRequest()
    stream = self._stub.SubscribeRcStatus(subscribe_request)

    try:
        async for update in stream:
            rpc_rc_status = update.rc_status
            yield RcStatus.translate_from_rpc(rpc_rc_status)
    finally:
        # Runs on normal exhaustion, on aclose(), and on errors alike.
        stream.cancel()
async def in_air(self):
    """
    Stream 'in-air' updates from the telemetry stub.

    A SubscribeInAir stream is opened on ``self._stub`` and the
    ``is_in_air`` field of every response is yielded unchanged. The
    stream is cancelled when the generator stops, however iteration
    ended.

    Yields
    -------
    is_in_air : bool
         The next 'in-air' state
    """
    subscribe_request = telemetry_pb2.SubscribeInAirRequest()
    stream = self._stub.SubscribeInAir(subscribe_request)

    try:
        async for update in stream:
            airborne_flag = update.is_in_air
            yield airborne_flag
    finally:
        # Runs on normal exhaustion, on aclose(), and on errors alike.
        stream.cancel()
def __init__(self, channel):
    """Constructor.

    Creates one unary-stream callable per telemetry subscription RPC via
    ``channel.unary_stream`` and stores each on the instance under the
    RPC's name (``SubscribePosition``, ``SubscribeHome``,
    ``SubscribeInAir``, ``SubscribeLandedState``).

    Args:
        channel: A grpc.Channel.
    """
    self.SubscribePosition = channel.unary_stream(
        '/mavsdk.rpc.telemetry.TelemetryService/SubscribePosition',
        request_serializer=telemetry__pb2.SubscribePositionRequest.SerializeToString,
        response_deserializer=telemetry__pb2.PositionResponse.FromString,
    )
    self.SubscribeHome = channel.unary_stream(
        '/mavsdk.rpc.telemetry.TelemetryService/SubscribeHome',
        request_serializer=telemetry__pb2.SubscribeHomeRequest.SerializeToString,
        response_deserializer=telemetry__pb2.HomeResponse.FromString,
    )
    self.SubscribeInAir = channel.unary_stream(
        '/mavsdk.rpc.telemetry.TelemetryService/SubscribeInAir',
        request_serializer=telemetry__pb2.SubscribeInAirRequest.SerializeToString,
        response_deserializer=telemetry__pb2.InAirResponse.FromString,
    )
    # The original fragment left this call unterminated; the closing
    # parenthesis below is the only code change.
    self.SubscribeLandedState = channel.unary_stream(
        '/mavsdk.rpc.telemetry.TelemetryService/SubscribeLandedState',
        request_serializer=telemetry__pb2.SubscribeLandedStateRequest.SerializeToString,
        response_deserializer=telemetry__pb2.LandedStateResponse.FromString,
    )
def translate_to_rpc(self, rpcFlightMode):
    """Return the ``telemetry_pb2`` flight-mode constant for ``self.value``.

    The lookup table maps the keys 0 through 8 to the corresponding
    ``telemetry_pb2`` constants. ``None`` is returned when ``self.value``
    matches none of those keys.

    Args:
        rpcFlightMode: Accepted for interface compatibility; not used here.
    """
    # Position in the tuple is the lookup key (0..8).
    rpc_constants = (
        telemetry_pb2.UNKNOWN,
        telemetry_pb2.READY,
        telemetry_pb2.TAKEOFF,
        telemetry_pb2.HOLD,
        telemetry_pb2.MISSION,
        telemetry_pb2.RETURN_TO_LAUNCH,
        telemetry_pb2.LAND,
        telemetry_pb2.OFFBOARD,
        telemetry_pb2.FOLLOW_ME,
    )
    lookup = dict(enumerate(rpc_constants))
    return lookup.get(self.value)
async def health(self):
    """
    Stream 'health' updates from the telemetry stub.

    A SubscribeHealth stream is opened on ``self._stub`` and the
    ``health`` field of every response is converted with
    ``Health.translate_from_rpc``. The stream is cancelled when the
    generator stops, however iteration ended.

    Yields
    -------
    health : Health
         The next 'health' state
    """
    subscribe_request = telemetry_pb2.SubscribeHealthRequest()
    stream = self._stub.SubscribeHealth(subscribe_request)

    try:
        async for update in stream:
            rpc_health = update.health
            yield Health.translate_from_rpc(rpc_health)
    finally:
        # Runs on normal exhaustion, on aclose(), and on errors alike.
        stream.cancel()
async def status_text(self):
    """
    Stream 'status text' updates from the telemetry stub.

    A SubscribeStatusText stream is opened on ``self._stub`` and the
    ``status_text`` field of every response is converted with
    ``StatusText.translate_from_rpc``. The stream is cancelled when the
    generator stops, however iteration ended.

    Yields
    -------
    status_text : StatusText
         The next 'status text'
    """
    subscribe_request = telemetry_pb2.SubscribeStatusTextRequest()
    stream = self._stub.SubscribeStatusText(subscribe_request)

    try:
        async for update in stream:
            rpc_status_text = update.status_text
            yield StatusText.translate_from_rpc(rpc_status_text)
    finally:
        # Runs on normal exhaustion, on aclose(), and on errors alike.
        stream.cancel()
async def flight_mode(self):
    """
    Stream 'flight mode' updates from the telemetry stub.

    A SubscribeFlightMode stream is opened on ``self._stub`` and the
    ``flight_mode`` field of every response is converted with
    ``FlightMode.translate_from_rpc``. The stream is cancelled when the
    generator stops, however iteration ended.

    Yields
    -------
    flight_mode : FlightMode
         The next flight mode
    """
    subscribe_request = telemetry_pb2.SubscribeFlightModeRequest()
    stream = self._stub.SubscribeFlightMode(subscribe_request)

    try:
        async for update in stream:
            rpc_flight_mode = update.flight_mode
            yield FlightMode.translate_from_rpc(rpc_flight_mode)
    finally:
        # Runs on normal exhaustion, on aclose(), and on errors alike.
        stream.cancel()
async def actuator_control_target(self):
    """
    Stream 'actuator control target' updates from the telemetry stub.

    A SubscribeActuatorControlTarget stream is opened on ``self._stub``
    and the ``actuator_control_target`` field of every response is
    converted with ``ActuatorControlTarget.translate_from_rpc``. The
    stream is cancelled when the generator stops, however iteration
    ended.

    Yields
    -------
    actuator_control_target : ActuatorControlTarget
         Actuator control target
    """
    subscribe_request = telemetry_pb2.SubscribeActuatorControlTargetRequest()
    stream = self._stub.SubscribeActuatorControlTarget(subscribe_request)

    try:
        async for update in stream:
            rpc_target = update.actuator_control_target
            yield ActuatorControlTarget.translate_from_rpc(rpc_target)
    finally:
        # Runs on normal exhaustion, on aclose(), and on errors alike.
        stream.cancel()