Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_add_multi_values(self):
h = HttpHeaders()
h.add('Cookie', 'a=1')
h.add('Cookie', 'b=2')
self.assertEqual('a=1', h.get('Cookie'))
self.assertEqual(['a=1', 'b=2'], list(h.get_values('Cookie')))
credentials_provider = awscrt.auth.AwsCredentialsProvider.new_static(
SIGV4TEST_ACCESS_KEY_ID, SIGV4TEST_SECRET_ACCESS_KEY, SIGV4TEST_SESSION_TOKEN)
signing_config = awscrt.auth.AwsSigningConfig(
algorithm=awscrt.auth.AwsSigningAlgorithm.SigV4Header,
credentials_provider=credentials_provider,
region=SIGV4TEST_REGION,
service=SIGV4TEST_SERVICE,
date=SIGV4TEST_DATE,
body_signing_type=awscrt.auth.AwsBodySigningConfigType.BodySigningOff)
http_request = awscrt.http.HttpRequest(
method=SIGV4TEST_METHOD,
path=SIGV4TEST_PATH,
headers=awscrt.http.HttpHeaders(SIGV4TEST_UNSIGNED_HEADERS))
signing_future = awscrt.auth.aws_sign_request(http_request, signing_config)
signing_result = signing_future.result(TIMEOUT)
self.assertIs(http_request, signing_result) # should be same object
self.assertEqual(SIGV4TEST_METHOD, http_request.method)
self.assertEqual(SIGV4TEST_PATH, http_request.path)
# existing headers should remain
for prev_header in SIGV4TEST_UNSIGNED_HEADERS:
self.assertIn(prev_header, http_request.headers)
# signed headers must be present
for signed_header in SIGV4TEST_SIGNED_HEADERS:
def _test_put(self, secure):
self._start_server(secure)
connection = self._new_client_connection(secure)
test_asset_path = 'test/test_http_client.py'
with open(test_asset_path, 'rb') as outgoing_body_stream:
outgoing_body_bytes = outgoing_body_stream.read()
headers = HttpHeaders([
('Content-Length', str(len(outgoing_body_bytes))),
])
# seek back to start of stream before trying to send it
outgoing_body_stream.seek(0)
request = HttpRequest('PUT', '/' + test_asset_path, headers, outgoing_body_stream)
response = Response()
http_stream = connection.request(request, response.on_response, response.on_body)
# wait for stream to complete
stream_completion_result = http_stream.completion_future.result(self.timeout)
self.assertEqual(200, response.status_code)
self.assertEqual(200, stream_completion_result)
def test_iter(self):
# test that we iterate over everything we put in
src = [('Host', 'example.org'), ('Cookie', 'a=1')]
h = HttpHeaders(src)
for pair in h:
src.remove(pair)
self.assertEqual(0, len(src))
def test_add(self):
h = HttpHeaders()
h.add('Host', 'example.org')
self.assertEqual('example.org', h.get('Host'))
self.assertEqual(['example.org'], list(h.get_values('Host')))
def test_set(self):
h = HttpHeaders()
# create
h.set('Host', 'example.org')
self.assertEqual(['example.org'], list(h.get_values('Host')))
# replace
h.set('Host', 'example2.org')
self.assertEqual(['example2.org'], list(h.get_values('Host')))
# replace many
h.add('Host', 'example3.org')
h.add('Host', 'example4.org')
h.set('Host', 'example5.org')
self.assertEqual(['example5.org'], list(h.get_values('Host')))
def test_iter_order(self):
# test that headers with multiple values are iterated in insertion order
src = [('Cookie', 'a=1'), ('cookie', 'b=2')]
h = HttpHeaders(src)
gather = [pair for pair in h]
# note this also compares that we preserved case of the names
self.assertEqual(src, gather)
def __init__(self, method='GET', path='/', headers=None, body_stream=None):
assert isinstance(headers, HttpHeaders) or headers is None
if headers is None:
headers = HttpHeaders()
binding = _awscrt.http_message_new_request(headers)
super(HttpRequest, self).__init__(binding, headers, body_stream)
self.method = method
self.path = path