Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testParseOptions(self):
    """Checks option parsing of the process resources argument helper.

    Default test options with a CLITool are parsed first, then None as the
    configuration object is expected to raise BadConfigObject, and the
    process memory limit values 'bogus' and -1 are each expected to raise
    BadConfigOption.
    """
    helper = process_resources.ProcessResourcesArgumentsHelper
    options = cli_test_lib.TestOptions()
    test_tool = tools.CLITool()

    # Baseline: this call sits outside any assertRaises, so it must not raise.
    helper.ParseOptions(options, test_tool)

    with self.assertRaises(errors.BadConfigObject):
        helper.ParseOptions(options, None)

    # Each invalid memory limit is set on the shared options object before
    # the helper is invoked again with the valid tool.
    for invalid_limit in ('bogus', -1):
        options.process_memory_limit = invalid_limit
        with self.assertRaises(errors.BadConfigOption):
            helper.ParseOptions(options, test_tool)
options.profiling_sample_rate = '100'
profiling.ProfilingArgumentsHelper.ParseOptions(options, test_tool)
self.assertEqual(test_tool._profiling_sample_rate, 100)
with shared_test_lib.TempDirectory() as temp_directory:
options = cli_test_lib.TestOptions()
options.profilers = 'processing'
options.profiling_directory = temp_directory
profiling.ProfilingArgumentsHelper.ParseOptions(options, test_tool)
self.assertEqual(test_tool._profilers, set(['processing']))
self.assertEqual(test_tool._profiling_directory, temp_directory)
self.assertEqual(test_tool._profiling_sample_rate, 1000)
with self.assertRaises(errors.BadConfigObject):
options = cli_test_lib.TestOptions()
profiling.ProfilingArgumentsHelper.ParseOptions(options, None)
with self.assertRaises(errors.BadConfigOption):
options = cli_test_lib.TestOptions()
options.profilers = 'bogus'
profiling.ProfilingArgumentsHelper.ParseOptions(options, test_tool)
with self.assertRaises(errors.BadConfigOption):
options = cli_test_lib.TestOptions()
options.profiling_directory = '/bogus'
profiling.ProfilingArgumentsHelper.ParseOptions(options, test_tool)
def testParseOptions(self):
    """Checks error handling of the Viper analysis argument helper.

    Parsing default test options for a ViperAnalysisPlugin is expected to
    raise BadConfigOption, and passing None instead of a plugin is expected
    to raise BadConfigObject.
    """
    helper = viper_analysis.ViperAnalysisArgumentsHelper
    options = cli_test_lib.TestOptions()
    analysis_plugin = viper.ViperAnalysisPlugin()

    # Pairs of (expected exception, object handed to the helper), checked in
    # the same order as the individual assertions they replace.
    expected_failures = (
        (errors.BadConfigOption, analysis_plugin),
        (errors.BadConfigObject, None))

    for expected_error, target in expected_failures:
        with self.assertRaises(expected_error):
            helper.ParseOptions(options, target)
def testParseOptions(self):
"""Tests the ParseOptions function."""
test_file_path = self._GetTestFilePath(['tagging_file', 'valid.txt'])
self._SkipIfPathNotExists(test_file_path)
options = cli_test_lib.TestOptions()
options.tagging_file = test_file_path
analysis_plugin = tagging.TaggingAnalysisPlugin()
tagging_analysis.TaggingAnalysisArgumentsHelper.ParseOptions(
options, analysis_plugin)
with self.assertRaises(errors.BadConfigObject):
tagging_analysis.TaggingAnalysisArgumentsHelper.ParseOptions(
options, None)
options.tagging_file = None
with self.assertRaises(errors.BadConfigOption):
tagging_analysis.TaggingAnalysisArgumentsHelper.ParseOptions(
options, analysis_plugin)
test_file_path = self._GetTestFilePath([
'tagging_file', 'invalid_syntax.txt'])
self._SkipIfPathNotExists(test_file_path)
options.tagging_file = test_file_path
with self.assertRaises(errors.BadConfigOption):
def ParseOptions(cls, options, configuration_object):
    """Parses and validates the storage format options.

    Args:
      options (argparse.Namespace): parser options.
      configuration_object (CLITool): object to be configured by the
          argument helper.

    Raises:
      BadConfigObject: when the configuration object is of the wrong type.
      BadConfigOption: if the storage format is not defined or not
          supported, or the task storage format is not defined.
    """
    if not isinstance(configuration_object, tools.CLITool):
        raise errors.BadConfigObject(
            'Configuration object is not an instance of CLITool')

    storage_format = cls._ParseStringOption(options, 'storage_format')
    if not storage_format:
        raise errors.BadConfigOption('Unable to determine storage format.')
    if storage_format not in definitions.STORAGE_FORMATS:
        message = 'Unsupported storage format: {0:s}'.format(storage_format)
        raise errors.BadConfigOption(message)

    # Only a format that passed both checks is stored on the tool.
    setattr(configuration_object, '_storage_format', storage_format)

    task_storage_format = cls._ParseStringOption(
        options, 'task_storage_format')
    if not task_storage_format:
        raise errors.BadConfigOption(
            'Unable to determine task storage format.')
    # NOTE(review): the task storage format is checked but never stored in
    # the code visible here - confirm whether the rest of this method was
    # cut off.
def ParseOptions(cls, options, configuration_object):
    """Parses and validates the temporary directory option.

    Args:
      options (argparse.Namespace): parser options.
      configuration_object (CLITool): object to be configured by the
          argument helper.

    Raises:
      BadConfigObject: when the configuration object is of the wrong type.
      BadConfigOption: when a temporary directory is set but is not an
          existing directory.
    """
    if not isinstance(configuration_object, tools.CLITool):
        raise errors.BadConfigObject(
            'Configuration object is not an instance of CLITool')

    temporary_directory = getattr(options, 'temporary_directory', None)

    # A missing or empty value is passed through unchanged; only an explicit
    # path is checked against the file system.
    if temporary_directory:
        if not os.path.isdir(temporary_directory):
            message = 'No such temporary directory: {0:s}'.format(
                temporary_directory)
            raise errors.BadConfigOption(message)

    setattr(configuration_object, '_temporary_directory', temporary_directory)
BadConfigObject: when the configuration object is of the wrong type.
"""
if not isinstance(configuration_object, tools.CLITool):
raise errors.BadConfigObject(
'Configuration object is not an instance of CLITool')
yara_rules_string = None
path = getattr(options, 'yara_rules_path', None)
if path:
try:
with io.open(path, 'rt', encoding='utf-8') as rules_file:
yara_rules_string = rules_file.read()
except IOError as exception:
raise errors.BadConfigObject(
'Unable to read Yara rules file: {0:s} with error: {1!s}'.format(
path, exception))
try:
# We try to parse the rules here, to check that the definitions are
# valid. We then pass the string definitions along to the workers, so
# that they don't need read access to the rules file.
yara.compile(source=yara_rules_string)
except yara.Error as exception:
raise errors.BadConfigObject(
'Unable to parse Yara rules in: {0:s} with error: {1!s}'.format(
path, exception))
setattr(configuration_object, '_yara_rules_string', yara_rules_string)
def ParseOptions(cls, options, output_module):
    """Parses and validates options for the MySQL 4n6time output module.

    After the type check, parsing is delegated to the MySQL 4n6time database
    argument helper and then to the shared 4n6time output argument helper.

    Args:
      options (argparse.Namespace): parser options.
      output_module (OutputModule): output module to configure.

    Raises:
      BadConfigObject: when the output module object is of the wrong type.
    """
    expected_type = mysql_4n6time.MySQL4n6TimeOutputModule
    if not isinstance(output_module, expected_type):
        raise errors.BadConfigObject(
            'Output module is not an instance of MySQL4n6TimeOutputModule')

    # Database options are handled first, then the shared 4n6time options.
    MySQL4n6TimeDatabaseArgumentsHelper.ParseOptions(options, output_module)

    shared_helper = shared_4n6time_output.Shared4n6TimeOutputArgumentsHelper
    shared_helper.ParseOptions(options, output_module)
def ParseOptions(cls, options, analysis_plugin):
    """Parses and validates options for the Viper analysis plugin.

    Args:
      options (argparse.Namespace): parser options.
      analysis_plugin (ViperAnalysisPlugin): analysis plugin to configure.

    Raises:
      BadConfigObject: when the analysis plugin is of the wrong type.
      BadConfigOption: when unable to connect to Viper instance. The
          connection check is not part of the code visible here - TODO
          confirm against the complete method.
    """
    if not isinstance(analysis_plugin, viper.ViperAnalysisPlugin):
        raise errors.BadConfigObject(
            'Analysis plugin is not an instance of ViperAnalysisPlugin')

    # Each option falls back to the class-level default when it is not set.
    hash_name = cls._ParseStringOption(
        options, 'viper_hash', default_value=cls._DEFAULT_HASH)
    analysis_plugin.SetLookupHash(hash_name)

    viper_host = cls._ParseStringOption(
        options, 'viper_host', default_value=cls._DEFAULT_HOST)
    analysis_plugin.SetHost(viper_host)

    viper_port = cls._ParseNumericOption(
        options, 'viper_port', default_value=cls._DEFAULT_PORT)
    analysis_plugin.SetPort(viper_port)

    # NOTE(review): the protocol is parsed but never applied to the plugin in
    # the code visible here - confirm whether the rest of this method was
    # cut off.
    protocol = cls._ParseStringOption(
        options, 'viper_protocol', default_value=cls._DEFAULT_PROTOCOL)
def ParseOptions(cls, options, configuration_object):
"""Parses and validates options.
Args:
options (argparse.Namespace): parser options.
configuration_object (CLITool): object to be configured by the argument
helper.
Raises:
BadConfigObject: when the configuration object is of the wrong type.
BadConfigOption: when a configuration parameter fails validation.
"""
if not isinstance(configuration_object, tools.CLITool):
raise errors.BadConfigObject(
'Configuration object is not an instance of CLITool')
hashers = cls._ParseStringOption(
options, 'hashers', default_value=cls._DEFAULT_HASHER_STRING)
hasher_file_size_limit = cls._ParseNumericOption(
options, 'hasher_file_size_limit', default_value=0)
# TODO: validate hasher names.
if hasher_file_size_limit < 0:
raise errors.BadConfigOption(
'Invalid hasher file size limit value cannot be negative.')
setattr(configuration_object, '_hasher_names_string', hashers)
setattr(