Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _send(self, msg, seq, args):  # IO
    """Serialize (msg, seq, args) with brine and transmit it on the channel.

    Uses a queue-and-flush scheme so that re-entrant calls (e.g. from a
    BaseNetref.__del__ triggered by GC during a send) cannot deadlock on
    the send lock.
    """
    data = brine.dump((msg, seq, args))
    # GC might run while sending data; if so, a BaseNetref.__del__ might be
    # called, which must call asyncreq -> _send again, and a blocking
    # acquire of self._sendlock would deadlock.
    # Solution: enqueue the payload and let whichever thread currently
    # holds the send lock flush the queue when it finishes its own send.
    # NOTE: list append/pop are atomic in CPython thanks to the GIL;
    # other implementations may need an explicit lock here.
    self._send_queue.append(data)
    # It is crucial to re-check the queue each time AFTER releasing the
    # lock, so a payload enqueued between our drain and our release is
    # never stranded.
    while self._send_queue:
        if not self._sendlock.acquire(False):
            # Another thread holds the lock. It will send the data after
            # it's done with its current job. We can safely return.
            return
        try:
            # We own the lock: drain everything queued so far, in FIFO
            # order, including items enqueued by other threads meanwhile.
            while self._send_queue:
                self._channel.send(self._send_queue.pop(0))
        finally:
            self._sendlock.release()
def _send(self, msg, seq, args):
    """Brine-serialize (msg, seq, args) and write it to the channel,
    holding the send lock for the duration of the write."""
    payload = brine.dump((msg, seq, args))
    with self._sendlock:
        self._channel.send(payload)
def _send_request(self, handler, args):
def _send(self, msg, seq, args):
    """Send one serialized request/reply frame over the channel.

    The tuple (msg, seq, args) is dumped with brine and written while the
    send lock is held, serializing concurrent writers.
    """
    frame = brine.dump((msg, seq, args))
    with self._sendlock:
        self._channel.send(frame)
def _send_request(self, handler, args):
# Fragment of an exception-serialization routine. `typ`, `val`, `tb` and
# `include_local_traceback` come from the enclosing function, which is not
# visible here -- presumably a sys.exc_info()-style triple; TODO confirm
# against the full source.
if typ is StopIteration:
    return consts.EXC_STOP_ITERATION  # optimization
if type(typ) is str:
    # A string "type" is passed through as-is (already serialized form).
    return typ
if include_local_traceback:
    tbtext = "".join(traceback.format_exception(typ, val, tb))
else:
    tbtext = ""
attrs = []
args = []
# Attributes that must not be shipped to the peer.
ignored_attrs = frozenset(["_remote_tb", "with_traceback"])
for name in dir(val):
    if name == "args":
        for a in val.args:
            if brine.dumpable(a):
                args.append(a)
            else:
                # Non-serializable args are downgraded to their repr() text.
                args.append(repr(a))
    elif name.startswith("_") or name in ignored_attrs:
        continue
    else:
        try:
            attrval = getattr(val, name)
        except AttributeError:
            # skip this attr. see issue #108
            continue
        if not brine.dumpable(attrval):
            attrval = repr(attrval)
        attrs.append((name, attrval))
# Result: (module, name) of the exception type, positional args,
# (name, value) attribute pairs, and the formatted traceback text.
return (typ.__module__, typ.__name__), tuple(args), tuple(attrs), tbtext
def dump(obj):
    """Serialize *obj* with brine and return it as a single bytes blob."""
    chunks = []
    rpyc.core.brine._dump(obj, chunks)
    return b"".join(bytes(chunk) for chunk in chunks)
def register(self, aliases, port, interface=""):
    """Announce this server's (aliases, port) to the registry over TCP.

    Returns False on any failure: connect error, acknowledgement timeout,
    or a corrupt reply. The success path continues past the visible end of
    this fragment, where `reply` is presumably validated -- TODO confirm
    against the full source.
    """
    self.logger.info("registering on %s:%s", self.ip, self.port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # closing() guarantees the socket is closed on every exit path.
    with closing(sock):
        # Bind to an ephemeral port on the requested local interface.
        sock.bind((interface, 0))
        sock.settimeout(self.timeout)
        data = brine.dump(("RPYC", "REGISTER", (aliases, port)))
        try:
            sock.connect((self.ip, self.port))
            sock.send(data)
        except (socket.error, socket.timeout):
            # NOTE(review): logger.warn is a deprecated alias of
            # logger.warning in the stdlib logging module.
            self.logger.warn("could not connect to registry")
            return False
        try:
            data = sock.recv(MAX_DGRAM_SIZE)
        except socket.timeout:
            self.logger.warn("registry did not acknowledge")
            return False
        try:
            reply = brine.load(data)
        except Exception:
            self.logger.warn("received corrupted data from registry")
            return False
        # ... fragment truncated here: `reply` is processed below.
# Fragment of a code-object exporter. `cobj` (the code object), `consts2`
# and `is_py3k` come from the enclosing scope, which is not visible here.
for op, arg in decode_codeobj(cobj):
    if op in ("LOAD_GLOBAL", "STORE_GLOBAL", "DELETE_GLOBAL"):
        # Only builtins may appear as globals, so the function can be
        # reconstructed on the receiving side without its defining module.
        if arg not in __builtin__.__dict__:
            raise TypeError("Cannot export a function with non-builtin globals: %r" % (arg,))
if is_py3k:
    # Python 3 code objects carry co_kwonlyargcount, absent on Python 2.
    exported = (cobj.co_argcount, cobj.co_kwonlyargcount, cobj.co_nlocals, cobj.co_stacksize, cobj.co_flags,
                cobj.co_code, tuple(consts2), cobj.co_names, cobj.co_varnames, cobj.co_filename,
                cobj.co_name, cobj.co_firstlineno, cobj.co_lnotab, cobj.co_freevars, cobj.co_cellvars)
else:
    exported = (cobj.co_argcount, cobj.co_nlocals, cobj.co_stacksize, cobj.co_flags,
                cobj.co_code, tuple(consts2), cobj.co_names, cobj.co_varnames, cobj.co_filename,
                cobj.co_name, cobj.co_firstlineno, cobj.co_lnotab, cobj.co_freevars, cobj.co_cellvars)
# Every field of the exported tuple must itself be brine-serializable.
assert brine.dumpable(exported)
return (CODEOBJ_MAGIC, exported)