Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def __aexit__(self, exc_type, exc, _tb):
    """Unsubscribe from the notification when the ``async with`` block exits.

    The three exception arguments are not inspected. The method returns
    ``None``, so an exception raised inside the ``async with`` body is not
    suppressed.
    """
    connection, token = self.__connection, self.__token
    try:
        await iterm2.notifications.async_unsubscribe(connection, token)
    except iterm2.notifications.SubscriptionException:
        # Best-effort cleanup: a failed unsubscribe is deliberately ignored.
        return None
async def _listen(self):
    """Subscribe to session, layout and focus notifications.

    Four subscriptions are made on ``self.connection``, one after another:
    new session, terminate session and layout change (all calling
    ``self.refresh``), then focus change (calling ``self._focus_change``).
    The value returned by each subscribe call is appended to
    ``self.tokens`` in that order.
    """
    conn = self.connection

    token = await iterm2.notifications.subscribe_to_new_session_notification(
        conn, self.refresh)
    self.tokens.append(token)

    token = await iterm2.notifications.subscribe_to_terminate_session_notification(
        conn, self.refresh)
    self.tokens.append(token)

    token = await iterm2.notifications.subscribe_to_layout_change_notification(
        conn, self.refresh)
    self.tokens.append(token)

    token = await iterm2.notifications.subscribe_to_focus_change_notification(
        conn, self._focus_change)
    self.tokens.append(token)
async def __aexit__(self, exc_type, exc, _tb):
    """Unsubscribe from the notification when the ``async with`` block exits.

    The three exception arguments are not inspected. The method returns
    ``None``, so an exception raised inside the ``async with`` body is not
    suppressed.
    """
    connection, token = self.connection, self.token
    try:
        await iterm2.notifications.async_unsubscribe(connection, token)
    except iterm2.notifications.SubscriptionException:
        # Best-effort cleanup: a failed unsubscribe is deliberately ignored.
        return None
async def __aexit__(self, exc_type, exc, _tb):
    """Unsubscribe from the notification when the ``async with`` block exits.

    The three exception arguments are not inspected. The method returns
    ``None``, so an exception raised inside the ``async with`` body is not
    suppressed.
    """
    connection, token = self.connection, self.__token
    try:
        await iterm2.notifications.async_unsubscribe(connection, token)
    except iterm2.notifications.SubscriptionException:
        # Best-effort cleanup: a failed unsubscribe is deliberately ignored.
        return None
return await func(**kwargs)
async def handle_rpc(connection, notif):
    """First step of RPC handling: pass the notification on.

    Delegates to ``generic_handle_rpc``, giving it ``wrapper`` as the
    callable together with the connection and the notification. Nothing is
    returned.
    """
    await generic_handle_rpc(wrapper, connection, notif)
# Subscribe `handle_rpc` as the callback for server-originated RPC
# notifications, registered under the decorated function's own name in the
# status-bar-component role. The awaited result is stored on the function
# object as `rpc_token`.
func.rpc_token = (
await iterm2.notifications.
async_subscribe_to_server_originated_rpc_notification(
connection=connection,
callback=handle_rpc,
name=func.__name__,
# NOTE(review): `signature` is defined outside this excerpt; presumably
# it is the inspected signature of `func` - confirm.
arguments=signature.parameters.keys(),
timeout_seconds=timeout,
defaults=defaults,
role=iterm2.notifications.RPC_ROLE_STATUS_BAR_COMPONENT,
status_bar_component=component))
# Keep the connection next to the token on the function object
# (presumably so the registration can be undone later - TODO confirm).
func.rpc_connection = connection
async def __aexit__(self, exc_type, exc, _tb):
    """Unsubscribe from the notification when the ``async with`` block exits.

    The three exception arguments are not inspected. The method returns
    ``None``, so an exception raised inside the ``async with`` body is not
    suppressed.
    """
    connection, token = self.__connection, self.__token
    try:
        await iterm2.notifications.async_unsubscribe(connection, token)
    except iterm2.notifications.SubscriptionException:
        # Best-effort cleanup: a failed unsubscribe is deliberately ignored.
        return None
async def __aexit__(self, exc_type, exc, _tb):
    """Unsubscribe from the notification when the ``async with`` block exits.

    The three exception arguments are not inspected. The method returns
    ``None``, so an exception raised inside the ``async with`` body is not
    suppressed.
    """
    connection, token = self.__connection, self.__token
    try:
        await iterm2.notifications.async_unsubscribe(connection, token)
    except iterm2.notifications.SubscriptionException:
        # Best-effort cleanup: a failed unsubscribe is deliberately ignored.
        return None
defaults[key] = value.default.name
async def wrapper(**kwargs):
    """Decode the JSON ``knobs`` argument, then call the wrapped function.

    Call chain: handle_rpc -> generic_handle_rpc -> wrapper -> func.

    If a ``knobs`` keyword argument is present it is replaced by the result
    of ``json.loads`` on its value; all other arguments pass through
    unchanged. Returns whatever ``func`` returns.
    """
    try:
        encoded_knobs = kwargs["knobs"]
    except KeyError:
        # No knobs were supplied; nothing to decode.
        pass
    else:
        kwargs["knobs"] = json.loads(encoded_knobs)
    return await func(**kwargs)
async def handle_rpc(connection, notif):
    """First step of RPC handling: pass the notification on.

    Delegates to ``generic_handle_rpc``, giving it ``wrapper`` as the
    callable together with the connection and the notification. Nothing is
    returned.
    """
    await generic_handle_rpc(wrapper, connection, notif)
# Subscribe `handle_rpc` as the callback for server-originated RPC
# notifications, registered under the decorated function's own name in the
# status-bar-component role. The awaited result is stored on the function
# object as `rpc_token`.
func.rpc_token = (
await iterm2.notifications.
async_subscribe_to_server_originated_rpc_notification(
connection=connection,
callback=handle_rpc,
name=func.__name__,
# NOTE(review): `signature` is defined outside this excerpt; presumably
# it is the inspected signature of `func` - confirm.
arguments=signature.parameters.keys(),
timeout_seconds=timeout,
defaults=defaults,
role=iterm2.notifications.RPC_ROLE_STATUS_BAR_COMPONENT,
status_bar_component=component))
# Keep the connection next to the token on the function object
# (presumably so the registration can be undone later - TODO confirm).
func.rpc_connection = connection
async def __aexit__(self, exc_type, exc, _tb):
    """Unsubscribe from the notification when the ``async with`` block exits.

    The three exception arguments are not inspected. The method returns
    ``None``, so an exception raised inside the ``async with`` body is not
    suppressed.
    """
    connection, token = self.__connection, self.__token
    try:
        await iterm2.notifications.async_unsubscribe(connection, token)
    except iterm2.notifications.SubscriptionException:
        # Best-effort cleanup: a failed unsubscribe is deliberately ignored.
        return None