Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Prepares a TSK file system opened on the ext2.raw test image.

    Creates the resolver context, the OS and TSK path specifications and
    the opened TSK file system, and stores them on the test case.
    """
    self._resolver_context = context.Context()

    image_path = self._GetTestFilePath(['ext2.raw'])
    self._SkipIfPathNotExists(image_path)

    # The TSK path specification sits on top of the OS path specification
    # of the image file.
    os_spec = os_path_spec.OSPathSpec(location=image_path)
    self._os_path_spec = os_spec
    self._tsk_path_spec = tsk_path_spec.TSKPathSpec(
        location='/', parent=os_spec)

    tsk_fs = tsk_file_system.TSKFileSystem(self._resolver_context)
    self._file_system = tsk_fs
    tsk_fs.Open(self._tsk_path_spec)
def setUp(self):
    """Creates the resolver context shared by the tests."""
    self._resolver_context = context.Context()
def setUp(self):
    """Prepares an OS file system and a TSK-on-QCOW path specification.

    Stores the resolver context, the OS path specification of the path
    returned by _GetTestFilePath([]), an OS file system object, and the
    QCOW and TSK path specifications built on the vsstest.qcow2 test file.
    """
    self._resolver_context = context.Context()

    # An empty segment list is passed on purpose; the resulting path backs
    # the OS path specification.
    base_path = self._GetTestFilePath([])
    self._SkipIfPathNotExists(base_path)
    self._os_path_spec = os_path_spec.OSPathSpec(location=base_path)
    self._os_file_system = os_file_system.OSFileSystem(self._resolver_context)

    # TODO: add RAW volume only test image.
    qcow_image_path = self._GetTestFilePath(['vsstest.qcow2'])
    self._SkipIfPathNotExists(qcow_image_path)

    qcow_parent_spec = os_path_spec.OSPathSpec(location=qcow_image_path)
    self._qcow_path_spec = qcow_path_spec.QCOWPathSpec(parent=qcow_parent_spec)
    self._tsk_path_spec = tsk_path_spec.TSKPathSpec(
        location='/', parent=self._qcow_path_spec)
def setUp(self):
    """Prepares a compressed stream file system opened on syslog.bz2.

    Stores the resolver context, the bzip2 compressed stream path
    specification and the opened compressed stream file system.
    """
    self._resolver_context = context.Context()

    archive_path = self._GetTestFilePath(['syslog.bz2'])
    self._SkipIfPathNotExists(archive_path)

    os_spec = os_path_spec.OSPathSpec(location=archive_path)
    stream_spec = compressed_stream_path_spec.CompressedStreamPathSpec(
        compression_method=definitions.COMPRESSION_METHOD_BZIP2,
        parent=os_spec)
    self._compressed_stream_path_spec = stream_spec

    stream_fs = compressed_stream_file_system.CompressedStreamFileSystem(
        self._resolver_context)
    self._file_system = stream_fs
    stream_fs.Open(self._compressed_stream_path_spec)
def setUp(self):
    """Prepares SQLite blob path specifications on the blob.db test file.

    Four path specifications are stored on the test case: one selecting a
    row by a string condition, one by row index, one by a condition with an
    integer value, and one without any row selector.
    """
    self._resolver_context = context.Context()

    database_path = self._GetTestFilePath(['blob.db'])
    self._SkipIfPathNotExists(database_path)

    # Keyword arguments shared by every path specification below.
    shared_kwargs = {
        'table_name': 'myblobs',
        'column_name': 'blobs',
        'parent': os_path_spec.OSPathSpec(location=database_path),
    }
    blob_spec_class = sqlite_blob_path_spec.SQLiteBlobPathSpec

    self._sqlite_blob_path_spec = blob_spec_class(
        row_condition=('name', '==', 'mmssms.db'), **shared_kwargs)
    self._sqlite_blob_path_spec_2 = blob_spec_class(
        row_index=2, **shared_kwargs)
    self._sqlite_blob_path_spec_3 = blob_spec_class(
        row_condition=('name', '==', 4), **shared_kwargs)
    # No row selector is given for the "directory" path specification.
    self._sqlite_blob_path_spec_directory = blob_spec_class(**shared_kwargs)
def setUp(self):
    """Prepares a SQLite blob path specification on the syslog.db test file.

    Stores the resolver context and a path specification that selects the
    'blob' column of the 'blobs' table where identifier equals 'myblob'.
    """
    self._resolver_context = context.Context()

    database_path = self._GetTestFilePath(['syslog.db'])
    self._SkipIfPathNotExists(database_path)

    database_spec = os_path_spec.OSPathSpec(location=database_path)
    row_selector = ('identifier', '==', 'myblob')
    self._sqlite_blob_path_spec = sqlite_blob_path_spec.SQLiteBlobPathSpec(
        table_name='blobs', column_name='blob',
        row_condition=row_selector, parent=database_spec)
def _BuildFileFakeFileSystem(
self, filename, number_of_segments, segment_file_path_specs):
"""Builds a fake file system containing EWF segment files.

NOTE(review): the visible definition appears to be truncated. The computed
path, the divmod results and segment_file_path_specs are not used in the
lines shown, and no return statement is visible. Confirm against the
complete source before relying on the Returns section below.

Args:
  filename (str): filename of the first segment file with extension.
  number_of_segments (int): number of segments.
  segment_file_path_specs (list[PathSpec]): resulting segment file path
      specifications.

Returns:
  FakeFileSystem: fake file system.
"""
resolver_context = context.Context()
file_system = fake_file_system.FakeFileSystem(resolver_context)
# Split at the first '.' into the base name and the extension.
filename, _, extension = filename.partition('.')
# Segment numbers are 1-based; bump the count so that the range below
# includes the last segment.
number_of_segments += 1
for segment_number in range(1, number_of_segments):
# Segments 1 through 99 get a two-digit numeric suffix.
if segment_number < 100:
# When the second extension character is 'x' it is kept between the
# first extension character and the segment number.
if extension[1] == 'x':
path = '/{0:s}.{1:s}x{2:02d}'.format(
filename, extension[0], segment_number)
else:
path = '/{0:s}.{1:s}{2:02d}'.format(
filename, extension[0], segment_number)
else:
# Segments from 100 onwards are re-based to 0 and split by 26.
# NOTE(review): presumably to derive an alphabetic suffix; the code that
# consumes these values is not visible here.
segment_index = segment_number - 100
segment_index, remainder = divmod(segment_index, 26)
def testCacheFileSystem(self):
    """Tests caching, retrieving and grabbing a file system in the context."""
    resolver_context = context.Context()

    def _CachedFileSystemCount():
        """Returns the number of values currently in the file system cache."""
        # pylint: disable=protected-access
        return len(resolver_context._file_system_cache._values)

    # A new context starts with an empty file system cache.
    self.assertEqual(_CachedFileSystemCount(), 0)

    root_path_spec = fake_path_spec.FakePathSpec(location='/')
    fake_fs = fake_file_system.FakeFileSystem(resolver_context)

    resolver_context.CacheFileSystem(root_path_spec, fake_fs)
    self.assertEqual(_CachedFileSystemCount(), 1)

    self.assertEqual(resolver_context.GetFileSystem(root_path_spec), fake_fs)

    # Grabbing the file system must not change the number of cached values.
    resolver_context.GrabFileSystem(root_path_spec)
    self.assertEqual(_CachedFileSystemCount(), 1)
def _Main(self):
"""The main loop.

The visible portion creates a resolver context, registers every credential
from the processing configuration on the resolver key chain, creates the
parser mediator and applies the event extraction configuration to it.

NOTE(review): the definition may continue past the lines visible here;
confirm against the complete source.
"""
# We need a resolver context per process to prevent multi processing
# issues with file objects stored in images.
resolver_context = context.Context()
# Register the configured credentials on the resolver key chain.
for credential_configuration in self._processing_configuration.credentials:
resolver.Resolver.key_chain.SetCredential(
credential_configuration.path_spec,
credential_configuration.credential_type,
credential_configuration.credential_data)
# The first positional argument is passed as None.
# NOTE(review): its meaning is not visible from here; check the
# ParserMediator signature.
self._parser_mediator = parsers_mediator.ParserMediator(
None, self._knowledge_base,
collection_filters_helper=self._collection_filters_helper,
preferred_year=self._processing_configuration.preferred_year,
resolver_context=resolver_context,
temporary_directory=self._processing_configuration.temporary_directory)
self._parser_mediator.SetEventExtractionConfiguration(
self._processing_configuration.event_extraction)