Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
pass
class AppStartError(Error):
"""Raised when the app is not able to be started."""
class AppRestoreConnectionError(Error):
"""Raised when failed to restore app from disconnection."""
class ApiError(Error):
"""Raised when remote API reports an error."""
class ProtocolError(Error):
"""Raised when there is some error in exchanging data with server."""
NO_RESPONSE_FROM_HANDSHAKE = 'No response from handshake.'
NO_RESPONSE_FROM_SERVER = 'No response from server.'
MISMATCHED_API_ID = 'RPC request-response ID mismatch.'
class JsonRpcCommand(object):
"""Commands that can be invoked on all jsonrpc clients.
INIT: Initializes a new session.
CONTINUE: Creates a connection.
"""
INIT = 'initiate'
CONTINUE = 'continue'
def _client_receive(self):
"""Receives the server's response of an Rpc message.
Returns:
Raw byte string of the response.
Raises:
Error: a socket error occurred during the read.
"""
try:
response = self._client.readline()
self.log.debug('Snippet received: %s', response)
return response
except socket.error as e:
raise Error(
self._ad,
'Encountered socket error reading RPC response "%s"' % e)
def _client_send(self, msg):
"""Sends an Rpc message through the connection.
Args:
msg: string, the message to send.
Raises:
Error: a socket error occurred during the send.
"""
try:
self._client.write(msg.encode("utf8") + b'\n')
self._client.flush()
self.log.debug('Snippet sent %s.', msg)
except socket.error as e:
raise Error(
self._ad,
'Encountered socket error "%s" sending RPC message "%s"' %
(e, msg))
# UID of the 'unknown' jsonrpc session. Will cause creation of a new session.
UNKNOWN_UID = -1
# Maximum time to wait for the socket to open on the device.
_SOCKET_CONNECTION_TIMEOUT = 60
# Maximum time to wait for a response message on the socket.
_SOCKET_READ_TIMEOUT = callback_handler.MAX_TIMEOUT
class Error(errors.DeviceError):
pass
class AppStartError(Error):
"""Raised when the app is not able to be started."""
class AppRestoreConnectionError(Error):
"""Raised when failed to restore app from disconnection."""
class ApiError(Error):
"""Raised when remote API reports an error."""
class ProtocolError(Error):
"""Raised when there is some error in exchanging data with server."""
NO_RESPONSE_FROM_HANDSHAKE = 'No response from handshake.'
NO_RESPONSE_FROM_SERVER = 'No response from server.'
MISMATCHED_API_ID = 'RPC request-response ID mismatch.'
# Maximum time to wait for the socket to open on the device.
_SOCKET_CONNECTION_TIMEOUT = 60
# Maximum time to wait for a response message on the socket.
_SOCKET_READ_TIMEOUT = callback_handler.MAX_TIMEOUT
class Error(errors.DeviceError):
pass
class AppStartError(Error):
"""Raised when the app is not able to be started."""
class AppRestoreConnectionError(Error):
"""Raised when failed to restore app from disconnection."""
class ApiError(Error):
"""Raised when remote API reports an error."""
class ProtocolError(Error):
"""Raised when there is some error in exchanging data with server."""
NO_RESPONSE_FROM_HANDSHAKE = 'No response from handshake.'
NO_RESPONSE_FROM_SERVER = 'No response from server.'
MISMATCHED_API_ID = 'RPC request-response ID mismatch.'
class JsonRpcCommand(object):
"""Commands that can be invoked on all jsonrpc clients.