Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:throws: :class:`~iterm2.rpc.RPCException` if something goes wrong.
"""
response = await iterm2.rpc.async_invoke_function(
self.connection,
invocation,
tab_id=self.tab_id,
timeout=timeout)
which = response.invoke_function_response.WhichOneof('disposition')
if which == 'error':
# pylint: disable=no-member
if (response.invoke_function_response.error.status ==
iterm2.api_pb2.InvokeFunctionResponse.Status.
Value("TIMEOUT")):
raise iterm2.rpc.RPCException("Timeout")
raise iterm2.rpc.RPCException("{}: {}".format(
iterm2.api_pb2.InvokeFunctionResponse.Status.Name(
response.invoke_function_response.error.status),
response.invoke_function_response.error.error_reason))
return json.loads(
response.invoke_function_response.success.json_result)
:returns: The result of the invocation if successful.
:throws: :class:`~iterm2.rpc.RPCException` if something goes wrong.
"""
response = await iterm2.rpc.async_invoke_function(
connection,
invocation,
timeout=timeout)
which = response.invoke_function_response.WhichOneof('disposition')
if which == 'error':
# pylint: disable=no-member
if (response.invoke_function_response.error.status ==
iterm2.api_pb2.InvokeFunctionResponse.Status.Value("TIMEOUT")):
raise iterm2.rpc.RPCException("Timeout")
raise iterm2.rpc.RPCException("{}: {}".format(
iterm2.api_pb2.InvokeFunctionResponse.Status.Name(
response.invoke_function_response.error.status),
response.invoke_function_response.error.error_reason))
return json.loads(response.invoke_function_response.success.json_result)
:throws: :class:`~iterm2.rpc.RPCException` if something goes wrong.
"""
response = await iterm2.rpc.async_invoke_function(
self.connection,
invocation,
session_id=self.session_id,
timeout=timeout)
which = response.invoke_function_response.WhichOneof('disposition')
# pylint: disable=no-member
if which == 'error':
if (response.invoke_function_response.error.status ==
iterm2.api_pb2.InvokeFunctionResponse.Status.
Value("TIMEOUT")):
raise iterm2.rpc.RPCException("Timeout")
raise iterm2.rpc.RPCException("{}: {}".format(
iterm2.api_pb2.InvokeFunctionResponse.Status.Name(
response.invoke_function_response.error.status),
response.invoke_function_response.error.error_reason))
return json.loads(
response.invoke_function_response.success.json_result)
async def async_invoke_method(connection, receiver, invocation, timeout):
"""Convenience wrapper around async_invoke_function for methods."""
assert receiver
response = await iterm2.rpc.async_invoke_function(
connection,
invocation,
receiver=receiver,
timeout=timeout)
which = response.invoke_function_response.WhichOneof('disposition')
if which == 'error':
if (response.invoke_function_response.error.status ==
iterm2.api_pb2.InvokeFunctionResponse.Status.Value("TIMEOUT")):
raise iterm2.rpc.RPCException("Timeout")
raise iterm2.rpc.RPCException("{}: {}".format(
iterm2.api_pb2.InvokeFunctionResponse.Status.Name(
response.invoke_function_response.error.status),
response.invoke_function_response.error.error_reason))
return json.loads(response.invoke_function_response.success.json_result)
:throws: :class:`~iterm2.rpc.RPCException` if something goes wrong.
"""
response = await iterm2.rpc.async_invoke_function(
self.connection,
invocation,
window_id=self.window_id,
timeout=timeout)
which = response.invoke_function_response.WhichOneof('disposition')
# pylint: disable=no-member
if which == 'error':
if (response.invoke_function_response.error.status ==
iterm2.api_pb2.InvokeFunctionResponse.Status.Value(
"TIMEOUT")):
raise iterm2.rpc.RPCException("Timeout")
raise iterm2.rpc.RPCException("{}: {}".format(
iterm2.api_pb2.InvokeFunctionResponse.Status.Name(
response.invoke_function_response.error.status),
response.invoke_function_response.error.error_reason))
return json.loads(
response.invoke_function_response.success.json_result)