Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def refresh_toc(self, refresh_done_callback, toc_cache):
"""
Initiate a refresh of the parameter TOC.
"""
self.toc = Toc()
toc_fetcher = TocFetcher(self.cf, ParamTocElement,
CRTPPort.PARAM, self.toc,
refresh_done_callback, toc_cache)
toc_fetcher.start()
def request_param_update(self, var_id):
"""Place a param update request on the queue"""
pk = CRTPPacket()
pk.set_header(CRTPPort.PARAM, READ_CHANNEL)
pk.data = struct.pack('
def set_value(self, complete_name, value):
"""
Set the value for the supplied parameter.
"""
element = self.toc.get_element_by_complete_name(complete_name)
if not element:
logger.warning("Cannot set value for [%s], it's not in the TOC!",
complete_name)
elif element.access == ParamTocElement.RO_ACCESS:
logger.debug("[%s] is read only, no trying to set value", complete_name)
else:
varid = element.ident
pk = CRTPPacket()
pk.set_header(CRTPPort.PARAM, WRITE_CHANNEL)
pk.data = struct.pack('
Set the value for the supplied parameter.
"""
element = self.toc.get_element_by_complete_name(complete_name)
if not element:
logger.warning("Cannot set value for [%s], it's not in the TOC!",
complete_name)
raise KeyError("{} not in param TOC".format(complete_name))
elif element.access == ParamTocElement.RO_ACCESS:
logger.debug("[%s] is read only, no trying to set value",
complete_name)
raise AttributeError("{} is read-only!".format(complete_name))
else:
varid = element.ident
pk = CRTPPacket()
pk.set_header(CRTPPort.PARAM, WRITE_CHANNEL)
pk.data = struct.pack('
def refresh_toc(self, refresh_done_callback, toc_cache):
"""
Initiate a refresh of the parameter TOC.
"""
toc_fetcher = TocFetcher(self.cf, ParamTocElement,
CRTPPort.PARAM, self.toc,
refresh_done_callback, toc_cache)
toc_fetcher.start()
def __init__(self, cf, updated_callback):
"""Initialize the thread"""
Thread.__init__(self)
self.setDaemon(True)
self.wait_lock = Lock()
self.cf = cf
self.updated_callback = updated_callback
self.request_queue = Queue()
self.cf.add_port_callback(CRTPPort.PARAM, self._new_packet_cb)
self._should_close = False
self._req_param = -1
def __init__(self, cf, updated_callback):
"""Initialize the thread"""
Thread.__init__(self)
self.setDaemon(True)
self.wait_lock = Lock()
self.cf = cf
self.updated_callback = updated_callback
self.request_queue = Queue()
self.cf.add_port_callback(CRTPPort.PARAM, self._new_packet_cb)
self._should_close = False
self._req_param = -1
def request_param_update(self, var_id):
"""Place a param update request on the queue"""
pk = CRTPPacket()
pk.set_header(CRTPPort.PARAM, READ_CHANNEL)
pk.data = struct.pack('