Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def listen(self, address, async=False):
"""Listen on given address.
:param address: A valid IPv4 address we can bind to.
"""
fut = Future(_qipy.qi_session_listen(self._session, address))
if async:
return fut
if fut.has_error():
#todo: another error?
raise ConnectionError('Cannot listen on ' + address + ': ' + fut.error())
def services(self, async=False):
"""Retrieves the list of the names of all the services available on the
service directory."""
fut = Future(_qipy.qi_session_get_services(self._session))
if async:
return fut
return fut.value()
def connect(self, address, async=False):
""" Connect to service directory.
:raises: ConnectionError
"""
fut = Future(_qipy.qi_session_connect(self._session, address))
if async:
return fut
if fut.has_error():
raise ConnectionError('Cannot connect to ' + address + ': ' + fut.error())
def register_service(self, name, obj, async=False):
"""Register given service and expose it to the world.
:param name: Name of the service to register.
:param obj: The actual service to bind.
:returns: The id of the service.
"""
#TODO
fut = Future(_qipy.qi_session_register_service(self._session, name, obj._obj))
if async:
return fut
return fut.value()
def unregister_service(self, idx, async=False):
"""Unregister service, it is not visible anymore.
:param idx: id of the service given on register.
"""
#TODO
fut = Future(_qipy.qi_session_unregister_service(self._session, idx))
if async:
return fut
return fut.value()
def service(self, name, default=None, async=False):
"""Fetches a new instance of GenericObject binding to a service. If the
service is unknown, returns *default*.
:param name: Name of the service.
:param default: Default value if service is unknown.
"""
if not isinstance(name, str):
raise TypeError('keys must be strings.')
# Get C object.
fut = Future(_qipy.qi_session_get_service(self._session, name))
if async:
return fut
return fut.value()