Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def sign(self, method, bucket_name=None, url=None, headers=None,
sub_resources=None):
headers = HeaderDict() if headers is None else headers.copy()
string_to_sign = []
string_to_sign.append('{0!s}\n'.format(method))
string_to_sign.append('{0!s}\n'.format(headers.get('content-md5', ('',))))
string_to_sign.append('{0!s}\n'.format(headers.get('content-type', ('',))))
if 'x-amz-date' in headers:
string_to_sign.append('\n')
else:
string_to_sign.append('{0!s}\n'.format(headers.get('date', ('',))))
for key in sorted(set(imap(str.lower, headers.keys())) - self.SIGN_IGNORE_HEADERS):
string_to_sign.append('{0!s}:{1!s}\n'.format(key, headers[key]))
if bucket_name is not None:
string_to_sign.append('/{0!s}'.format(bucket_name))
if url is not None:
string_to_sign.append(urlparse(url).path)
if sub_resources:
query_params = []
headers['x-amz-date'] = \
datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +0000')
if self.iam_token is not None:
self._refresh_iam_credentials()
headers['x-amz-security-token'] = self.iam_token
headers['Authorization'] = self.sign(method, bucket_name, url, headers,
sub_resources)
while True:
try:
if self.connection is None:
self.connection = httplib.HTTPConnection(self.host)
self.connection.request(method, url, body, headers)
response = self.connection.getresponse()
if response.status not in xrange(200, 300):
raise S3Error(method, url, body, headers, response)
headers = HeaderDict(response.getheaders())
body = response.read()
return (headers, body)
except httplib.BadStatusLine as exc:
logger.warn(exc)
self.connection = None
except httplib.CannotSendRequest as exc:
logger.warn(exc)
self.connection = None
except ssl.SSLError as exc:
logger.warn(exc)
self.connection = None
except IOError as exc:
if exc.errno == errno.ECONNRESET:
logger.warn(exc)
self.connection = None
else:
def request(self, method, bucket_name=None, url=None, headers=None,
body=None, sub_resources=None):
headers = HeaderDict() if headers is None else headers.copy()
if 'x-amz-date' not in headers:
headers['x-amz-date'] = \
datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +0000')
if self.iam_token is not None:
self._refresh_iam_credentials()
headers['x-amz-security-token'] = self.iam_token
headers['Authorization'] = self.sign(method, bucket_name, url, headers,
sub_resources)
while True:
try:
if self.connection is None:
self.connection = httplib.HTTPConnection(self.host)
self.connection.request(method, url, body, headers)
response = self.connection.getresponse()
if response.status not in xrange(200, 300):
raise S3Error(method, url, body, headers, response)
def put(self, bucket_name=None, url=None, headers=None, body=None):
    """Issue a PUT via ``self.request``, adding a Content-MD5 header if absent.

    The caller's ``headers`` mapping is copied, never mutated.  When the
    copy has no 'Content-MD5' entry, one is computed as the base64-encoded
    MD5 digest of ``body``.  A ``body`` of None is hashed as an empty
    payload, so calling ``put`` without a body no longer raises TypeError
    inside ``hashlib.md5``.

    Parameters:
        bucket_name: forwarded unchanged to ``self.request``.
        url: forwarded unchanged to ``self.request``.
        headers: optional mapping providing ``copy()``; a fresh
            ``HeaderDict`` is used when None.
        body: payload to send; forwarded unchanged (None stays None).

    Returns:
        Whatever ``self.request`` returns.
    """
    headers = HeaderDict() if headers is None else headers.copy()
    # NOTE(review): this membership test is only case-insensitive if the
    # mapping itself is (presumably HeaderDict is) -- confirm.
    if 'Content-MD5' not in headers:
        # hashlib.md5 rejects None; the digest of "no body" is the digest
        # of the empty byte string.
        payload = b'' if body is None else body
        headers['Content-MD5'] = b64encode(hashlib.md5(payload).digest())
    return self.request('PUT', bucket_name, url, body=body, headers=headers)