Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def start(self):
    """
    Start the update loop.

    When the manager is already in the ``Running`` state this is a no-op.
    Otherwise the state is switched to ``Running``, ``stateChanged`` is
    emitted with the new state and ``_update`` is called.

    .. note:: The updates will not happen until the control reaches
              the Qt event loop.
    """
    # Guard: nothing to do if we are already running.
    if self.__state == SignalManager.Running:
        return

    self.__state = SignalManager.Running
    self.stateChanged.emit(SignalManager.Running)
    self._update()
def setScheme(self, scheme):
    """
    Set the :class:`~.scheme.Scheme` instance to display/edit.

    Passing the scheme that is already set is a no-op. Otherwise the
    previous scheme (if any) is detached: its ``title_changed`` signal is
    disconnected, this object is removed as its event filter and the
    ``stateChanged`` signal of its signal manager child (if found) is
    disconnected. The new scheme is then stored, the path is reset to an
    empty string and the equivalent connections are made on the new
    scheme. The clean node properties are recomputed from the new scheme,
    or reset to an empty list when the new scheme is falsy.
    """
    previous = self.__scheme
    if previous is scheme:
        return

    # Detach from the scheme that is being replaced.
    if previous:
        previous.title_changed.disconnect(self.titleChanged)
        previous.removeEventFilter(self)
        old_manager = previous.findChild(signalmanager.SignalManager)
        if old_manager:
            old_manager.stateChanged.disconnect(
                self.__signalManagerStateChanged
            )

    self.__scheme = scheme
    self.setPath("")

    if not self.__scheme:
        self.__cleanProperties = []
    else:
        # Attach to the newly set scheme.
        self.__scheme.title_changed.connect(self.titleChanged)
        self.titleChanged.emit(scheme.title)
        self.__cleanProperties = node_properties(scheme)
        new_manager = scheme.findChild(signalmanager.SignalManager)
        if new_manager:
            new_manager.stateChanged.connect(
                self.__signalManagerStateChanged
            )
def _can_process(self):
    """
    Return a bool indicating if the manager can enter the main
    processing loop.

    The result is ``False`` only when the current state is
    ``SignalManager.Error`` or ``SignalManager.Stoped``.
    """
    blocking_states = (SignalManager.Error, SignalManager.Stoped)
    return self.__state not in blocking_states
def set_scheme(self, scheme):
    """
    Set the :class:`WidgetsScheme` instance to manage.

    Stores the scheme and its ``SignalManager`` child, connects the
    manager's per-node ``processingStarted`` / ``processingFinished``
    signals and the scheme's ``node_added``, ``node_removed`` and
    ``runtime_env_changed`` signals to the matching handlers, and
    installs this object as an event filter on the scheme.
    """
    manager = scheme.findChild(SignalManager)
    self.__scheme = scheme
    self.__signal_manager = manager

    # Use the SchemeNode overload of the processing notifications.
    started = manager.processingStarted[SchemeNode]
    started.connect(self.__on_processing_started)
    finished = manager.processingFinished[SchemeNode]
    finished.connect(self.__on_processing_finished)

    scheme.node_added.connect(self.add_widget_for_node)
    scheme.node_removed.connect(self.remove_widget_for_node)
    scheme.runtime_env_changed.connect(self.__on_env_changed)
    scheme.installEventFilter(self)
return
try:
channel = node.output_channel(channelname)
except ValueError:
log.error(
"%r is not valid signal name for %r", channelname, node.description.name
)
return
# Expand the signal_id with the unique widget id and the
# channel name. This is needed for OWBaseWidget's input
# handlers (Multiple flag).
signal_id = (widget.widget_id, channelname, signal_id)
SignalManager.send(self, node, channel, value, signal_id)
# being empty). There is nothing really to do in this
# case.
state = None
if state == SignalManager.Processing:
log.info(
"Deferring a 'DeferredDelete' event for the Scheme "
"instance until SignalManager exits the current "
"update loop."
)
event.setAccepted(False)
self.processingFinished.connect(self.scheme().deleteLater)
self.stop()
return True
return SignalManager.eventFilter(self, receiver, event)
def eventFilter(self, receiver, event):
    """
    Filter events sent to the scheme.

    A ``DeferredDelete`` event addressed to the scheme is intercepted
    while the runtime state is ``SignalManager.Processing``: the event is
    marked as not accepted, ``processingFinished`` is connected to the
    scheme's ``deleteLater``, the manager is stopped and ``True`` is
    returned. Every other case is delegated to
    ``SignalManager.eventFilter``.
    """
    is_scheme_delete = (
        event.type() == QEvent.DeferredDelete and receiver is self.scheme()
    )
    if is_scheme_delete:
        try:
            current = self.runtime_state()
        except AttributeError:
            # If the scheme (which is a parent of this object) is
            # already being deleted the SignalManager can also be in
            # the process of destruction (noticeable by its __dict__
            # being empty). There is nothing really to do in this
            # case.
            current = None

        if current == SignalManager.Processing:
            message = (
                "Deferring a 'DeferredDelete' event for the Scheme "
                "instance until SignalManager exits the current "
                "update loop."
            )
            log.info(message)
            event.setAccepted(False)
            # Re-issue the delete once the current update loop is done.
            self.processingFinished.connect(self.scheme().deleteLater)
            self.stop()
            return True

    return SignalManager.eventFilter(self, receiver, event)
def onchanged(state):
    # Update widget creation policy based on signal manager's state:
    # Normal while the manager is Running, OnDemand in any other state.
    is_running = state == SignalManager.Running
    policy = WidgetManager.Normal if is_running else WidgetManager.OnDemand
    self.widget_manager.set_creation_policy(policy)
log.info("SignalManager: Processing queued signals")
node_update_front = self.node_update_front()
log.debug(
"SignalManager: Nodes eligible for update %s",
[node.title for node in node_update_front],
)
if node_update_front:
node = node_update_front[0]
self._set_runtime_state(SignalManager.Processing)
try:
self.process_node(node)
finally:
self._set_runtime_state(SignalManager.Waiting)