Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_replace(self):
    """Set up a signing config and a helper for checking ``replace()``.

    Builds an ``AwsSigningConfig`` whose fields are all set to non-default
    values, then defines ``_replace_attr``, which replaces a single attribute
    and asserts that only that attribute differs from the original config.

    NOTE(review): within the lines visible for this method the helper is
    defined but never invoked; ``_replace_attr(...)`` calls appear further
    down this file, detached from this method -- confirm against upstream.
    """
    credentials_provider = awscrt.auth.AwsCredentialsProvider.new_static(
        EXAMPLE_ACCESS_KEY_ID, EXAMPLE_SECRET_ACCESS_KEY)

    # nondefault values, to be sure they're carried over correctly
    orig_cfg = awscrt.auth.AwsSigningConfig(
        algorithm=awscrt.auth.AwsSigningAlgorithm.SigV4QueryParam,
        credentials_provider=credentials_provider,
        region='us-west-1',
        service='aws-suborbital-ion-cannon',
        date=datetime.datetime(year=2000, month=1, day=1),
        should_sign_param=lambda x: False,
        use_double_uri_encode=True,
        should_normalize_uri_path=False,
        body_signing_type=awscrt.auth.AwsBodySigningConfigType.BodySigningOff)

    # Call replace on single attribute, then assert that ONLY the one attribute differs
    def _replace_attr(name, value):
        """Replace attribute ``name`` with ``value`` and verify the copy."""
        new_cfg = orig_cfg.replace(**{name: value})
        self.assertIsNot(orig_cfg, new_cfg)  # must return new object
        self.assertEqual(value, getattr(new_cfg, name))  # must replace specified value

        # check that only the one attribute differs
        for attr in awscrt.auth.AwsSigningConfig._attributes:
            if attr == name:
                self.assertNotEqual(getattr(orig_cfg, attr), getattr(new_cfg, attr),
                                    "replaced value should not match original")
            else:
                self.assertEqual(getattr(orig_cfg, attr), getattr(new_cfg, attr),
                                 "value should match original")
def test_create_no_session_token(self):
    """Check ``AwsCredentials`` built from only an access key and secret.

    Asserts that the two values passed in are readable back through
    ``access_key_id`` and ``secret_access_key``, and that ``session_token``
    is ``None`` when no token argument is supplied.
    """
    credentials = awscrt.auth.AwsCredentials(EXAMPLE_ACCESS_KEY_ID, EXAMPLE_SECRET_ACCESS_KEY)
    self.assertEqual(EXAMPLE_ACCESS_KEY_ID, credentials.access_key_id)
    self.assertEqual(EXAMPLE_SECRET_ACCESS_KEY, credentials.secret_access_key)
    self.assertIsNone(credentials.session_token)
# NOTE(review): these four statements read `orig_cfg`, which the function
# directly above never assigns (it is assigned inside `test_replace`). The same
# four statements occur again later in this file after the `_replace_attr(...)`
# calls, so this looks like a displaced copy of the tail of `test_replace` --
# confirm against the upstream source before relying on it.
new_cfg = orig_cfg.replace(region='us-west-3', service='aws-slow-blinking')
self.assertEqual('us-west-3', new_cfg.region)
self.assertEqual('aws-slow-blinking', new_cfg.service)
self.assertEqual(orig_cfg.should_sign_param, new_cfg.should_sign_param)
# Test values copied from aws-c-auth/tests/aws-sig-v4-test-suite/get-vanilla
SIGV4TEST_ACCESS_KEY_ID = 'AKIDEXAMPLE'
SIGV4TEST_SECRET_ACCESS_KEY = 'wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY'
SIGV4TEST_SESSION_TOKEN = None
SIGV4TEST_SERVICE = 'service'
SIGV4TEST_REGION = 'us-east-1'
SIGV4TEST_METHOD = 'GET'
SIGV4TEST_PATH = '/'

# Timezone-aware timestamp; it is the same instant that the 'X-Amz-Date'
# header value below spells out (2015-08-30 12:36:00).
SIGV4TEST_DATE = datetime.datetime(
    year=2015,
    month=8,
    day=30,
    hour=12,
    minute=36,
    second=0,
    tzinfo=awscrt.auth._utc,
)

# Header list without any signing headers.
SIGV4TEST_UNSIGNED_HEADERS = [('Host', 'example.amazonaws.com')]

# The same 'Host' header plus 'X-Amz-Date' and 'Authorization' entries.
SIGV4TEST_SIGNED_HEADERS = [
    ('Host', 'example.amazonaws.com'),
    ('X-Amz-Date', '20150830T123600Z'),
    (
        'Authorization',
        'AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20150830/us-east-1/service/aws4_request, SignedHeaders=host;x-amz-date, Signature=5fa00fa31553b73ebf1942676e86291e8372ff2a2260956d9b8aae1d763fbf31',
    ),
]
# NOTE(review): indentation is missing throughout this block, so the intended
# nesting cannot be read off the text alone.
# NOTE(review): the statements listed under test_signing_sigv4_headers use
# `orig_cfg`, `new_cfg`, `value`, `name` and `_replace_attr`, none of which are
# defined in this method, and none of the SIGV4TEST_* constants are used here.
# The statements match the body of `_replace_attr` and the remainder of
# `test_replace` earlier in this file -- presumably the real body of this test
# is missing; restore it from the upstream source before relying on this class.
class TestSigner(NativeResourceTest):
def test_signing_sigv4_headers(self):
self.assertIsNot(orig_cfg, new_cfg) # must return new object
self.assertEqual(value, getattr(new_cfg, name)) # must replace specified value
# check that only the one attribute differs
for attr in awscrt.auth.AwsSigningConfig._attributes:
if attr == name:
self.assertNotEqual(getattr(orig_cfg, attr), getattr(new_cfg, attr),
"replaced value should not match original")
else:
self.assertEqual(getattr(orig_cfg, attr), getattr(new_cfg, attr),
"value should match original")
# Exercise the single-attribute helper once per config attribute, each time
# with a value different from the one given to `orig_cfg` in `test_replace`.
_replace_attr('algorithm', awscrt.auth.AwsSigningAlgorithm.SigV4Header)
_replace_attr('credentials_provider',
awscrt.auth.AwsCredentialsProvider.new_static(EXAMPLE_ACCESS_KEY_ID, EXAMPLE_SECRET_ACCESS_KEY))
_replace_attr('region', 'us-west-2')
_replace_attr('service', 'aws-nothing-but-bees')
_replace_attr('date', datetime.datetime(year=2001, month=1, day=1))
_replace_attr('should_sign_param', lambda x: True)
_replace_attr('use_double_uri_encode', False)
_replace_attr('should_normalize_uri_path', True)
_replace_attr('body_signing_type', awscrt.auth.AwsBodySigningConfigType.BodySigningOn)
# check that we can replace multiple values at once
new_cfg = orig_cfg.replace(region='us-west-3', service='aws-slow-blinking')
self.assertEqual('us-west-3', new_cfg.region)
self.assertEqual('aws-slow-blinking', new_cfg.service)
self.assertEqual(orig_cfg.should_sign_param, new_cfg.should_sign_param)
# NOTE(review): this function is defined three times verbatim in this file;
# in Python the last definition is the one the name ends up bound to.
def _sign_websocket_handshake_request(transform_args, **kwargs):
    """Sign the websocket handshake request and report when signing is done.

    Builds a SigV4 query-parameter signing config for the
    ``'iotdevicegateway'`` service and passes ``transform_args.http_request``
    to ``awscrt.auth.aws_sign_request``. ``credentials_provider``, ``region``
    and ``_should_sign_param`` are not defined in this function; they are read
    from the surrounding scope, which is not shown here.

    Args:
        transform_args: Object exposing an ``http_request`` attribute and a
            ``set_done(exception)`` method.
        **kwargs: Accepted and ignored.
    """
    # transform_args need to know when transform is done
    try:
        signing_config = awscrt.auth.AwsSigningConfig(
            algorithm=awscrt.auth.AwsSigningAlgorithm.SigV4QueryParam,
            credentials_provider=credentials_provider,
            region=region,
            service='iotdevicegateway',
            should_sign_param=_should_sign_param,
            body_signing_type=awscrt.auth.AwsBodySigningConfigType.BodySigningOff)

        signing_future = awscrt.auth.aws_sign_request(transform_args.http_request, signing_config)
        # Forward the future's exception() value to set_done (presumably None
        # when signing succeeded, as with a standard Future -- confirm).
        signing_future.add_done_callback(lambda x: transform_args.set_done(x.exception()))
    except Exception as e:
        # Any failure while configuring or starting the signing is handed to
        # set_done instead of propagating to the caller.
        transform_args.set_done(e)
# NOTE(review): this function is defined three times verbatim in this file;
# in Python the last definition is the one the name ends up bound to.
def _sign_websocket_handshake_request(transform_args, **kwargs):
    """Sign the websocket handshake request and report when signing is done.

    Builds a SigV4 query-parameter signing config for the
    ``'iotdevicegateway'`` service and passes ``transform_args.http_request``
    to ``awscrt.auth.aws_sign_request``. ``credentials_provider``, ``region``
    and ``_should_sign_param`` are not defined in this function; they are read
    from the surrounding scope, which is not shown here.

    Args:
        transform_args: Object exposing an ``http_request`` attribute and a
            ``set_done(exception)`` method.
        **kwargs: Accepted and ignored.
    """
    # transform_args need to know when transform is done
    try:
        signing_config = awscrt.auth.AwsSigningConfig(
            algorithm=awscrt.auth.AwsSigningAlgorithm.SigV4QueryParam,
            credentials_provider=credentials_provider,
            region=region,
            service='iotdevicegateway',
            should_sign_param=_should_sign_param,
            body_signing_type=awscrt.auth.AwsBodySigningConfigType.BodySigningOff)

        signing_future = awscrt.auth.aws_sign_request(transform_args.http_request, signing_config)
        # Forward the future's exception() value to set_done (presumably None
        # when signing succeeded, as with a standard Future -- confirm).
        signing_future.add_done_callback(lambda x: transform_args.set_done(x.exception()))
    except Exception as e:
        # Any failure while configuring or starting the signing is handed to
        # set_done instead of propagating to the caller.
        transform_args.set_done(e)
# NOTE(review): this function is defined three times verbatim in this file;
# in Python the last definition is the one the name ends up bound to.
def _sign_websocket_handshake_request(transform_args, **kwargs):
    """Sign the websocket handshake request and report when signing is done.

    Builds a SigV4 query-parameter signing config for the
    ``'iotdevicegateway'`` service and passes ``transform_args.http_request``
    to ``awscrt.auth.aws_sign_request``. ``credentials_provider``, ``region``
    and ``_should_sign_param`` are not defined in this function; they are read
    from the surrounding scope, which is not shown here.

    Args:
        transform_args: Object exposing an ``http_request`` attribute and a
            ``set_done(exception)`` method.
        **kwargs: Accepted and ignored.
    """
    # transform_args need to know when transform is done
    try:
        signing_config = awscrt.auth.AwsSigningConfig(
            algorithm=awscrt.auth.AwsSigningAlgorithm.SigV4QueryParam,
            credentials_provider=credentials_provider,
            region=region,
            service='iotdevicegateway',
            should_sign_param=_should_sign_param,
            body_signing_type=awscrt.auth.AwsBodySigningConfigType.BodySigningOff)

        signing_future = awscrt.auth.aws_sign_request(transform_args.http_request, signing_config)
        # Forward the future's exception() value to set_done (presumably None
        # when signing succeeded, as with a standard Future -- confirm).
        signing_future.add_done_callback(lambda x: transform_args.set_done(x.exception()))
    except Exception as e:
        # Any failure while configuring or starting the signing is handed to
        # set_done instead of propagating to the caller.
        transform_args.set_done(e)