def test_get_report_result_types(self):
self.assertTrue(utilities.helpers.get_report_result_types())
def resolve_processor(self, processor_name):
"""Return a processor class."""
if processor_name in helpers.builtin_processors():
processor_class = helpers.builtin_processors()[processor_name]
else:
# resolve a custom processor
_module, _class = processor_name.rsplit('.', 1)
try:
processor_class = getattr(importlib.import_module(_module),
_class)
except ImportError:
                msg = ('The processor \'{0}\' could not be resolved due to '
                       'an ImportError.').format(processor_name)
raise exceptions.InvalidPipelineOptions(msg)
return processor_class
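
# Usage sketch (illustrative, not from this codebase): given a `pipeline`
# instance, a built-in processor resolves by name and a custom one by
# dotted path. The names 'structure' and 'mypackage.processors.MyProcessor'
# are assumptions.
structure_cls = pipeline.resolve_processor('structure')
custom_cls = pipeline.resolve_processor('mypackage.processors.MyProcessor')
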
def __init__(self, data, processors=None, dialect=None, format='csv',
transform=True, encoding=None, decode_strategy='replace',
options=None, fail_fast=False, row_limit=20000,
report_limit=1000, report_stream=None, header_index=0,
break_on_invalid_processor=True, post_task=None,
report_type='basic'):
if data is None:
_msg = '`data` must be a filepath, url or stream.'
raise exceptions.PipelineBuildError(_msg)
self.openfiles = []
self.data_source = data
self.processors = processors or helpers.DEFAULT_PIPELINE
self.dialect = self.get_dialect(dialect)
self.format = format
self.encoding = encoding
self.decode_strategy = decode_strategy
self.options = options or {}
self.transform = transform
self.fail_fast = fail_fast
self.row_limit = self.get_row_limit(row_limit)
self.report_limit = self.get_report_limit(report_limit)
self.report_stream = report_stream
self.header_index = header_index
self.break_on_invalid_processor = break_on_invalid_processor
        helpers.validate_handler(post_task)
        self.post_task = post_task
        if report_type == 'grouped':
            # configure grouped report results
            ...
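
# Construction sketch for the pipeline above: `data` may be a filepath,
# URL or stream, per the check at the top of __init__. The processor
# names passed here are assumptions.
pipeline = Pipeline('data.csv', processors=('structure', 'schema'),
                    fail_fast=True, row_limit=5000)
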
def validate(source):
"""Validate a CSV Dialect source file."""
schemafile = os.path.abspath(os.path.join(
os.path.dirname(os.path.dirname(__file__)), 'schemas',
'csv-dialect-description-format.json'))
with io.open(schemafile) as stream:
schema = json.load(stream)
try:
source = helpers.load_json_source(source)
jsonschema.validate(source, schema)
return True
except (jsonschema.ValidationError, ValueError, TypeError):
return False
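
# Illustrative call: checking a dialect description against the bundled
# JSON Schema. Passing a dict assumes helpers.load_json_source accepts
# one (it may equally take a path, URL or JSON string).
is_valid = validate({'dialect': {'delimiter': ',', 'quoteChar': '"'}})
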
if report is None:
if report_stream:
report_stream_tests = [isinstance(report_stream, io.TextIOBase),
report_stream.writable(),
report_stream.seekable()]
if not all(report_stream_tests):
_msg = '`report_stream` must be a seekable and writable text stream.'
raise exceptions.ProcessorBuildError(_msg)
report_backend = 'client'
else:
report_backend = 'yaml'
report_options = {
'schema': helpers.report_schema,
'backend': report_backend,
'client_stream': report_stream,
'limit': report_limit
}
self.report = tellme.Report(self.name, **report_options)
else:
self.report = report
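
# Sketch: an in-memory text stream satisfies all three report_stream
# checks above (io.TextIOBase instance, writable, seekable).
import io

report_stream = io.StringIO()
assert all([isinstance(report_stream, io.TextIOBase),
            report_stream.writable(),
            report_stream.seekable()])
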
def __init__(self, source, source_type='csv', sleep=None, data_key='data',
schema_key='schema', format_key='format', encoding_key='encoding',
pipeline_options=None, post_task=None, pipeline_post_task=None):
self.source = source
self.source_type = source_type
self.data_key = data_key
self.schema_key = schema_key
self.format_key = format_key
self.encoding_key = encoding_key
self.dataset = self.get_dataset()
self.pipeline_options = pipeline_options or {}
self.current_pipeline = None
self.reports = []
helpers.validate_handler(post_task, 1)
helpers.validate_handler(pipeline_post_task, 1)
self.post_task = post_task
self.pipeline_post_task = pipeline_post_task
        if sleep and not isinstance(sleep, (int, float)):
            raise ValueError('`sleep` must be an int or a float.')
self.sleep = sleep
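
# Construction sketch: the one-argument handler matches the
# validate_handler(..., 1) checks above. Treating 'dp' as the Data
# Package source_type is an assumption, as is the directory path.
def pipeline_done(pipeline):
    print('finished validating', pipeline.data_source)

batch = Batch('path/to/package_dir', source_type='dp', sleep=1.0,
              pipeline_post_task=pipeline_done)
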
def get_dataset_datapackage(self):
"""Get the dataset from a Data Package for this batch process."""
_name = 'datapackage.json'
descriptor = os.path.join(self.source, _name)
dataset = []
# TODO: We want to use https://github.com/tryggvib/datapackage here
# but, in order to do so, we need these issues resolved:
# https://github.com/tryggvib/datapackage/issues/35
# https://github.com/tryggvib/datapackage/issues/33
pkg = helpers.load_json_source(descriptor)
for entry in pkg['resources']:
if entry.get('url'):
data = entry['url']
elif entry.get('path'):
if pkg.get('base'):
data = '{0}{1}'.format(pkg['base'], entry['path'])
else:
data = os.path.join(self.source, entry['path'])
else:
data = entry.get('data')
            dataset.append({
                'data': data,
                'schema': entry.get('schema')
            })
        return dataset
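
# A minimal descriptor the loop above can walk (illustrative only): each
# resource contributes its 'url', a 'base'- or source-relative 'path',
# or inline 'data'.
pkg = {
    'resources': [
        {'url': 'http://example.com/remote.csv'},
        {'path': 'data/local.csv', 'schema': {'fields': [{'name': 'id'}]}},
        {'data': [['id', 'name'], [1, 'a']]},
    ]
}
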