Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_valid_url(url):
    """Urlencode all non-ascii characters in url path and query

    Args:
        * `url`: a url string

    When the url embeds a further `http...` url behind a `/+/` separator,
    the url is split on that separator, every piece is processed on its
    own, and the pieces are joined back with the same separator.
    """
    separator = '/+/'
    if separator + 'http' in url:
        return separator.join(
            make_valid_url(piece) for piece in url.split(separator)
        )

    # Only the path and query components are rewritten; scheme, netloc
    # and fragment are passed through as parsed.
    scheme, netloc, path, query, fragment = compat.urlsplit(url)
    return compat.urlunsplit((
        scheme,
        netloc,
        url_encode_non_ascii(path),
        url_encode_non_ascii(query),
        fragment,
    ))
def _decode_to_textstream(self, stream, encoding, textstream):
    """Return a textstream in `self.DEFAULT_ENCODING`

    Args:
        * `stream`: the data to decode; a bytes object, a text string, or
          an iterable of encoded chunks
        * `encoding`: the encoding used to decode non-text input
        * `textstream`: the text stream the decoded lines are written to;
          it must support `write` and `seek`

    Returns:
        `textstream`, rewound to position 0 so it can be read immediately.

    Raises:
        `exceptions.DataSourceDecodeError` when a `UnicodeDecodeError`
        occurs while decoding or recoding the data.
    """
    if isinstance(stream, compat.bytes):
        # iterdecode consumes an iterable of chunks, so wrap the single blob
        stream = codecs.iterdecode([stream], encoding, self.decode_strategy)
    elif isinstance(stream, compat.str):
        # already text: expose it as a line-iterable stream
        stream = io.StringIO(stream)
    else:
        stream = codecs.iterdecode(stream, encoding, self.decode_strategy)

    try:
        for line in stream:
            recoded = line.encode(self.DEFAULT_ENCODING).decode(self.DEFAULT_ENCODING)
            textstream.write(recoded)
    except UnicodeDecodeError:
        raise exceptions.DataSourceDecodeError

    # Hand back the stream ready for reading, as the docstring promises.
    textstream.seek(0)
    return textstream
'result_level': RESULT_LEVEL_ERROR,
'result_message': _type['msg'],
'result_id': _type['id'],
'result_name': _type['name'],
'row_index': None,
'row_name': '',
'column_index': None,
'column_name': ''
}
report.write(entry)
for fk in schema['foreignKeys']:
# ensure that `foreignKey.fields` match field names
if isinstance(fk.get('fields'), compat.str):
if fk.get('fields') not in [f['name'] for f in
schema['fields']]:
valid = False
_type = RESULTS['schema_056']
entry = {
'processor': 'schema',
'result_category': RESULT_CATEGORY_SCHEMA,
'result_level': RESULT_LEVEL_ERROR,
'result_message': _type['msg'],
'result_id': _type['id'],
'result_name': _type['name'],
'row_index': None,
'row_name': '',
'column_index': None,
'column_name': ''
def _stream_from_url(self, url):
    """Return a seekable and readable stream from a URL.

    The URL is first passed through `helpers.make_valid_url`, then opened
    and its whole body is copied into an in-memory binary stream.

    Args:
        * `url`: the URL to fetch

    Returns:
        An `io.BufferedRandom` over an in-memory buffer holding the
        response body, positioned at 0.

    Raises:
        `exceptions.DataSourceHTTPError` (with the HTTP status code) when
        opening the URL raises `compat.HTTPError`.
    """
    stream = io.BufferedRandom(io.BytesIO())
    valid_url = helpers.make_valid_url(url)

    try:
        document = compat.urlopen(valid_url)
    except compat.HTTPError as e:
        raise exceptions.DataSourceHTTPError(status=e.getcode())

    try:
        stream.write(document.read())
    finally:
        # Always release the response, even if reading the body fails.
        document.close()

    stream.seek(0)
    return stream
def _stream_from_url(self, url):
    """Return a seekable and readable stream from a URL.

    The URL is first passed through `helpers.make_valid_url`, then opened
    and its whole body is copied into an in-memory binary stream.

    Args:
        * `url`: the URL to fetch

    Returns:
        An `io.BufferedRandom` over an in-memory buffer holding the
        response body, positioned at 0.

    Raises:
        `exceptions.DataSourceHTTPError` (with the HTTP status code) when
        opening the URL raises `compat.HTTPError`.
    """
    stream = io.BufferedRandom(io.BytesIO())
    valid_url = helpers.make_valid_url(url)

    try:
        document = compat.urlopen(valid_url)
    except compat.HTTPError as e:
        raise exceptions.DataSourceHTTPError(status=e.getcode())

    try:
        stream.write(document.read())
    finally:
        # Always release the response, even if reading the body fails.
        document.close()

    stream.seek(0)
    return stream
'result_category': RESULT_CATEGORY_SCHEMA,
'result_level': RESULT_LEVEL_ERROR,
'result_message': _type['msg'],
'result_id': _type['id'],
'result_name': _type['name'],
'row_index': None,
'row_name': '',
'column_index': None,
'column_name': ''
}
report.write(entry)
# IF `pattern` key, then it is a regex
if constraints.get('pattern') and not \
isinstance(constraints['pattern'], compat.str):
valid = False
_type = RESULTS['schema_074']
entry = {
'processor': 'schema',
'result_category': RESULT_CATEGORY_SCHEMA,
'result_level': RESULT_LEVEL_ERROR,
'result_message': _type['msg'],
'result_id': _type['id'],
'result_name': _type['name'],
'row_index': None,
'row_name': '',
'column_index': None,
'column_name': ''
}
def _decode_to_textstream(self, stream, encoding, textstream):
    """Return a textstream in `self.DEFAULT_ENCODING`

    Args:
        * `stream`: the data to decode; a bytes object, a text string, or
          an iterable of encoded chunks
        * `encoding`: the encoding used to decode non-text input
        * `textstream`: the text stream the decoded lines are written to;
          it must support `write` and `seek`

    Returns:
        `textstream`, rewound to position 0 so it can be read immediately.

    Raises:
        `exceptions.DataSourceDecodeError` when a `UnicodeDecodeError`
        occurs while decoding or recoding the data.
    """
    if isinstance(stream, compat.bytes):
        # iterdecode consumes an iterable of chunks, so wrap the single blob
        stream = codecs.iterdecode([stream], encoding, self.decode_strategy)
    elif isinstance(stream, compat.str):
        # already text: expose it as a line-iterable stream
        stream = io.StringIO(stream)
    else:
        stream = codecs.iterdecode(stream, encoding, self.decode_strategy)

    try:
        for line in stream:
            recoded = line.encode(self.DEFAULT_ENCODING).decode(self.DEFAULT_ENCODING)
            textstream.write(recoded)
    except UnicodeDecodeError:
        raise exceptions.DataSourceDecodeError

    # Hand back the stream ready for reading, as the docstring promises.
    textstream.seek(0)
    return textstream
if isinstance(data_source, io.TextIOBase):
# if not data_source.encoding == self.DEFAULT_ENCODING:
# return
return data_source
else:
self.encoding = self._detect_stream_encoding(data_source)
textstream = self._decode_to_textstream(data_source, self.encoding, textstream)
return textstream
elif isinstance(data_source, compat.str) and \
compat.urlparse(data_source).scheme in self.REMOTE_SCHEMES:
stream = self._stream_from_url(data_source)
self.encoding = self._detect_stream_encoding(stream)
textstream = self._decode_to_textstream(stream, self.encoding, textstream)
return textstream
elif (isinstance(data_source, compat.str) or isinstance(data_source, compat.bytes)) and not \
os.path.exists(data_source):
self.encoding = self._detect_stream_encoding(data_source)
textstream = self._decode_to_textstream(data_source, self.encoding, textstream)
return textstream
else:
def _detect_stream_encoding(self, stream):
    """Return best guess at encoding of stream.

    Args:
        * `stream`: a text string, a bytes object, or an object with
          `read` and `seek` methods

    Returns:
        `self.passed_encoding` when it is set; otherwise the lower-cased
        encoding name detected by chardet from the first 64 KiB of data,
        with `'ascii'` and undetectable input both reported as `'utf-8'`.
    """
    sample_length = 64 * 1024

    self._check_for_unsupported_format(stream)

    # An explicitly passed encoding always wins over detection.
    if self.passed_encoding:
        return self.passed_encoding

    if isinstance(stream, compat.str):
        sample = compat.to_bytes(stream)[:sample_length]
    elif isinstance(stream, compat.bytes):
        sample = stream[:sample_length]
    else:
        sample = stream.read(sample_length)
        # rewind so later readers see the stream from its start
        stream.seek(0)

    # chardet reports None when it cannot make a guess (for example on an
    # empty sample); fall back to utf-8 instead of failing on None.lower().
    detected = chardet.detect(sample)['encoding']
    if not detected:
        return 'utf-8'

    encoding = detected.lower()
    # default to utf-8 for safety
    if encoding == 'ascii':
        encoding = 'utf-8'
    return encoding