Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_invalid_options(self):
    """Check that Pipeline raises InvalidPipelineOptions for bad options.

    Three ``options`` values are tried, each together with the fixture's
    ``data_filepath`` as ``data``.
    """
    data = self.data_filepath
    bad_options = (
        ['invalid'],                  # a list instead of a dict
        {'invalid': {}},              # top-level key 'invalid'
        {'schema': {'invalid': {}}},  # key 'invalid' nested under 'schema'
    )
    for options in bad_options:
        kwargs = {'options': options, 'data': data}
        with self.assertRaises(exceptions.InvalidPipelineOptions):
            Pipeline(**kwargs)
def test_invalid_options(self):
    """Pipeline must reject each malformed ``options`` value.

    Every call is expected to raise ``exceptions.InvalidPipelineOptions``.
    """
    data = self.data_filepath
    expected = exceptions.InvalidPipelineOptions

    # ``options`` given as a list.
    self.assertRaises(expected, Pipeline, options=['invalid'], data=data)
    # ``options`` with the top-level key 'invalid'.
    self.assertRaises(expected, Pipeline, options={'invalid': {}}, data=data)
    # ``options`` with the key 'invalid' nested under 'schema'.
    self.assertRaises(expected, Pipeline,
                      options={'schema': {'invalid': {}}}, data=data)
def validate_options(self):
    """Validate the pipeline ``options`` argument.

    ``self.options`` must be a dict whose keys all appear in
    ``self.processors``. For every processor, the options passed under its
    name must be a dict whose keys are argument names accepted by that
    processor's ``__init__`` or by ``base.Processor.__init__``.

    Returns:
        None when every check passes.

    Raises:
        exceptions.InvalidPipelineOptions: if ``options`` is not a dict,
            names something not in ``processors``, holds a non-dict value
            for a processor, or passes an argument the processor does not
            accept.
    """

    def init_arg_names(cls):
        """Return the argument names of ``cls.__init__`` without ``self``."""
        # inspect.getargspec was removed in Python 3.11 and raises on
        # keyword-only arguments; use getfullargspec wherever it exists and
        # only fall back to getargspec on interpreters that lack it.
        get_spec = (getattr(inspect, 'getfullargspec', None)
                    or inspect.getargspec)
        spec = get_spec(cls.__init__)
        names = list(spec.args[1:])
        names.extend(getattr(spec, 'kwonlyargs', None) or [])
        return names

    if not isinstance(self.options, dict):
        msg = 'Pipeline \'options\' argument must be a dict.'
        raise exceptions.InvalidPipelineOptions(msg)

    unknown_opts = set(self.options).difference(self.processors)
    if unknown_opts:
        # Sorted so the error message is stable across runs.
        listed = ','.join(sorted(str(name) for name in unknown_opts))
        msg = ('Option(s) \'{0}\' don\'t correspond to any required '
               'processor.').format(listed)
        raise exceptions.InvalidPipelineOptions(msg)

    for processor_name in self.processors:
        passed_opts = self.options.get(processor_name, {})
        processor = self.resolve_processor(processor_name)
        available_opts = set(init_arg_names(processor))
        available_opts.update(init_arg_names(base.Processor))

        # A non-dict value would otherwise fail with AttributeError below
        # instead of the exception callers are told to expect.
        if not isinstance(passed_opts, dict):
            msg = ('Options for {0} processor must be a '
                   'dict.').format(processor_name)
            raise exceptions.InvalidPipelineOptions(msg)

        invalid_opts = [option for option in passed_opts
                        if option not in available_opts]
        if invalid_opts:
            listed = ','.join(str(option) for option in invalid_opts)
            msg = ('Option(s) \'{0}\' are not valid arguments for {1} '
                   'processor.').format(listed, processor_name)
            raise exceptions.InvalidPipelineOptions(msg)
'processor.').format(','.join(unknown_opts))
raise exceptions.InvalidPipelineOptions(msg)
for processor_name in self.processors:
passed_opts = self.options.get(processor_name, {})
processor = self.resolve_processor(processor_name)
processor_args, _, _, _, = inspect.getargspec(processor.__init__)
base_args, _, _, _, = inspect.getargspec(base.Processor.__init__)
available_opts = processor_args[1::]
available_opts.extend(base_args[1::])
invalid_opts = [option for option in passed_opts.keys()
if option not in available_opts]
if invalid_opts:
msg = ('Option(s) \'{0}\' are not valid arguments for {1} '
'processor.').format(','.join(invalid_opts), processor_name)
raise exceptions.InvalidPipelineOptions(msg)
return None
def validate_processors(self):
    """Validate the ``processors`` argument.

    ``self.processors`` has to be a list, tuple or set; each entry is then
    passed to ``self.resolve_processor``.

    Raises:
        exceptions.InvalidPipelineOptions: if ``self.processors`` is not a
            list, tuple or set.
    """
    accepted_types = (list, tuple, set)
    if isinstance(self.processors, accepted_types):
        # Return values are discarded; the call is made only so that
        # resolve_processor can reject a name it cannot handle
        # (presumably by raising -- its body is not visible here).
        for name in self.processors:
            self.resolve_processor(name)
        return

    raise exceptions.InvalidPipelineOptions(
        'The \'processors\' argument must be a list, tuple or set.')
def validate_options(self):
"""Validates the options parameter."""
if not isinstance(self.options, dict):
msg = 'Pipeline \'options\' argument must be a dict.'
raise exceptions.InvalidPipelineOptions(msg)
if not set(self.options.keys()).issubset(set(self.processors)):
unknown_opts = set(self.options.keys()).difference(set(self.processors))
msg = ('Option(s) \'{0}\' don\'t correspond to any required '
'processor.').format(','.join(unknown_opts))
raise exceptions.InvalidPipelineOptions(msg)
for processor_name in self.processors:
passed_opts = self.options.get(processor_name, {})
processor = self.resolve_processor(processor_name)
processor_args, _, _, _, = inspect.getargspec(processor.__init__)
base_args, _, _, _, = inspect.getargspec(base.Processor.__init__)
available_opts = processor_args[1::]
available_opts.extend(base_args[1::])
invalid_opts = [option for option in passed_opts.keys()
if option not in available_opts]