Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def rpc_method(*args):
    """Call the remote XML-RPC method named by ``method`` with ``args``.

    ``self`` and ``method`` are free variables supplied by the enclosing
    scope (not visible in this block). A new ``TimeoutServerProxy`` is
    built on ``self._url`` for every call and kept in ``self._sock``.

    Returns whatever the remote method returns.

    Raises ``error.ConnectorError`` when the call fails with an
    ``xmlrpclib.Fault`` or with any other ``xmlrpclib.Error``.
    """
    try:
        self._sock = xmlrpclib_custom.TimeoutServerProxy(
            self._url, allow_none=True,
            timeout=self._connector.timeout)
        # NOTE(review): presumably the proxy resolves any attribute name
        # to a remote method, so the ``False`` default should never be
        # returned -- confirm against TimeoutServerProxy.
        sock_method = getattr(self._sock, method, False)
        return sock_method(*args)
    # NOTE: a Fault is raised with this kind of request:
    #   - execute('fake.model', 'search', [])
    #   - execute('sale.order', 'fake_method')
    except xmlrpclib.Fault as exc:
        # faultCode: error message
        # faultString: server traceback (depending on the server version
        # used, a bad request can produce a server traceback, or not).
        raise error.ConnectorError(exc.faultCode, exc.faultString)
    # TODO: needs a test (when is this exception raised?)
    except xmlrpclib.Error as exc:
        # ``exc.args`` may be empty (e.g. ProtocolError) or hold
        # non-string items; format each item so join() cannot fail, and
        # fall back to the exception's own text so the message is never
        # blank. '%s' formatting keeps unicode intact on Python 2.
        details = ' - '.join('%s' % (arg,) for arg in exc.args)
        raise error.ConnectorError(details or str(exc))
return rpc_method
def __init__(self, host, port=8069, timeout=120, version=None):
    """Store the connection settings, validating the port.

    host    -- stored unchanged in ``self.host``
    port    -- anything ``int()`` accepts; stored as an int in
               ``self.port`` (default 8069)
    timeout -- stored unchanged in ``self._timeout`` (default 120;
               presumably seconds -- confirm against the proxy class)
    version -- stored unchanged in ``self.version``

    Raises ``error.ConnectorError`` if ``port`` cannot be converted
    to an integer.
    """
    self.host = host
    try:
        self.port = int(port)
    except (ValueError, TypeError):
        # int() raises ValueError for strings such as 'abc' but
        # TypeError for None, lists, dicts...; both mean an invalid
        # port and must surface as the same ConnectorError.
        txt = "The port '{0}' is invalid. An integer is required."
        txt = txt.format(port)
        raise error.ConnectorError(txt)
    self._timeout = timeout
    self.version = version
def __init__(self, host, port=8069, timeout=120, version=None):
    """Record the host, port, timeout and version of a connection.

    ``port`` is converted with ``int()``; when the conversion fails
    with ``ValueError`` or ``TypeError`` an ``error.ConnectorError``
    is raised. The other arguments are stored without any check.
    """
    self.host = host
    try:
        port_number = int(port)
    except (ValueError, TypeError):
        template = "The port '{0}' is invalid. An integer is required."
        raise error.ConnectorError(template.format(port))
    # Conversion succeeded: keep the integer form of the port.
    self.port = port_number
    self._timeout = timeout
    self.version = version
def rpc_method(*args):
    """Call the remote XML-RPC method named by ``method`` with ``args``.

    ``self`` and ``method`` are free variables supplied by the enclosing
    scope (not visible in this block). A new ``TimeoutServerProxy`` is
    built on ``self._url`` for every call and kept in ``self._sock``.

    Returns whatever the remote method returns.

    Raises ``error.ConnectorError`` when the call fails with an
    ``xmlrpclib.Fault`` or with any other ``xmlrpclib.Error``.
    """
    try:
        self._sock = xmlrpclib_custom.TimeoutServerProxy(
            self._url, allow_none=True,
            timeout=self._connector.timeout)
        # NOTE(review): presumably the proxy resolves any attribute name
        # to a remote method, so the ``False`` default should never be
        # returned -- confirm against TimeoutServerProxy.
        sock_method = getattr(self._sock, method, False)
        return sock_method(*args)
    # NOTE: a Fault is raised with this kind of request:
    #   - execute('fake.model', 'search', [])
    #   - execute('sale.order', 'fake_method')
    except xmlrpclib.Fault as exc:
        # faultCode: error message
        # faultString: server traceback (depending on the server version
        # used, a bad request can produce a server traceback, or not).
        raise error.ConnectorError(exc.faultCode, exc.faultString)
    # TODO: needs a test (when is this exception raised?)
    except xmlrpclib.Error as exc:
        # ``exc.args`` may be empty (e.g. ProtocolError) or hold
        # non-string items; format each item so join() cannot fail, and
        # fall back to the exception's own text so the message is never
        # blank. '%s' formatting keeps unicode intact on Python 2.
        details = ' - '.join('%s' % (arg,) for arg in exc.args)
        raise error.ConnectorError(details or str(exc))
return rpc_method