Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def begin(self, request: BeginRequest) -> BeginResponse:
    """Forward a begin request to the underlying process and return its reply.

    Args:
        request: The request object handed unchanged to ``self._process.send``.

    Returns:
        Whatever ``self._process.send`` returns for this request.
    """
    # ``BeginResponse`` is passed as the second argument of ``send``;
    # presumably it names the type the reply is decoded into -- confirm
    # against the implementation of ``send``.
    response = self._process.send(request, BeginResponse)
    return response
def ainvoke(
self, obj: Referenceable, method: str, args: Optional[List[Any]] = None
) -> Any:
if args is None:
args = []
promise = self.provider.begin(
BeginRequest(
objref=obj.__jsii_ref__,
method=method,
args=_make_reference_for_native(self, args),
)
)
if isinstance(promise, Callback):
promise = _callback_till_result(self, promise, BeginResponse)
callbacks = self.provider.callbacks(CallbacksRequest()).callbacks
while callbacks:
for callback in callbacks:
try:
result = _handle_callback(self, callback)
except Exception as exc:
# TODO: Maybe we want to print the whole traceback here?
complete = self.provider.complete(
CompleteRequest(cbid=callback.cbid, err=str(exc))
)
else:
complete = self.provider.complete(
CompleteRequest(cbid=callback.cbid, result=result)
)