Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_detect_scheme_and_format(source, scheme, format):
    """Check that ``helpers.detect_scheme_and_format`` resolves ``source`` correctly.

    ``scheme`` and ``format`` are the expected values; the helper's result is
    compared against them as a ``(scheme, format)`` tuple.
    """
    detected = helpers.detect_scheme_and_format(source)
    expected = (scheme, format)
    assert detected == expected
def opener():
# Assembles `_params`, a dict of keyword options derived from the resource.
# NOTE(review): this copy has lost its indentation, so the nesting implied by
# the comments below is inferred from statement order — confirm against the
# original file. `__resource` and `__url` are not defined in this block;
# presumably they are bound in an enclosing scope — TODO confirm.
_params = dict(headers=1)
# Format declared on the resource; None when the "format" key is absent.
format = __resource.get("format")
if format == "txt":
# datapackage-pipelines processing requires having a header row
# for txt format we add a single "data" column
_params["headers"] = ["data"]
# Map the "txt" format to TXTParser under the custom_parsers option.
_params["custom_parsers"] = {"txt": TXTParser}
_params["allow_html"] = True
else:
if format is None:
# Nothing declared: detect the format from the URL (the scheme is discarded).
_, format = tabulator.helpers.detect_scheme_and_format(__url)
if format in tabulator.config.SUPPORTED_COMPRESSION:
# The "format" is really a compression type, so no format is known yet.
format = None
else:
try:
parser_cls = tabulator.helpers.import_attribute(tabulator.config.PARSERS[format])
except KeyError:
# Format has no entry in tabulator.config.PARSERS: log it, then re-raise.
logging.error("Unknown format %r", format)
raise
# Pass through the resource entries that the parser class lists as options.
_params.update(
dict(x for x in __resource.items()
if x[0] in parser_cls.options))
# Pass through the resource entries named in this fixed set of general options.
_params.update(
dict(x for x in __resource.items()
if x[0] in {'headers', 'scheme', 'encoding', 'sample_size', 'allow_html',
'force_strings', 'force_parse', 'skip_rows', 'compression',
'http_timeout'}))
# Arguments:
target (str): Path where to save the stream.
format (str, optional):
The format the stream will be saved as. If
None, detects from the ``target`` path. Defaults to None.
encoding (str, optional):
Saved file encoding. Defaults to ``config.DEFAULT_ENCODING``.
**options: Extra options passed to the writer.
"""
# Get encoding/format
# Both values are optional; fall back to the configured default encoding and
# to a format detected from the target (the detected scheme is discarded).
if encoding is None:
encoding = config.DEFAULT_ENCODING
if format is None:
_, format = helpers.detect_scheme_and_format(target)
# Prepare writer class
# Custom writers registered on the instance take precedence; config.WRITERS
# is only consulted when no custom writer exists for this format.
writer_class = self.__custom_writers.get(format)
if writer_class is None:
if format not in config.WRITERS:
message = 'Format "%s" is not supported' % format
raise exceptions.FormatError(message)
writer_class = helpers.import_attribute(config.WRITERS[format])
# Prepare writer options
writer_options = helpers.extract_options(options, writer_class.options)
# Anything still present in `options` here is rejected as unsupported.
# NOTE(review): this presumes helpers.extract_options removes the keys it
# consumes from `options`; otherwise every supplied option would raise — confirm.
if options:
message = 'Not supported options "%s" for format "%s"'
message = message % (', '.join(options), format)
raise exceptions.TabulatorException(message)
source.write(line)
# Rewind so the copied content is read from its beginning.
source.seek(0)
# We redefine loader/format/schema after decompression
self.__loader = StreamLoader(bytes_sample_size=self.__bytes_sample_size)
# An explicitly configured format wins; otherwise detect it from source.name.
format = self.__format or helpers.detect_scheme_and_format(source.name)[1]
scheme = 'stream'
# Gzip compression
# This branch is only taken when six.PY3 is true.
elif compression == 'gz' and six.PY3:
# `name` stays '' when source is not a str, so the detection below then
# runs on an empty name.
name = ''
if isinstance(source, str):
# NOTE(review): str.replace removes every '.gz' occurrence in the string,
# not only a trailing extension — confirm this is intended.
name = source.replace('.gz', '')
# gzip.open wraps whatever the current loader returns for binary mode.
source = gzip.open(self.__loader.load(source, mode='b'))
# We redefine loader/format/schema after decompression
self.__loader = StreamLoader(bytes_sample_size=self.__bytes_sample_size)
format = self.__format or helpers.detect_scheme_and_format(name)[1]
scheme = 'stream'
# Not supported compression
# Reached for any truthy compression value not handled by an earlier branch.
elif compression:
message = 'Compression "%s" is not supported for your Python version'
raise exceptions.TabulatorException(message % compression)
# Attach stats to the loader
# Only loaders exposing a truthy `attach_stats` attribute receive the stats
# dict; for other loaders self.__stats is not assigned here.
if getattr(self.__loader, 'attach_stats', None):
self.__stats = {'size': 0, 'hash': ''}
getattr(self.__loader, 'attach_stats')(self.__stats)
# Initiate parser
# Custom parsers registered on the instance are looked up first.
parser_class = self.__custom_parsers.get(format)
if parser_class is None:
if format not in config.PARSERS:
# Zip compression
# This branch is only taken when six.PY3 is true.
if compression == 'zip' and six.PY3:
source = self.__loader.load(source, mode='b')
with zipfile.ZipFile(source) as archive:
# Default to the first archive member. The lookup runs even when a
# 'filename' option is supplied, so an empty archive raises IndexError here.
name = archive.namelist()[0]
# A 'filename' option selects a specific member and is then removed from
# `options`.
if 'filename' in options.keys():
name = options['filename']
del options['filename']
with archive.open(name) as file:
# The member name is used as the temp-file suffix, so source.name ends
# with it; the format detection below reads source.name.
source = tempfile.NamedTemporaryFile(suffix='.' + name)
# Copy the member into the temporary file piece by piece.
for line in file:
source.write(line)
# Rewind so the copied content is read from its beginning.
source.seek(0)
# We redefine loader/format/schema after decompression
self.__loader = StreamLoader(bytes_sample_size=self.__bytes_sample_size)
# An explicitly configured format wins; otherwise detect it from source.name.
format = self.__format or helpers.detect_scheme_and_format(source.name)[1]
scheme = 'stream'
# Gzip compression
elif compression == 'gz' and six.PY3:
# `name` stays '' when source is not a str, so the detection below then
# runs on an empty name.
name = ''
if isinstance(source, str):
# NOTE(review): str.replace removes every '.gz' occurrence in the string,
# not only a trailing extension — confirm this is intended.
name = source.replace('.gz', '')
source = gzip.open(self.__loader.load(source, mode='b'))
# We redefine loader/format/schema after decompression
self.__loader = StreamLoader(bytes_sample_size=self.__bytes_sample_size)
format = self.__format or helpers.detect_scheme_and_format(name)[1]
scheme = 'stream'
# Not supported compression
# Reached when compression is truthy but neither branch above matched, which
# includes 'zip'/'gz' when six.PY3 is false.
elif compression:
message = 'Compression "%s" is not supported for your Python version'
def open(self):
"""Opens the stream for reading.
# Raises:
TabulatorException: if an error
"""
source = self.__source
options = copy(self.__options)
# Get scheme and format if not already given
compression = None
if self.__scheme is None or self.__format is None:
detected_scheme, detected_format = helpers.detect_scheme_and_format(source)
scheme = self.__scheme or detected_scheme
format = self.__format or detected_format
# Get compression
for type in config.SUPPORTED_COMPRESSION:
if self.__compression == type or detected_format == type:
compression = type
else:
scheme = self.__scheme
format = self.__format
# Initiate loader
self.__loader = None
if scheme is not None:
loader_class = self.__custom_loaders.get(scheme)
if loader_class is None:
if scheme not in config.LOADERS: