Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_request_create_default(self):
    # A request constructed with no arguments is expected to report:
    # method "GET", path "/", no headers, and no body stream.
    default_request = HttpRequest()

    self.assertEqual("GET", default_request.method)
    self.assertEqual("/", default_request.path)

    header_list = list(default_request.headers)
    self.assertEqual([], header_list)

    self.assertIsNone(default_request.body_stream)
def _test_stream_lives_until_complete(self, secure):
    """Check that a request completes even after local references are dropped.

    ``secure`` is forwarded unchanged to the server-start and
    client-connection helpers.
    """
    self._start_server(secure)
    client = self._new_client_connection(secure)

    get_request = HttpRequest('GET', '/test/test_http_client.py')
    http_stream = client.request(get_request)
    pending_completion = http_stream.completion_future

    # drop this function's handles on the stream and on the connection
    del http_stream, client

    # the completion future must still resolve without raising
    pending_completion.result(self.timeout)

    self._stop_server()
def _test_put(self, secure):
    """PUT the test asset file to the server and verify what the server stored.

    ``secure`` is forwarded unchanged to the server-start and
    client-connection helpers.
    """
    self._start_server(secure)
    connection = self._new_client_connection(secure)
    test_asset_path = 'test/test_http_client.py'
    request_path = '/' + test_asset_path

    with open(test_asset_path, 'rb') as outgoing_body_stream:
        # read once to learn the length for the Content-Length header
        outgoing_body_bytes = outgoing_body_stream.read()
        headers = HttpHeaders([
            ('Content-Length', str(len(outgoing_body_bytes))),
        ])

        # seek back to start of stream before trying to send it
        outgoing_body_stream.seek(0)

        request = HttpRequest('PUT', request_path, headers, outgoing_body_stream)
        response = Response()
        http_stream = connection.request(request, response.on_response, response.on_body)

        # wait for stream to complete while the body stream is still open
        stream_completion_result = http_stream.completion_future.result(self.timeout)

        self.assertEqual(200, response.status_code)
        self.assertEqual(200, stream_completion_result)

        # compare what we sent against what the server received
        server_received = self.server.put_requests.get(request_path)
        self.assertIsNotNone(server_received)
        self.assertEqual(outgoing_body_bytes, server_received)

    # the future returned by close() is expected to resolve to None
    self.assertIsNone(connection.close().result(self.timeout))
    self._stop_server()
def _test_get(self, secure, proxy_options=None):
    """GET the test asset file from the server and compare it with the copy on disk.

    ``secure`` is forwarded to the server-start and client-connection helpers.
    ``proxy_options`` is forwarded to the client-connection helper; when it is
    not None the server is also started with its HTTP/1.0 flag set.
    """
    # Use HTTP/1.0 in proxy tests or server will keep connection with proxy alive
    # and refuse to shut down for 1 minute at the end of each proxy test
    http_1_0 = proxy_options is not None
    self._start_server(secure, http_1_0)
    connection = self._new_client_connection(secure, proxy_options)

    test_asset_path = 'test/test_http_client.py'
    request = HttpRequest('GET', '/' + test_asset_path)
    response = Response()
    stream = connection.request(request, response.on_response, response.on_body)

    # wait for stream to complete
    stream_completion_result = stream.completion_future.result(self.timeout)

    self.assertEqual(200, response.status_code)
    self.assertEqual(200, stream_completion_result)

    # the file only needs to stay open for the read itself
    with open(test_asset_path, 'rb') as test_asset:
        test_asset_bytes = test_asset.read()
    self.assertEqual(test_asset_bytes, response.body)

    # closing the connection is expected to finish without an exception
    self.assertIsNone(connection.close().exception(self.timeout))
    self._stop_server()
def test_signing_sigv4_headers(self):
    # Sign a request using the SigV4Header algorithm with body signing off,
    # then check that the same request object comes back with its method,
    # path and pre-existing headers still in place.
    static_provider = awscrt.auth.AwsCredentialsProvider.new_static(
        SIGV4TEST_ACCESS_KEY_ID,
        SIGV4TEST_SECRET_ACCESS_KEY,
        SIGV4TEST_SESSION_TOKEN,
    )

    config = awscrt.auth.AwsSigningConfig(
        algorithm=awscrt.auth.AwsSigningAlgorithm.SigV4Header,
        credentials_provider=static_provider,
        region=SIGV4TEST_REGION,
        service=SIGV4TEST_SERVICE,
        date=SIGV4TEST_DATE,
        body_signing_type=awscrt.auth.AwsBodySigningConfigType.BodySigningOff,
    )

    unsigned_headers = awscrt.http.HttpHeaders(SIGV4TEST_UNSIGNED_HEADERS)
    http_request = awscrt.http.HttpRequest(
        method=SIGV4TEST_METHOD,
        path=SIGV4TEST_PATH,
        headers=unsigned_headers,
    )

    signed_request = awscrt.auth.aws_sign_request(http_request, config).result(TIMEOUT)

    # should be same object
    self.assertIs(http_request, signed_request)

    self.assertEqual(SIGV4TEST_METHOD, http_request.method)
    self.assertEqual(SIGV4TEST_PATH, http_request.path)

    # existing headers should remain
    for original_header in SIGV4TEST_UNSIGNED_HEADERS:
        self.assertIn(original_header, http_request.headers)
def _test_shutdown_error(self, secure):
    """Check that the connection's shutdown future carries an AwsCrtError.

    ``secure`` is forwarded unchanged to the server-start and
    client-connection helpers.
    """
    # Use HTTP/1.0 connection to force a SOCKET_CLOSED error after request completes
    self._start_server(secure, http_1_0=True)
    client = self._new_client_connection(secure)

    # Issue a throwaway request; only its completion matters, not its outcome
    root_request = HttpRequest('GET', '/')
    collector = Response()
    http_stream = client.request(root_request, collector.on_response, collector.on_body)
    http_stream.completion_future.result(self.timeout)

    # Wait for server to hang up, which should be immediate since it's using HTTP/1.0
    shutdown_exc = client.shutdown_future.exception(self.timeout)
    self.assertIsInstance(shutdown_exc, awscrt.exceptions.AwsCrtError)

    self._stop_server()
(1) It is good practice to use a new config for each signature, or the date might get too old.
(2) Do not add the following headers to requests before signing, they may be added by the signer:
x-amz-content-sha256,
X-Amz-Date,
Authorization
(3) Do not add the following query params to requests before signing, they may be added by the signer:
X-Amz-Signature,
X-Amz-Date,
X-Amz-Credential,
X-Amz-Algorithm,
X-Amz-SignedHeaders
"""
assert isinstance(http_request, HttpRequest)
assert isinstance(signing_config, AwsSigningConfig)
future = Future()
def _on_complete(error_code):
    # Settle the enclosing `future` from the reported error code:
    # a falsy code resolves it with the request, anything else fails it.
    try:
        if not error_code:
            future.set_result(http_request)
        else:
            future.set_exception(awscrt.exceptions.from_code(error_code))
    except Exception as err:
        # anything raised while settling the future is stored on the future too
        future.set_exception(err)
_awscrt.sign_request_aws(http_request, signing_config, _on_complete)
return future
# Socket settings: only the connect timeout is overridden, from the parsed arguments.
socket_options = io.SocketOptions()
socket_options.connect_timeout_ms = args.connect_timeout

hostname = url.hostname

# Start the connection attempt, then wait for it (timeout argument: 10).
connect_future = http.HttpClientConnection.new(
    host_name=hostname,
    port=port,
    socket_options=socket_options,
    tls_connection_options=tls_connection_options,
    bootstrap=client_bootstrap,
)
connection = connect_future.result(10)
connection.shutdown_future.add_done_callback(on_connection_shutdown)

request = http.HttpRequest(args.method, body_stream=data_stream)

# Shortcut flags override the method given at construction; they are applied
# in this order, so when several are set the last one wins.
for flag_set, verb in ((args.get, "GET"), (args.post, "POST"), (args.head, "HEAD")):
    if flag_set:
        request.method = verb

# Replace the request path only when the URL has one, then append the query string.
if url.path:
    request.path = url.path

if url.query:
    request.path = request.path + '?' + url.query