Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def ainvoke(
self, obj: Referenceable, method: str, args: Optional[List[Any]] = None
) -> Any:
if args is None:
args = []
promise = self.provider.begin(
BeginRequest(
objref=obj.__jsii_ref__,
method=method,
args=_make_reference_for_native(self, args),
)
)
if isinstance(promise, Callback):
promise = _callback_till_result(self, promise, BeginResponse)
callbacks = self.provider.callbacks(CallbacksRequest()).callbacks
while callbacks:
for callback in callbacks:
try:
result = _handle_callback(self, callback)
except Exception as exc:
# TODO: Maybe we want to print the whole traceback here?
complete = self.provider.complete(
SetRequest, _with_api_key("set", self._serializer.unstructure_attrs_asdict)
)
self._serializer.register_unstructure_hook(
StaticSetRequest,
_with_api_key("sset", self._serializer.unstructure_attrs_asdict),
)
self._serializer.register_unstructure_hook(
InvokeRequest,
_with_api_key("invoke", self._serializer.unstructure_attrs_asdict),
)
self._serializer.register_unstructure_hook(
StaticInvokeRequest,
_with_api_key("sinvoke", self._serializer.unstructure_attrs_asdict),
)
self._serializer.register_unstructure_hook(
BeginRequest,
_with_api_key("begin", self._serializer.unstructure_attrs_asdict),
)
self._serializer.register_unstructure_hook(
EndRequest, _with_api_key("end", self._serializer.unstructure_attrs_asdict)
)
self._serializer.register_unstructure_hook(
CallbacksRequest,
_with_api_key("callbacks", self._serializer.unstructure_attrs_asdict),
)
self._serializer.register_unstructure_hook(
CompleteRequest,
_with_api_key("complete", self._serializer.unstructure_attrs_asdict),
)
self._serializer.register_unstructure_hook(
StatsRequest,
_with_api_key("stats", self._serializer.unstructure_attrs_asdict),