Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def add_override_response_headers(self, cache_control,
                                  content_disposition,
                                  content_encoding,
                                  content_language,
                                  content_type):
    """Register the response-header override values as query parameters.

    Each argument is passed to ``self._add_query`` under its matching
    ``_QueryStringConstants`` key, in this order: cache control, content
    disposition, content encoding, content language, content type.
    """
    overrides = (
        (_QueryStringConstants.SIGNED_CACHE_CONTROL, cache_control),
        (_QueryStringConstants.SIGNED_CONTENT_DISPOSITION, content_disposition),
        (_QueryStringConstants.SIGNED_CONTENT_ENCODING, content_encoding),
        (_QueryStringConstants.SIGNED_CONTENT_LANGUAGE, content_language),
        (_QueryStringConstants.SIGNED_CONTENT_TYPE, content_type),
    )
    for query_name, header_value in overrides:
        self._add_query(query_name, header_value)
def add_override_response_headers(self, cache_control,
                                  content_disposition,
                                  content_encoding,
                                  content_language,
                                  content_type):
    """Register the response-header override values as query parameters.

    Each argument is passed to ``self._add_query`` under its matching
    ``_QueryStringConstants`` key, in this order: cache control, content
    disposition, content encoding, content language, content type.
    """
    overrides = (
        (_QueryStringConstants.SIGNED_CACHE_CONTROL, cache_control),
        (_QueryStringConstants.SIGNED_CONTENT_DISPOSITION, content_disposition),
        (_QueryStringConstants.SIGNED_CONTENT_ENCODING, content_encoding),
        (_QueryStringConstants.SIGNED_CONTENT_LANGUAGE, content_language),
        (_QueryStringConstants.SIGNED_CONTENT_TYPE, content_type),
    )
    for query_name, header_value in overrides:
        self._add_query(query_name, header_value)
def add_resource(self, resource):
    """Store ``resource`` under the signed-resource query key."""
    query_name = _QueryStringConstants.SIGNED_RESOURCE
    self._add_query(query_name, resource)
def add_override_response_headers(self, cache_control,
                                  content_disposition,
                                  content_encoding,
                                  content_language,
                                  content_type):
    """Register the response-header override values as query parameters.

    Each argument is passed to ``self._add_query`` under its matching
    ``_QueryStringConstants`` key, in this order: cache control, content
    disposition, content encoding, content language, content type.
    """
    overrides = (
        (_QueryStringConstants.SIGNED_CACHE_CONTROL, cache_control),
        (_QueryStringConstants.SIGNED_CONTENT_DISPOSITION, content_disposition),
        (_QueryStringConstants.SIGNED_CONTENT_ENCODING, content_encoding),
        (_QueryStringConstants.SIGNED_CONTENT_LANGUAGE, content_language),
        (_QueryStringConstants.SIGNED_CONTENT_TYPE, content_type),
    )
    for query_name, header_value in overrides:
        self._add_query(query_name, header_value)
# NOTE(review): orphaned fragment -- `self`, `account_name` and `account_key`
# are used below but no enclosing `def` is visible here. The statements match
# the body of `add_account_signature`; confirm against the original file.
#
# Helper: the value stored in self.query_dict for `query` (an empty string
# when the lookup is missing or falsy), followed by a newline.
def get_value_to_append(query):
return_value = self.query_dict.get(query) or ''
return return_value + '\n'
# String to sign: account_name, then the stored permission, services,
# resource types, start, expiry, IP, protocol and version values, each
# terminated by a newline (including the last one).
string_to_sign = \
(account_name + '\n' +
get_value_to_append(_QueryStringConstants.SIGNED_PERMISSION) +
get_value_to_append(_QueryStringConstants.SIGNED_SERVICES) +
get_value_to_append(_QueryStringConstants.SIGNED_RESOURCE_TYPES) +
get_value_to_append(_QueryStringConstants.SIGNED_START) +
get_value_to_append(_QueryStringConstants.SIGNED_EXPIRY) +
get_value_to_append(_QueryStringConstants.SIGNED_IP) +
get_value_to_append(_QueryStringConstants.SIGNED_PROTOCOL) +
get_value_to_append(_QueryStringConstants.SIGNED_VERSION))
# Sign with account_key and record the result under the signature query key.
self._add_query(_QueryStringConstants.SIGNED_SIGNATURE,
_sign_string(account_key, string_to_sign))
def add_account(self, services, resource_types):
    """Register the services and resource-types values as query parameters.

    ``services`` is added first, then ``resource_types``, each via
    ``self._add_query`` under its ``_QueryStringConstants`` key.
    """
    account_fields = (
        (_QueryStringConstants.SIGNED_SERVICES, services),
        (_QueryStringConstants.SIGNED_RESOURCE_TYPES, resource_types),
    )
    for query_name, value in account_fields:
        self._add_query(query_name, value)
def add_account_signature(self, account_name, account_key):
    """Compute the account signature and store it as a query parameter.

    The string to sign is ``account_name`` followed by the values currently
    held in ``self.query_dict`` for permission, services, resource types,
    start, expiry, IP, protocol and version. Every component, including the
    last one, is terminated by a newline; a missing or falsy value
    contributes an empty string. The result of
    ``_sign_string(account_key, string_to_sign)`` is recorded under the
    signature key through ``self._add_query``.
    """
    # Order matters: it determines the layout of the string being signed.
    signed_fields = (
        _QueryStringConstants.SIGNED_PERMISSION,
        _QueryStringConstants.SIGNED_SERVICES,
        _QueryStringConstants.SIGNED_RESOURCE_TYPES,
        _QueryStringConstants.SIGNED_START,
        _QueryStringConstants.SIGNED_EXPIRY,
        _QueryStringConstants.SIGNED_IP,
        _QueryStringConstants.SIGNED_PROTOCOL,
        _QueryStringConstants.SIGNED_VERSION,
    )

    components = [account_name]
    components.extend(self.query_dict.get(field) or '' for field in signed_fields)
    string_to_sign = ''.join(component + '\n' for component in components)

    signature = _sign_string(account_key, string_to_sign)
    self._add_query(_QueryStringConstants.SIGNED_SIGNATURE, signature)
def add_base(self, permission, expiry, start, ip, protocol, x_ms_version):
    """Register the base query parameters.

    ``start`` and ``expiry`` values that are ``date`` instances are first
    converted with ``_to_utc_datetime``; any other value is passed through
    unchanged. The parameters are then added via ``self._add_query`` in the
    order: start, expiry, permission, IP, protocol, version.
    """
    start = _to_utc_datetime(start) if isinstance(start, date) else start
    expiry = _to_utc_datetime(expiry) if isinstance(expiry, date) else expiry

    base_fields = (
        (_QueryStringConstants.SIGNED_START, start),
        (_QueryStringConstants.SIGNED_EXPIRY, expiry),
        (_QueryStringConstants.SIGNED_PERMISSION, permission),
        (_QueryStringConstants.SIGNED_IP, ip),
        (_QueryStringConstants.SIGNED_PROTOCOL, protocol),
        (_QueryStringConstants.SIGNED_VERSION, x_ms_version),
    )
    for query_name, value in base_fields:
        self._add_query(query_name, value)
def add_base(self, permission, expiry, start, ip, protocol, x_ms_version):
    """Register the base query parameters.

    ``start`` and ``expiry`` values that are ``date`` instances are first
    converted with ``_to_utc_datetime``; any other value is passed through
    unchanged. The parameters are then added via ``self._add_query`` in the
    order: start, expiry, permission, IP, protocol, version.
    """
    start = _to_utc_datetime(start) if isinstance(start, date) else start
    expiry = _to_utc_datetime(expiry) if isinstance(expiry, date) else expiry

    base_fields = (
        (_QueryStringConstants.SIGNED_START, start),
        (_QueryStringConstants.SIGNED_EXPIRY, expiry),
        (_QueryStringConstants.SIGNED_PERMISSION, permission),
        (_QueryStringConstants.SIGNED_IP, ip),
        (_QueryStringConstants.SIGNED_PROTOCOL, protocol),
        (_QueryStringConstants.SIGNED_VERSION, x_ms_version),
    )
    for query_name, value in base_fields:
        self._add_query(query_name, value)
def _scrub_headers(headers):
    """Return a copy of ``headers`` with sensitive values redacted.

    The input mapping is never modified. In the copy:

    * the authorization header, when present, is replaced by the redacted
      placeholder;
    * the copy-source header, when present and containing a signature query
      parameter, has that parameter's value replaced by the placeholder.
    """
    # Work on a copy so the real request keeps its credentials.
    scrubbed = headers.copy()

    if _AUTHORIZATION_HEADER_NAME in scrubbed:
        scrubbed[_AUTHORIZATION_HEADER_NAME] = _REDACTED_VALUE

    # Copy operations may carry a SAS signature inside the source URL.
    if _COPY_SOURCE_HEADER_NAME not in scrubbed:
        return scrubbed
    copy_source = scrubbed[_COPY_SOURCE_HEADER_NAME]
    if _QueryStringConstants.SIGNED_SIGNATURE + "=" not in copy_source:
        return scrubbed

    # Split the URL, overwrite the signature, then reassemble it.
    url_parts = urlparse(copy_source)
    query_params = dict(parse_qsl(url_parts.query))
    query_params[_QueryStringConstants.SIGNED_SIGNATURE] = _REDACTED_VALUE
    scrubbed[_COPY_SOURCE_HEADER_NAME] = urlunparse(
        url_parts._replace(query=urlencode(query_params)))
    return scrubbed