Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def remove(self, lbl: ProtocolLabel):
if lbl in self:
super().remove(lbl)
else:
logger.warning(lbl.name + " is not in set, so cant be removed")
def write_packets(self, packets, filename: str, sample_rate: int):
"""
:type packets: list of Message
:param filename:
:return:
"""
if os.path.isfile(filename):
logger.warning("{0} already exists. Overwriting it".format(filename))
with open(filename, "wb") as f:
f.write(self.build_global_header())
with open(filename, "ab") as f:
rel_time_offset_ns = 0
for pkt in packets:
f.write(self.build_packet(0, rel_time_offset_ns, pkt.decoded_bits_buffer))
rel_time_offset_ns = pkt.get_duration(sample_rate) * 10 ** 9
part_id = tag.get("participant_id", None)
message_type_id = tag.get("message_type_id", None)
self.modulator_index = int(tag.get("modulator_index", self.modulator_index))
self.pause = int(tag.get("pause", self.pause))
decoding_index = tag.get("decoding_index", None)
if decoding_index and decoders is not None:
try:
self.decoder = decoders[int(decoding_index)]
except IndexError:
pass
if part_id:
self.participant = Participant.find_matching(part_id, participants)
if self.participant is None:
logger.warning("No participant matched the id {0} from xml".format(part_id))
if message_type_id and message_types:
for message_type in message_types:
if message_type.id == message_type_id:
self.message_type = message_type
break
message_type_tag = tag.find("message_type")
if message_type_tag:
self.message_type = MessageType.from_xml(message_type_tag)
def gain(self, value):
try:
self.__dev.gain = value
except AttributeError as e:
logger.warning(str(e))
def value_type(self, value: int):
try:
self.__value_type = int(value)
except ValueError:
logger.warning("{} could not be cast to integer".format(value))
def stop_rx_mode(self, msg):
try:
self.parent_ctrl_conn.send(self.Command.STOP.name)
except (BrokenPipeError, OSError) as e:
logger.debug("Closing parent control connection: " + str(e))
logger.info("{0}: Stopping RX Mode: {1}".format(self.__class__.__name__, msg))
if hasattr(self, "receive_process") and self.receive_process.is_alive():
self.receive_process.join(self.JOIN_TIMEOUT)
if self.receive_process.is_alive():
logger.warning("{0}: Receive process is still alive, terminating it".format(self.__class__.__name__))
self.receive_process.terminate()
self.receive_process.join()
self.is_receiving = False
for connection in (self.parent_ctrl_conn, self.parent_data_conn, self.child_ctrl_conn, self.child_data_conn):
try:
connection.close()
except OSError as e:
logger.exception(e)
def set_device_bandwidth(self, bandwidth):
if self.bandwidth_is_adjustable:
super().set_device_bandwidth(bandwidth)
else:
logger.warning("Setting the bandwidth is not supported by your RTL-SDR driver version.")
result = []
plugin_dirs = [d for d in os.listdir(self.plugin_path) if os.path.isdir(os.path.join(self.plugin_path, d))]
settings = constants.SETTINGS
for d in plugin_dirs:
if d == "__pycache__":
continue
try:
class_module = self.load_plugin(d)
plugin = class_module()
plugin.plugin_path = os.path.join(self.plugin_path, plugin.name)
plugin.load_description()
plugin.enabled = settings.value(plugin.name, type=bool) if plugin.name in settings.allKeys() else False
result.append(plugin)
except ImportError as e:
logger.warning("Could not load plugin {0} ({1})".format(d, e))
continue
return result
def end(self, value: int):
try:
self.__end = int(value)
except ValueError:
logger.warning("{} could not be cast to integer".format(value))