Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self):
    """Open the pygame window and set up the drone handle and control state.

    Creates a 960x720 window titled "Tello video stream", instantiates the
    Tello object, zeroes every velocity component, and registers a
    repeating pygame timer event that is used for updates.
    """
    # Initialise pygame, then create the window.
    pygame.init()
    pygame.display.set_caption("Tello video stream")
    self.screen = pygame.display.set_mode([960, 720])

    # Tello object that interacts with the Tello drone.
    self.tello = Tello()

    # Drone velocities lie between -100 and 100; every axis starts at rest.
    self.for_back_velocity = self.left_right_velocity = 0
    self.up_down_velocity = self.yaw_velocity = 0
    self.speed = 10

    self.send_rc_control = False

    # Update timer: post USEREVENT + 1 at an interval of 1000 // FPS.
    update_event = pygame.USEREVENT + 1
    update_interval = 1000 // FPS
    pygame.time.set_timer(update_event, update_interval)
def parse_state(state: str) -> dict:
    """Parse a state line to a dictionary.

    Internal method, you normally wouldn't call this yourself.

    The line is split on ``;`` into fields and each field on ``:`` into a
    key and a value. When ``Tello.state_field_converters`` has an entry for
    the key, the value is passed through that converter; otherwise the raw
    string is kept.

    Arguments:
        state: raw state line

    Returns:
        A dict mapping each field key to its (possibly converted) value.
        An empty dict is returned when the stripped line is exactly ``ok``.
    """
    state = state.strip()
    Tello.LOGGER.debug('Raw state data: {}'.format(state))

    if state == 'ok':
        return {}

    converters = Tello.state_field_converters
    state_dict = {}
    for field in state.split(';'):
        split = field.split(':')
        if len(split) < 2:
            # Not a key:value pair (for example the empty piece left after
            # a trailing ';'), so there is nothing to record.
            continue

        key = split[0]
        value = split[1]

        if key in converters:
            converter = converters[key]
            try:
                value = converter(value)
            except Exception as e:
                # Best effort: report the failure and keep the raw string
                # so one malformed field does not discard the whole line.
                Tello.LOGGER.debug('Error parsing state value for {}: {} to {}'
                                   .format(key, value, converter))
                Tello.LOGGER.error(e)

        state_dict[key] = value

    return state_dict
# NOTE(review): this is a fragment of a method body (it assigns to `self`).
# The `def` line that introduces `host` and `retry_count` is not visible in
# this excerpt, and the original indentation has been lost.
# Address pair used for control traffic: (host, control UDP port).
self.address = (host, Tello.CONTROL_UDP_PORT)
self.stream_on = False
self.retry_count = retry_count
# Both timestamps start at the moment this code runs.
self.last_received_command_timestamp = time.time()
self.last_rc_control_timestamp = time.time()
# NOTE(review): `drones` is rebound below, but no `global drones` statement
# is visible in this fragment (the otherwise identical code further down in
# the file has one) - confirm against the full file.
if drones is None:
drones = {}
# NOTE(review): with indentation lost it is unclear whether the thread
# start-up below belongs inside the `if drones is None` branch - confirm.
# Run the Tello command-response UDP receiver in a background daemon thread
response_receiver_thread = threading.Thread(target=Tello.udp_response_receiver)
response_receiver_thread.daemon = True
response_receiver_thread.start()
# Run the state UDP receiver in a background daemon thread
state_receiver_thread = threading.Thread(target=Tello.udp_state_receiver)
state_receiver_thread.daemon = True
state_receiver_thread.start()
# Register this host with an empty response list and an empty state dict.
drones[host] = {
'responses': [],
'state': {},
}
def udp_state_receiver():
    """Receive state datagrams from Tello drones and store the parsed state.

    Binds a UDP socket to Tello.STATE_UDP_PORT and then loops forever, so it
    must be run from a background thread in order to not block the main
    thread. Datagrams from senders that are not keys of `drones` are
    ignored; any error raised while handling a datagram is logged and the
    loop carries on.

    Internal method, you normally wouldn't call this yourself.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    listener.bind(('', Tello.STATE_UDP_PORT))

    while True:
        try:
            payload, sender = listener.recvfrom(1024)
            # recvfrom returns an address pair; only its first item is used
            # as the lookup key.
            sender_ip = sender[0]
            Tello.LOGGER.debug('Data received from {} at state_socket'.format(sender_ip))

            if sender_ip in drones:
                text = payload.decode('ASCII')
                drones[sender_ip]['state'] = Tello.parse_state(text)
        except Exception as err:
            Tello.LOGGER.error(err)
# NOTE(review): the next two lines are the tail of a parameter list. The
# `def` line that opens it is not visible in this excerpt, and the original
# indentation has been lost.
host=TELLO_IP,
retry_count=RETRY_COUNT):
# `drones` is a module-level name; this body may rebind it below.
global drones
# Address pair used for control traffic: (host, control UDP port).
self.address = (host, Tello.CONTROL_UDP_PORT)
self.stream_on = False
self.retry_count = retry_count
# Both timestamps start at the moment this code runs.
self.last_received_command_timestamp = time.time()
self.last_rc_control_timestamp = time.time()
if drones is None:
drones = {}
# NOTE(review): with indentation lost it is unclear whether the thread
# start-up below belongs inside the `if drones is None` branch - confirm.
# Run the Tello command-response UDP receiver in a background daemon thread
response_receiver_thread = threading.Thread(target=Tello.udp_response_receiver)
response_receiver_thread.daemon = True
response_receiver_thread.start()
# Run the state UDP receiver in a background daemon thread
state_receiver_thread = threading.Thread(target=Tello.udp_state_receiver)
state_receiver_thread.daemon = True
state_receiver_thread.start()
# Register this host with an empty response list and an empty state dict.
drones[host] = {
'responses': [],
'state': {},
}
def fromIps(ips: list):
    """Create TelloSwarm from a list of IP addresses.

    Arguments:
        ips: list (or any other iterable) of IP address strings; whitespace
            around each address is stripped before use

    Returns:
        A TelloSwarm built from one Tello per address, in the given order.

    Raises:
        ValueError: if no addresses are provided. ValueError is a subclass
            of Exception, so callers catching Exception are unaffected.
    """
    # Materialise the input first so that iterables without a length
    # (generators, iterators) are accepted as well as lists.
    addresses = [ip.strip() for ip in ips]
    if not addresses:
        raise ValueError("No ips provided")

    return TelloSwarm([Tello(address) for address in addresses])