Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'allow_pickle': True,
'allow_getattr': True,
'allow_setattr': True,
'allow_delattr': True
}
retry = 10
delay = 0.5
success = False
while retry > 0:
if success:
break
try:
stream_r = rpyc.SocketStream.connect('127.0.0.1', port, timeout=5.0)
conn_read = rpyc.connect_stream(stream_r, config=config)
conn_read.root.hello()
logDebug('Connected to ClearCase Service for `{}`, operation `read`.'.format(user_view))
success = True
except Exception as exp_err:
logWarning('Cannot connect to ClearCase Service for `{}`!'\
'Exception: `{}`! Retry...'.format(user_view, exp_err))
if success:
try:
stream_w = rpyc.SocketStream.connect('127.0.0.1', port, timeout=5.0)
conn_write = rpyc.connect_stream(stream_w, config=config)
conn_write.root.hello()
logDebug('Connected to ClearCase Service for `{}`, operation `write`.'.format(user_view))
break
except Exception as exp_err:
def connect(host, port, service = VoidService, config = {}, ipv6 = False, keepalive = False):
"""
creates a socket-connection to the given host and port
:param host: the hostname to connect to
:param port: the TCP port to use
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:param ipv6: whether to use IPv6 or not
:returns: an RPyC connection
"""
s = SocketStream.connect(host, port, ipv6 = ipv6, keepalive = keepalive)
return connect_stream(s, service, config)
def connect(host, port, service = VoidService, config = {}, ipv6 = False, keepalive = None):
"""
creates a socket-connection to the given host and port
:param host: the hostname to connect to
:param port: the TCP port to use
:param service: the local service to expose (defaults to Void)
:param config: configuration dict
:param ipv6: whether to use IPv6 or not
:returns: an RPyC connection
"""
s = SocketStream.connect(host, port, ipv6 = ipv6, keepalive = keepalive)
return connect_stream(s, service, config)
def __init__(self, port, password):
channel = rpyc.Channel(rpyc.SocketStream.connect('127.0.0.1', port))
channel.send(password.encode('utf-8'))
response = channel.recv()
if response == AUTH_ERROR:
# TODO: What to raise here. I guess we create a custom error
raise ValueError('Invalid password for daemon')
self.conn = rpyc.utils.factory.connect_channel(
channel, service=ClientService, config={'sync_request_timeout': None}
)
if success:
break
try:
stream_r = rpyc.SocketStream.connect('127.0.0.1', port, timeout=5.0)
conn_read = rpyc.connect_stream(stream_r, config=config)
conn_read.root.hello()
logDebug('Connected to ClearCase Service for `{}`, operation `read`.'.format(user_view))
success = True
except Exception as exp_err:
logWarning('Cannot connect to ClearCase Service for `{}`!'\
'Exception: `{}`! Retry...'.format(user_view, exp_err))
if success:
try:
stream_w = rpyc.SocketStream.connect('127.0.0.1', port, timeout=5.0)
conn_write = rpyc.connect_stream(stream_w, config=config)
conn_write.root.hello()
logDebug('Connected to ClearCase Service for `{}`, operation `write`.'.format(user_view))
break
except Exception as exp_err:
logWarning('Cannot connect to ClearCase Service \
for `{}`! Exception: `{}`! Retry...'.\
format(user_view, exp_err))
success = False
time.sleep(delay)
retry -= 1
delay += 0.75
if not success:
logError('Error on starting ClearCase Service for `{}`!'.format(user_view))