Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
logger = logging.getLogger(__name__)
class State:
"""Stat of the connection procedure"""
DISCONNECTED = 0
INITIALIZED = 1
CONNECTED = 2
SETUP_FINISHED = 3
class Crazyflie():
"""The Crazyflie class"""
# Called on disconnect, no matter the reason
disconnected = Caller()
# Called on unintentional disconnect only
connection_lost = Caller()
# Called when the first packet in a new link is received
link_established = Caller()
# Called when the user requests a connection
connection_requested = Caller()
# Called when the link is established and the TOCs (that are not cached)
# have been downloaded
connected = Caller()
# Called if establishing of the link fails (i.e times out)
connection_failed = Caller()
# Called for every packet received
packet_received = Caller()
# Called for every packet sent
packet_sent = Caller()
# Called when the link driver updates the link quality measurement
def __init__(self, period, callback):
self._callbacks = Caller()
self._callbacks.add_callback(callback)
self._started = False
self._period = period
self._thread = None
class Crazyflie():
"""The Crazyflie class"""
# Called on disconnect, no matter the reason
disconnected = Caller()
# Called on unintentional disconnect only
connection_lost = Caller()
# Called when the first packet in a new link is received
link_established = Caller()
# Called when the user requests a connection
connection_requested = Caller()
# Called when the link is established and the TOCs (that are not cached)
# have been downloaded
connected = Caller()
# Called if establishing of the link fails (i.e times out)
connection_failed = Caller()
# Called for every packet received
packet_received = Caller()
# Called for every packet sent
packet_sent = Caller()
# Called when the link driver updates the link quality measurement
link_quality_updated = Caller()
state = State.DISCONNECTED
def __init__(self, link=None, ro_cache=None, rw_cache=None):
"""
Create the objects from this module and register callbacks.
ro_cache -- Path to read-only cache (string)
from cflib.utils.callbacks import Caller
class State:
"""Stat of the connection procedure"""
DISCONNECTED = 0
INITIALIZED = 1
CONNECTED = 2
SETUP_FINISHED = 3
class Crazyflie():
"""The Crazyflie class"""
# Called on disconnect, no matter the reason
disconnected = Caller()
# Called on unintentional disconnect only
connection_lost = Caller()
# Called when the first packet in a new link is received
link_established = Caller()
# Called when the user requests a connection
connection_requested = Caller()
# Called when the link is established and the TOCs (that are not cached)
# have been downloaded
connected = Caller()
# Called if establishing of the link fails (i.e times out)
connection_failed = Caller()
# Called for every packet received
packet_received = Caller()
# Called for every packet sent
packet_sent = Caller()
# Called when the link driver updates the link quality measurement
connection_lost = Caller()
# Called when the first packet in a new link is received
link_established = Caller()
# Called when the user requests a connection
connection_requested = Caller()
# Called when the link is established and the TOCs (that are not cached)
# have been downloaded
connected = Caller()
# Called if establishing of the link fails (i.e times out)
connection_failed = Caller()
# Called for every packet received
packet_received = Caller()
# Called for every packet sent
packet_sent = Caller()
# Called when the link driver updates the link quality measurement
link_quality_updated = Caller()
state = State.DISCONNECTED
def __init__(self, link=None, ro_cache=None, rw_cache=None):
"""
Create the objects from this module and register callbacks.
ro_cache -- Path to read-only cache (string)
rw_cache -- Path to read-write cache (string)
"""
self.link = link
self._toc_cache = TocCache(ro_cache=ro_cache,
rw_cache=rw_cache)
self.incoming = _IncomingPacketHandler(self)
self.incoming.setDaemon(True)
# Called on disconnect, no matter the reason
disconnected = Caller()
# Called on unintentional disconnect only
connection_lost = Caller()
# Called when the first packet in a new link is received
link_established = Caller()
# Called when the user requests a connection
connection_requested = Caller()
# Called when the link is established and the TOCs (that are not cached)
# have been downloaded
connected = Caller()
# Called if establishing of the link fails (i.e times out)
connection_failed = Caller()
# Called for every packet received
packet_received = Caller()
# Called for every packet sent
packet_sent = Caller()
# Called when the link driver updates the link quality measurement
link_quality_updated = Caller()
state = State.DISCONNECTED
def __init__(self, link=None, ro_cache=None, rw_cache=None):
"""
Create the objects from this module and register callbacks.
ro_cache -- Path to read-only cache (string)
rw_cache -- Path to read-write cache (string)
"""
self.link = link
self._toc_cache = TocCache(ro_cache=ro_cache,
def __init__(self, name, period_in_ms):
"""Initialize the entry"""
self.data_received_cb = Caller()
self.error_cb = Caller()
self.started_cb = Caller()
self.added_cb = Caller()
self.err_no = 0
self.id = LogConfig._config_id_counter
LogConfig._config_id_counter = (LogConfig._config_id_counter + 1) % 255
self.cf = None
self.period = int(period_in_ms / 10)
self.period_in_ms = period_in_ms
self._added = False
self._started = False
self.valid = False
self.variables = []
self.default_fetch_as = []
self.name = name
if not os.path.exists(ConfigManager().configs_dir):
logger.info("No user config found, copying dist files")
os.makedirs(ConfigManager().configs_dir)
for f in glob.glob(
cfclient.module_path + "/configs/input/[A-Za-z]*.json"):
dest = os.path.join(ConfigManager().
configs_dir, os.path.basename(f))
if not os.path.isfile(dest):
logger.debug("Copying %s", f)
shutil.copy2(f, ConfigManager().configs_dir)
ConfigManager().get_list_of_configs()
self.input_updated = Caller()
self.assisted_input_updated = Caller()
self.heighthold_input_updated = Caller()
self.hover_input_updated = Caller()
self.rp_trim_updated = Caller()
self.emergency_stop_updated = Caller()
self.device_discovery = Caller()
self.device_error = Caller()
self.assisted_control_updated = Caller()
self.alt1_updated = Caller()
self.alt2_updated = Caller()
# Call with 3 bools (rp_limiting, yaw_limiting, thrust_limiting)
self.limiting_updated = Caller()
cfclient.module_path + "/configs/input/[A-Za-z]*.json"):
dest = os.path.join(ConfigManager().
configs_dir, os.path.basename(f))
if not os.path.isfile(dest):
logger.debug("Copying %s", f)
shutil.copy2(f, ConfigManager().configs_dir)
ConfigManager().get_list_of_configs()
self.input_updated = Caller()
self.assisted_input_updated = Caller()
self.heighthold_input_updated = Caller()
self.hover_input_updated = Caller()
self.rp_trim_updated = Caller()
self.emergency_stop_updated = Caller()
self.device_discovery = Caller()
self.device_error = Caller()
self.assisted_control_updated = Caller()
self.alt1_updated = Caller()
self.alt2_updated = Caller()
# Call with 3 bools (rp_limiting, yaw_limiting, thrust_limiting)
self.limiting_updated = Caller()
def __init__(self, crazyflie=None):
self.log_blocks = []
# Called with newly created blocks
self.block_added_cb = Caller()
self.cf = crazyflie
self.toc = None
self.cf.add_port_callback(CRTPPort.LOGGING, self._new_packet_cb)
self.toc_updated = Caller()
self.state = IDLE
self.fake_toc_crc = 0xDEADBEEF
self._refresh_callback = None
self._toc_cache = None