Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def run():
drone = System()
await drone.connect(system_address="udp://:14540")
asyncio.ensure_future(print_camera_mode(drone))
asyncio.ensure_future(print_camera_status(drone))
print("Setting mode to 'PHOTO'")
try:
await drone.camera.set_mode(CameraMode.PHOTO)
except CameraError as error:
print(f"Setting mode failed with error code: {error._result.result}")
await asyncio.sleep(2)
print("Taking a photo")
try:
await drone.camera.take_photo()
async def run():
""" Does Offboard control using velocity body coordinates. """
drone = System()
await drone.connect(system_address="udp://:14540")
print("-- Arming")
await drone.action.arm()
print("-- Setting initial setpoint")
await drone.offboard.set_velocity_body(VelocityBodyYawspeed(0.0, 0.0, 0.0, 0.0))
print("-- Starting offboard")
try:
await drone.offboard.start()
except OffboardError as error:
print(f"Starting offboard mode failed with error code: {error._result.result}")
print("-- Disarming")
await drone.action.disarm()
return
async def run():
drone = System()
await drone.connect(system_address="udp://:14540")
print("Waiting for drone...")
async for state in drone.core.connection_state():
if state.is_connected:
print(f"Drone discovered with UUID: {state.uuid}")
break
lat = 47.3977508
lon = 8.5456074
p1 = Point(lat - 0.0001, lon - 0.0001)
p2 = Point(lat + 0.0001, lon - 0.0001)
p3 = Point(lat + 0.0001, lon + 0.0001)
p4 = Point(lat - 0.0001, lon + 0.0001)
polygon = Polygon([p1, p2, p3, p4], Polygon.Type.INCLUSION)
async def print_flight_mode():
drone = System()
await drone.connect(system_address="udp://:14540")
print("Waiting for drone to connect...")
async for state in drone.core.connection_state():
if state.is_connected:
print(f"Drone discovered with UUID: {state.uuid}")
break
async for flight_mode in drone.telemetry.flight_mode():
print("FlightMode:", flight_mode)
async def run():
""" Does Offboard control using velocity NED coordinates. """
drone = System()
await drone.connect(system_address="udp://:14540")
print("-- Arming")
await drone.action.arm()
print("-- Setting initial setpoint")
await drone.offboard.set_velocity_ned(VelocityNedYaw(0.0, 0.0, 0.0, 0.0))
print("-- Starting offboard")
try:
await drone.offboard.start()
except OffboardError as error:
print(f"Starting offboard mode failed with error code: {error._result.result}")
print("-- Disarming")
await drone.action.disarm()
return
async def run():
drone = System()
await drone.connect(system_address="udp://:14540")
print("-- Starting gyro calibration")
async for progress_data in drone.calibration.calibrate_gyro():
print(progress_data)
async def run():
drone = System()
await drone.connect(system_address="udp://:14540")
print("Waiting for drone...")
async for state in drone.core.connection_state():
if state.is_connected:
print(f"Drone discovered with UUID: {state.uuid}")
break
print("-- Arming")
await drone.action.arm()
print("-- Taking off")
await drone.action.takeoff()
await asyncio.sleep(5)
async def run():
drone = System()
await drone.connect(system_address="udp://:14540")
asyncio.ensure_future(print_mission_progress(drone))
termination_task = asyncio.ensure_future(observe_is_in_air(drone))
mission_items = []
mission_items.append(MissionItem(47.398039859999997,
8.5455725400000002,
25,
10,
True,
float('nan'),
float('nan'),
MissionItem.CameraAction.NONE,
float('nan'),
float('nan')))
Then it registers tasks to be run in parallel (one can think of them as threads):
- print_altitude: print the altitude
- print_flight_mode: print the flight mode
- observe_is_in_air: this monitors the flight mode and returns when the
drone has been in air and is back on the ground.
Finally, it goes to the actual works: arm the drone, initiate a takeoff
and finally land.
Note that "observe_is_in_air" is not necessary, but it ensures that the
script waits until the drone is actually landed, so that we receive feedback
during the landing as well.
"""
# Init the drone
drone = System()
await drone.connect(system_address="udp://:14540")
# Start parallel tasks
asyncio.ensure_future(print_altitude(drone))
asyncio.ensure_future(print_flight_mode(drone))
termination_task = asyncio.ensure_future(observe_is_in_air(drone))
# Execute the maneuvers
print("-- Arming")
await drone.action.arm()
print("-- Taking off")
await drone.action.takeoff()
await asyncio.sleep(5)
async def run():
drone = System()
await drone.connect(system_address="udp://:14540")
print("Waiting for drone to connect...")
async for state in drone.core.connection_state():
if state.is_connected:
print(f"Drone discovered with UUID: {state.uuid}")
break
info = await drone.info.get_version()
print(info)