Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testParseSystemWithArtifactFilters(self):
    """Tests parsing a SYSTEM file restricted by artifact filters."""
    definitions_path = self._GetTestFilePath(['artifacts'])
    self._SkipIfPathNotExists(definitions_path)

    registry_parser = winreg.WinRegistryParser()
    test_knowledge_base = knowledge_base_engine.KnowledgeBase()
    filter_names = ['TestRegistryKey', 'TestRegistryValue']

    # Load the test artifact definitions before building the filters helper.
    definitions_registry = artifacts_registry.ArtifactDefinitionsRegistry()
    definitions_reader = artifacts_reader.YamlArtifactsReader()
    definitions_registry.ReadFromDirectory(
        definitions_reader, definitions_path)

    filters_helper = artifact_filters.ArtifactDefinitionsFiltersHelper(
        definitions_registry, test_knowledge_base)
    filters_helper.BuildFindSpecs(filter_names, environment_variables=None)

    storage_writer = self._ParseFile(
        ['SYSTEM'], registry_parser,
        collection_filters_helper=filters_helper)

    events = list(storage_writer.GetEvents())
def _CreateTestArtifactDefinitionsFiltersHelper(self, knowledge_base):
    """Builds a filters helper backed by the test artifact definitions.

    The definitions are read from the 'artifacts' path inside the test data
    directory.

    Args:
      knowledge_base (KnowledgeBase): contains information from the source
          data needed for filtering.

    Returns:
      ArtifactDefinitionsFiltersHelper: artifact definitions filters helper.

    Raises:
      SkipTest: if the path inside the test data directory does not exist and
          the test should be skipped.
    """
    definitions_registry = artifacts_registry.ArtifactDefinitionsRegistry()
    definitions_reader = artifacts_reader.YamlArtifactsReader()

    definitions_path = self._GetTestFilePath(['artifacts'])
    self._SkipIfPathNotExists(definitions_path)
    definitions_registry.ReadFromDirectory(
        definitions_reader, definitions_path)

    filters_helper = artifact_filters.ArtifactDefinitionsFiltersHelper(
        definitions_registry, knowledge_base)
    return filters_helper
def testCollectFromFileSystem(self):
    """Tests the CollectFromFileSystem function."""
    definitions_path = self._GetTestFilePath(['artifacts'])
    self._SkipIfPathNotExists(definitions_path)

    definitions_registry = artifacts_registry.ArtifactDefinitionsRegistry()
    definitions_reader = artifacts_reader.YamlArtifactsReader()
    definitions_registry.ReadFromDirectory(
        definitions_reader, definitions_path)

    # The knowledge base is created but not used further in this test.
    _ = knowledge_base_library.KnowledgeBase()
def testProcessSources(self):
    """Tests the ProcessSources function."""
    definitions_path = self._GetTestFilePath(['artifacts'])
    self._SkipIfPathNotExists(definitions_path)

    image_path = self._GetTestFilePath(['ímynd.dd'])
    self._SkipIfPathNotExists(image_path)

    definitions_registry = artifacts_registry.ArtifactDefinitionsRegistry()
    definitions_reader = artifacts_reader.YamlArtifactsReader()
    definitions_registry.ReadFromDirectory(
        definitions_reader, definitions_path)

    engine = single_process.SingleProcessEngine()
    resolver_context = context.Context()
    test_session = sessions.Session()

    # Address the root of the TSK file system inside the image file.
    image_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=image_path)
    root_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
        parent=image_path_spec)

    engine.PreprocessSources(definitions_registry, [root_path_spec])

    storage_writer = fake_writer.FakeStorageWriter(test_session)
def setUpClass(cls):
    """Loads the test artifact definitions once for all tests of the class."""
    definitions_path = shared_test_lib.GetTestFilePath(['artifacts'])

    definitions_registry = artifacts_registry.ArtifactDefinitionsRegistry()
    # Store the registry on the class before it is populated.
    cls._artifacts_registry = definitions_registry

    definitions_reader = artifacts_reader.YamlArtifactsReader()
    definitions_registry.ReadFromDirectory(
        definitions_reader, definitions_path)
def testPreprocessSources(self):
    """Tests the PreprocessSources function."""
    # Skip the test when either of the required test files is missing.
    for filename in ('SOFTWARE', 'SYSTEM'):
        required_path = self._GetTestFilePath([filename])
        self._SkipIfPathNotExists(required_path)

    definitions_path = shared_test_lib.GetTestFilePath(['artifacts'])
    self._SkipIfPathNotExists(definitions_path)

    definitions_registry = artifacts_registry.ArtifactDefinitionsRegistry()
    definitions_reader = artifacts_reader.YamlArtifactsReader()
    definitions_registry.ReadFromDirectory(
        definitions_reader, definitions_path)

    engine = TestEngine()

    fake_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_FAKE, location='/')

    engine.PreprocessSources(definitions_registry, [fake_path_spec])

    self.assertEqual(
        engine.knowledge_base.GetValue('operating_system'), 'Windows NT')

    # Also exercise PreprocessSources with None as the path specification.
    engine.PreprocessSources(definitions_registry, [None])
super(RekallEfilter, self).__init__()
self.type_name = type_name
self.fields = fields or []
def AsDict(self):
    """Returns the source type attributes as a dictionary.

    The "query" key is always present. The "type_name" and "fields" keys are
    only included when the corresponding attribute has a truthy value.

    Returns:
      dict[str, object]: source type attributes.
    """
    attributes = {"query": self.query}
    for optional_key in ("type_name", "fields"):
        optional_value = getattr(self, optional_key)
        if optional_value:
            attributes[optional_key] = optional_value
    return attributes
# Create an artifact definitions registry and register the RekallEfilter
# source type with it.
artifact_registry = registry.ArtifactDefinitionsRegistry()
artifact_registry.RegisterSourceType(RekallEfilter)
def is_definition_in_db(current, name):
    """Looks up an artifact definition in the database by name.

    Args:
      current: object that exposes the database as its "db" attribute.
      name: value compared against db.artifacts.name.

    Returns:
      The result of first() on the selected rows. Presumably a false value
      when no artifact matches (TODO confirm against the database layer).
    """
    database = current.db
    name_query = database.artifacts.name == name
    return database(name_query).select().first()
def add(current, artifact):
"""Adds a new artifact to the database."""
db = current.db
decoded_artifacts = []
artifact_snippets = re.split("^---$", artifact, flags=re.M | re.S)
for snippet in artifact_snippets:
decoded_artifact = yaml.safe_load(snippet)
if not decoded_artifact:
def __init__(self):
    """Initializes an artifact definitions validator.

    The validator starts with a new artifact definitions registry and an
    empty set of artifact registry key paths.
    """
    super(ArtifactDefinitionsValidator, self).__init__()
    self._artifact_registry_key_paths = set()
    self._artifact_registry = registry.ArtifactDefinitionsRegistry()
artifacts.ArtifactDefinitionsRegistry: artifact definitions registry.
Raises:
BadConfigOption: if artifact definitions cannot be read.
"""
if artifact_definitions_path and not os.path.isdir(
artifact_definitions_path):
raise errors.BadConfigOption(
'No such artifacts filter file: {0:s}.'.format(
artifact_definitions_path))
if custom_artifacts_path and not os.path.isfile(custom_artifacts_path):
raise errors.BadConfigOption(
'No such artifacts filter file: {0:s}.'.format(custom_artifacts_path))
registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
try:
registry.ReadFromDirectory(reader, artifact_definitions_path)
except (KeyError, artifacts_errors.FormatError) as exception:
raise errors.BadConfigOption((
'Unable to read artifact definitions from: {0:s} with error: '
'{1!s}').format(artifact_definitions_path, exception))
if custom_artifacts_path:
try:
registry.ReadFromFile(reader, custom_artifacts_path)
except (KeyError, artifacts_errors.FormatError) as exception:
raise errors.BadConfigOption((
Args:
type_indicator (str): source type indicator.
attributes (dict[str, object]): source attributes.
Returns:
SourceType: a source type.
Raises:
FormatError: if the type indicator is not set or unsupported,
or if required attributes are missing.
"""
if not type_indicator:
raise errors.FormatError('Missing type indicator.')
try:
source_object = registry.ArtifactDefinitionsRegistry.CreateSourceType(
type_indicator, attributes)
except (AttributeError, TypeError) as exception:
raise errors.FormatError((
'Unable to create source type: {0:s} for artifact definition: {1:s} '
'with error: {2!s}').format(type_indicator, self.name, exception))
self.sources.append(source_object)
return source_object