Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_infinite_retries_deadline_and_backoff(backoff):
    """Check that retrying with ``max_retry_count=-1`` is cut off by the deadline.

    The interceptor is configured with ``max_retry_count=-1`` and the supplied
    back-off function, and the call is issued with ``timeout=5`` against an
    ``_AlwaysUnavailable`` service. The call is expected to fail with
    ``DEADLINE_EXCEEDED``.

    Args:
        backoff: Forwarded to ``RetryInterceptor`` as ``back_off_func``.
            Presumably provided by a fixture or parametrization defined
            elsewhere -- TODO confirm.
    """
    service = _AlwaysUnavailable()
    server = grpc_server(service.handler)
    try:
        with default_channel() as channel:
            interceptor = RetryInterceptor(
                max_retry_count=-1,
                retriable_codes=[grpc.StatusCode.UNAVAILABLE],
                add_retry_count_to_header=True,
                back_off_func=backoff,
            )
            ch = grpc.intercept_channel(channel, interceptor)
            client = zone_service_pb2_grpc.ZoneServiceStub(ch)

            with pytest.raises(grpc.RpcError) as e:
                client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"), timeout=5)

            assert e.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED
    finally:
        # Stop the server even when an assertion above fails, so it does not
        # keep running after this test.
        server.stop(0)
def test_five_retries():
    """Check retry budgets below and at five against ``_FailFirstAttempts(5)``.

    For every ``max_retry_count`` in ``range(4)`` the call is expected to
    raise ``grpc.RpcError`` with code ``UNAVAILABLE``; the service is reset
    with ``service.reset(5)`` after each attempt. With ``max_retry_count=5``
    the call is expected to return ``DEFAULT_ZONE``.
    """
    service = _FailFirstAttempts(5)
    server = grpc_server(service.handler)
    try:
        with default_channel() as channel:
            for max_retry_count in range(4):
                interceptor = RetryInterceptor(max_retry_count=max_retry_count)
                ch = grpc.intercept_channel(channel, interceptor)
                client = zone_service_pb2_grpc.ZoneServiceStub(ch)

                with pytest.raises(grpc.RpcError) as e:
                    client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"))

                assert e.value.code() == grpc.StatusCode.UNAVAILABLE
                # Reset the service before the next, larger retry budget is tried.
                service.reset(5)

            interceptor = RetryInterceptor(max_retry_count=5)
            ch = grpc.intercept_channel(channel, interceptor)
            client = zone_service_pb2_grpc.ZoneServiceStub(ch)

            res = client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"))

            assert res == DEFAULT_ZONE
    finally:
        # The sibling tests stop their server; do the same here, and do it on
        # failure too, so the server does not keep running after this test.
        server.stop(0)
def test_idempotency_token_not_changed():
    """Check that a caller-supplied idempotency key is not changed.

    A random UUID string is sent as the ``idempotency-key`` metadata entry
    through an interceptor with ``max_retry_count=100``. The call is expected
    to fail with ``UNAVAILABLE``, and the ``_TokenUnchanged`` service must not
    have flagged ``token_changed``.
    """
    token = str(uuid.uuid4())
    service = _TokenUnchanged(token)
    server = grpc_server(service.handler)
    try:
        with default_channel() as channel:
            interceptor = RetryInterceptor(
                max_retry_count=100,
                retriable_codes=[grpc.StatusCode.UNAVAILABLE],
                add_retry_count_to_header=True,
            )
            ch = grpc.intercept_channel(channel, interceptor)
            client = zone_service_pb2_grpc.ZoneServiceStub(ch)

            with pytest.raises(grpc.RpcError) as e:
                client.Get(
                    zone_service_pb2.GetZoneRequest(zone_id="id"),
                    metadata=[("idempotency-key", token)],
                )

            assert e.value.code() == grpc.StatusCode.UNAVAILABLE
            assert not service.token_changed
    finally:
        # Stop the server even when an assertion above fails, so it does not
        # keep running after this test.
        server.stop(0)
# NOTE(review): this run of statements has no visible `def` header, and
# `retriable_codes` is not defined in the visible lines; both presumably
# belong to a test function whose start is missing here -- TODO confirm.
service = _RetriableCodes(retriable_codes)
server = grpc_server(service.handler)
with default_channel() as channel:
# For each retry budget smaller than the number of codes, the call is
# expected to raise grpc.RpcError whose code is retriable_codes[retry_qty].
for retry_qty in range(len(retriable_codes)):
interceptor = RetryInterceptor(max_retry_count=retry_qty, retriable_codes=retriable_codes)
ch = grpc.intercept_channel(channel, interceptor)
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
with pytest.raises(grpc.RpcError) as e:
client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"))
assert e.value.code() == retriable_codes[retry_qty]
# Service state is reset before the next retry budget is tried.
service.reset_state()
# With a retry budget equal to the number of codes, the call is expected to
# return DEFAULT_ZONE.
interceptor = RetryInterceptor(max_retry_count=len(retriable_codes), retriable_codes=retriable_codes)
ch = grpc.intercept_channel(channel, interceptor)
client = zone_service_pb2_grpc.ZoneServiceStub(ch)
assert client.Get(zone_service_pb2.GetZoneRequest(zone_id="id")) == DEFAULT_ZONE
server.stop(0)
def test_header_token_and_retry_count():
    """Run a retried call with ``add_retry_count_to_header=True`` and check the service saw no error.

    The interceptor uses ``max_retry_count=100`` and retries on
    ``UNAVAILABLE``. The call is expected to fail with ``UNAVAILABLE``, and
    the ``_HeaderTokenAndRetryCount`` service must not have set ``error``
    (the checks it performs are defined in that class, outside this block).
    """
    service = _HeaderTokenAndRetryCount()
    server = grpc_server(service.handler)
    try:
        with default_channel() as channel:
            interceptor = RetryInterceptor(
                max_retry_count=100,
                retriable_codes=[grpc.StatusCode.UNAVAILABLE],
                add_retry_count_to_header=True,
            )
            ch = grpc.intercept_channel(channel, interceptor)
            client = zone_service_pb2_grpc.ZoneServiceStub(ch)

            with pytest.raises(grpc.RpcError) as e:
                client.Get(zone_service_pb2.GetZoneRequest(zone_id="id"))

            assert e.value.code() == grpc.StatusCode.UNAVAILABLE
            assert not service.error
    finally:
        # Stop the server even when an assertion above fails, so it does not
        # keep running after this test.
        server.stop(0)
def main(product_id, sku_id, quantity, timestamp=None, uuid=None):
    """Validate a product-usage write, then run the business logic.

    Builds a usage write request from the arguments, sends it with
    ``validate_only=True``, and calls ``business_logic(product_id, sku_id)``
    only if at least one record was accepted.

    Args:
        product_id: Forwarded to the request builder and to ``business_logic``.
        sku_id: Forwarded to the request builder and to ``business_logic``.
        quantity: Forwarded to the request builder.
        timestamp: Forwarded to the request builder; defaults to ``None``.
        uuid: Forwarded to the request builder; defaults to ``None``.

    Raises:
        ValueError: If the validation response contains no accepted records.
    """
    # NOTE: IAM token will be taken automatically from metadata agent of VM
    retry_policy = yandexcloud.RetryInterceptor(
        max_retry_count=5,
        retriable_codes=[grpc.StatusCode.UNAVAILABLE],
    )
    usage_service = yandexcloud.SDK(interceptor=retry_policy).client(ImageProductUsageServiceStub)

    usage_request = build_product_usage_write_request(product_id, sku_id, quantity, timestamp, uuid)

    # Step 0. Ensure consumer has all permissions to use the product (validate_only=True)
    usage_request.validate_only = True
    validation = usage_service.Write(usage_request)
    if not len(validation.accepted):
        rejection_reason = str(validation.rejected)
        raise ValueError('Unable to provide the service to customer. Reason: %s' % rejection_reason)

    # Step 1. Provide your service to the customer
    business_logic(product_id, sku_id)
def main():
logging.basicConfig(level=logging.INFO)
arguments = parse_args()
interceptor = yandexcloud.RetryInterceptor(max_retry_count=5, retriable_codes=[grpc.StatusCode.UNAVAILABLE])
if arguments.token:
sdk = yandexcloud.SDK(interceptor=interceptor, token=arguments.token)
else:
with open(arguments.sa_json_path) as infile:
sdk = yandexcloud.SDK(interceptor=interceptor, service_account_key=json.load(infile))
fill_missing_arguments(sdk, arguments)
instance_id = None
try:
operation = create_instance(sdk, arguments.folder_id, arguments.zone, arguments.name, arguments.subnet_id)
operation_result = sdk.wait_operation_and_get_result(
operation,
response_type=Instance,
meta_type=CreateInstanceMetadata,
)