Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def StreamUnary(self, stream):
raise GRPCError(Status.UNIMPLEMENTED)
async def test_error_before_send_initial_metadata(stream, stub):
async with stream:
raise Exception()
assert stub.__events__ == [
SendHeaders(
[(':status', '200'),
('content-type', 'application/grpc+proto'),
('grpc-status', str(Status.UNKNOWN.value)),
('grpc-message', 'Internal Server Error')],
end_stream=True,
),
Reset(ErrorCodes.NO_ERROR),
]
) -> AsyncIterator[InstallRequest]:
if destination == InstallRequest.APP:
if path.endswith(".ipa"):
return _generate_ipa_chunks(ipa_path=path, logger=logger)
elif path.endswith(".app"):
return _generate_app_chunks(app_path=path, logger=logger)
elif destination == InstallRequest.XCTEST:
return _generate_xctest_chunks(path=path, logger=logger)
elif destination == InstallRequest.DYLIB:
return _generate_dylib_chunks(path=path, logger=logger)
elif destination == InstallRequest.DSYM:
return _generate_dsym_chunks(path=path, logger=logger)
elif destination == InstallRequest.FRAMEWORK:
return _generate_framework_chunks(path=path, logger=logger)
raise GRPCError(
status=Status(Status.FAILED_PRECONDITION),
message=f"install invalid for {path} {destination}",
)
def _raise_for_grpc_status(self, headers_map: Dict[str, str]) -> None:
grpc_status = headers_map.get('grpc-status')
if grpc_status is None:
raise GRPCError(Status.UNKNOWN, 'Missing grpc-status header')
try:
status = Status(int(grpc_status))
except ValueError:
raise GRPCError(Status.UNKNOWN, ('Invalid grpc-status: {!r}'
.format(grpc_status)))
else:
if status is not Status.OK:
message = headers_map.get('grpc-message')
if message is not None:
message = decode_grpc_message(message)
details = None
if self._status_details_codec is not None:
details_bin = headers_map.get(_STATUS_DETAILS_KEY)
if details_bin is not None:
details = self._status_details_codec.decode(
status, message,
decode_bin_value(details_bin.encode('ascii'))
)
raise GRPCError(status, message, details)