Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def parse_state(state: str) -> dict:
"""Parse a state line to a dictionary
Internal method, you normally wouldn't call this yourself.
"""
state = state.strip()
Tello.LOGGER.debug('Raw state data: {}'.format(state))
if state == 'ok':
return {}
state_dict = {}
for field in state.split(';'):
split = field.split(':')
if len(split) < 2:
continue
key = split[0]
value = split[1]
if key in Tello.state_field_converters:
try:
value = Tello.state_field_converters[key](value)
def udp_response_receiver():
    """Receive command responses from Tello drones over UDP.

    Creates a UDP socket, publishes it as the module-level ``client_socket``
    and binds it to ``Tello.CONTROL_UDP_PORT`` on all interfaces. Every
    datagram whose sender IP is a key of ``drones`` is appended (as raw
    bytes) to ``drones[ip]['responses']``; datagrams from other senders are
    only logged at debug level and dropped.

    The loop ends after the first exception, which is logged as an error.

    Must be run from a background thread in order to not block the main
    thread. Internal method, you normally wouldn't call this yourself.
    """
    global client_socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    client_socket.bind(('', Tello.CONTROL_UDP_PORT))

    while True:
        try:
            payload, sender = client_socket.recvfrom(1024)
            # recvfrom yields (host, port); only the host identifies a drone.
            host = sender[0]
            Tello.LOGGER.debug('Data received from {} at client_socket'.format(host))
            if host in drones:
                drones[host]['responses'].append(payload)
        except Exception as err:
            # Any failure stops the receiver loop after being logged.
            Tello.LOGGER.error(err)
            break
def udp_state_receiver():
    """Receive state packets from Tello drones over UDP.

    Binds a UDP socket to ``Tello.STATE_UDP_PORT`` on all interfaces and
    loops over incoming datagrams. For every datagram whose sender IP is a
    key of ``drones``, the payload is decoded as ASCII, parsed with
    ``Tello.parse_state`` and stored in ``drones[ip]['state']``. Datagrams
    from other senders are only logged at debug level and dropped.

    The loop ends after the first exception, which is logged as an error.
    The socket is closed when the function exits, whether the loop ended
    normally or ``bind`` itself failed.

    Must be run from a background thread in order to not block the main
    thread. Internal method, you normally wouldn't call this yourself.
    """
    # The context manager guarantees the socket (and its bound port) is
    # released once the receiver stops, instead of relying on garbage
    # collection.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as state_socket:
        state_socket.bind(('', Tello.STATE_UDP_PORT))

        while True:
            try:
                data, address = state_socket.recvfrom(1024)
                # recvfrom yields (host, port); only the host identifies a drone.
                address = address[0]
                Tello.LOGGER.debug('Data received from {} at state_socket'.format(address))
                if address not in drones:
                    continue
                data = data.decode('ASCII')
                drones[address]['state'] = Tello.parse_state(data)
            except Exception as e:
                # Any failure (socket, decoding or parsing) is logged and
                # stops the receiver loop.
                Tello.LOGGER.error(e)
                break