Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Status descriptors. Each one is a 3-tuple that reads as
# (hex colour string, short title, longer human-readable message).
# The "... Error" states use "#FFFF00", the "... Exception" states use
# "#FF0000" and the success state uses "#00FF00".

# -- Pyro4 nameserver reachability --
_STATUS_NS_NOT_RUNNING = (
    "#FFFF00",
    "Nameserver Error",
    "Pyro4 Nameserver not running",
)
_STATUS_ERR_NS_RUNNING = (
    "#FF0000",
    "Nameserver Exception",
    "Exception when checking if Pyro4 Nameserver is running",
)

# -- PyXRD server registration in the nameserver --
_STATUS_NS_NOT_LISTED = (
    "#FFFF00",
    "Nameserver Error",
    "Pyro4 PyXRD server not listed",
)
_STATUS_ERR_NS_LISTED = (
    "#FF0000",
    "Nameserver Exception",
    "Exception when checking if Pyro4 PyXRD server is listed",
)

# -- Connection to the PyXRD server itself --
_STATUS_NO_CONN_PYXRD_SERVER = (
    "#FFFF00",
    "PyXRD Server Error",
    "Cannot connect to Pyro4 PyXRD server",
)
_STATUS_ERR_CONN_PYXRD_SERVER = (
    "#FF0000",
    "PyXRD Server Exception",
    "Exception when connecting to Pyro4 PyXRD server",
)

# -- Everything is fine --
_STATUS_SUCCESS = (
    "#00FF00",
    "Connected (Pyro4)",
    "Succesfully connected to Pyro4 PyXRD Server",
)

# No updater is assigned initially (its use is not visible in this excerpt).
_updater = None

# Current status descriptor; starts out as "nameserver not running".
status = _STATUS_NS_NOT_RUNNING
# Re-entrant lock; presumably guards reads/writes of `status` -- confirm
# against the code that updates it.
status_lock = RLock()

# Nameserver lookup cache, consumed by _locate_ns():
#   NS_CACHE_TIMEOUT -- age in seconds at which the cached lookup is redone
#   ns               -- result of the last Pyro4.naming.locateNS() call
#   ns_ts            -- time.time() stamp taken right after that call
NS_CACHE_TIMEOUT = 30
ns = None
ns_ts = 0
@classmethod
def _locate_ns(cls):
    """Return the Pyro4 nameserver handle, using a class-level cache.

    The value stored in ``cls.ns`` is reused as long as it is set and was
    obtained less than ``cls.NS_CACHE_TIMEOUT`` seconds ago (measured with
    ``time.time()`` against ``cls.ns_ts``). Otherwise
    ``Pyro4.naming.locateNS()`` is called again and both ``cls.ns`` and
    ``cls.ns_ts`` are updated.

    Any exception raised by ``Pyro4.naming.locateNS()`` propagates to the
    caller; in that case the cached values are left as they were.
    """
    # Fast path: a cached entry exists and has not expired yet.
    still_fresh = (
        cls.ns is not None
        and time.time() - cls.ns_ts < cls.NS_CACHE_TIMEOUT
    )
    if still_fresh:
        return cls.ns

    # Slow path: (re)do the lookup and stamp the moment it completed.
    cls.ns = Pyro4.naming.locateNS()
    cls.ns_ts = time.time()
    return cls.ns
# Proxy cache state. These three names mirror the nameserver cache
# (NS_CACHE_TIMEOUT / ns / ns_ts) used by _locate_ns().
# NOTE(review): presumably consumed by _get_proxy() in the same way, with
# the timeout expressed in seconds and proxy_ts holding a time.time()
# stamp -- confirm against the _get_proxy() body.
PROXY_CACHE_TIMEOUT = 30
# No proxy is cached initially.
proxy = None
# Initial timestamp of 0, i.e. no refresh has been recorded yet.
proxy_ts = 0
@classmethod
def _get_proxy(cls):
def __init__(self, *args, **kwargs):
# Initialise the model's per-instance bookkeeping containers.
# All positional and keyword arguments are forwarded unchanged to the
# next initialiser in the MRO.
super(Model, self).__init__(*args, **kwargs)
# Re-entrant lock stored on the instance; presumably serialises property
# access -- confirm against the code that acquires it.
self._prop_lock = RLock() # @UndefinedVariable
# Observer bookkeeping. NOTE(review): WeakList / WeakKeyDictionary
# presumably hold their entries weakly so that registering an observer
# does not keep it alive -- confirm against their definitions.
self.__observers = WeakList()
self.__observer_threads = WeakKeyDictionary()
# keys are property names, values are pairs (method,
# kwargs|None) inside the observer. kwargs is the keyword
# argument possibly specified when explicitly defining the
# notification method in observers, and it is used to build
# the NTInfo instance passed down when the notification method
# is invoked. If kwargs is None (special case), the
# notification method is "old style" (property__...) and
# won't be receiving the property name.
self.__value_notifications = {}
# Further notification registries, all starting out empty.
self.__instance_notif_before = {}
self.__instance_notif_after = {}
self.__signal_notif = {}