Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _GetTestFileEntry(self, path):
"""Retrieves the test file entry.
Args:
path: the path of the test file.
Returns:
The test file entry (instance of dfvfs.FileEntry).
"""
path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OS, location=path)
return path_spec_resolver.Resolver.OpenFileEntry(path_spec)
filters helper.
knowledge_base_values (Optional[dict]): knowledge base values.
timezone (str): timezone.
Returns:
FakeStorageWriter: storage writer.
Raises:
SkipTest: if the path inside the test data directory does not exist and
the test should be skipped.
"""
test_file_path = self._GetTestFilePath(path_segments)
self._SkipIfPathNotExists(test_file_path)
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
return self._ParseFileByPathSpec(
path_spec, parser, collection_filters_helper=collection_filters_helper,
knowledge_base_values=knowledge_base_values, timezone=timezone)
def testScanForFileSystemOnVSS(self):
"""Test the ScanForFileSystem function on VSS."""
test_path = self._GetTestFilePath(['vsstest.qcow2'])
self._SkipIfPathNotExists(test_path)
test_os_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_OS, location=test_path)
test_qcow_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_QCOW, parent=test_os_path_spec)
test_vss_path_spec = path_spec_factory.Factory.NewPathSpec(
definitions.TYPE_INDICATOR_VSHADOW, store_index=1,
parent=test_qcow_path_spec)
path_spec = self._source_scanner.ScanForFileSystem(test_vss_path_spec)
self.assertIsNotNone(path_spec)
expected_type_indicator = definitions.PREFERRED_NTFS_BACK_END
self.assertEqual(path_spec.type_indicator, expected_type_indicator)
def _TestScanSourceDirectory(self, source_path):
"""Tests the ScanSource function on a directory.
Args:
source_path: the path of the source device, directory or file.
"""
test_front_end = storage_media_frontend.StorageMediaFrontend()
scan_context = test_front_end.ScanSource(source_path)
self.assertNotEqual(scan_context, None)
scan_node = scan_context.GetRootScanNode()
self.assertNotEqual(scan_node, None)
self.assertEqual(
scan_node.type_indicator, dfvfs_definitions.TYPE_INDICATOR_OS)
path_spec = scan_node.path_spec
self.assertEqual(path_spec.location, os.path.abspath(source_path))
test_filter_file = filter_file.FilterFile()
test_path_filters = test_filter_file._ReadFromFileObject(
io.StringIO(self._FILTER_FILE_DATA))
environment_variable = artifacts.EnvironmentVariableArtifact(
case_sensitive=False, name='SystemRoot', value='C:\\Windows')
test_helper = path_filters.PathCollectionFiltersHelper()
test_helper.BuildFindSpecs(
test_path_filters, environment_variables=[environment_variable])
self.assertEqual(len(test_helper.included_file_system_find_specs), 5)
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location='.')
file_system = path_spec_resolver.Resolver.OpenFileSystem(path_spec)
searcher = file_system_searcher.FileSystemSearcher(
file_system, path_spec)
path_spec_generator = searcher.Find(
find_specs=test_helper.included_file_system_find_specs)
self.assertIsNotNone(path_spec_generator)
path_specs = list(path_spec_generator)
file_system.Close()
# Two evtx, one symbolic link to evtx, one AUTHORS, two filter_*.txt files,
# total 6 path specifications.
self.assertEqual(len(path_specs), 6)
class JSONOutputTest(test_lib.OutputModuleTestCase):
"""Tests for the JSON output module."""
_OS_PATH_SPEC = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location='{0:s}{1:s}'.format(
os.path.sep, os.path.join('cases', 'image.dd')))
_TEST_EVENTS = [
{'data_type': 'test:output',
'display_name': 'OS: /var/log/syslog.1',
'hostname': 'ubuntu',
'inode': 12345678,
'pathspec': path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, inode=15,
location='/var/log/syslog.1', parent=_OS_PATH_SPEC),
'text': (
'Reporter PID: |8442| (pam_unix(cron:session): session\n '
'closed for user root)'),
'timestamp': timelib.Timestamp.CopyFromString('2012-06-27 18:17:01'),
'timestamp_desc': definitions.TIME_DESCRIPTION_UNKNOWN,
'username': 'root'}]
def setUp(self):
"""Makes preparations before running an individual test."""
output_mediator = self._CreateOutputMediator()
self._output_writer = cli_test_lib.TestOutputWriter()
self._output_module = json_out.JSONOutputModule(output_mediator)
self._output_module.SetOutputWriter(self._output_writer)
def testWriteHeader(self):
from tests.output import test_lib
class NativePythonOutputTest(test_lib.OutputModuleTestCase):
"""Tests for the "raw" (or native) Python output module."""
_OS_PATH_SPEC = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location='{0:s}{1:s}'.format(
os.path.sep, os.path.join('cases', 'image.dd')))
_TEST_EVENTS = [
{'data_type': 'test:output',
'display_name': 'OS: /var/log/syslog.1',
'hostname': 'ubuntu',
'inode': 12345678,
'pathspec': path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, inode=15,
location='/var/log/syslog.1', parent=_OS_PATH_SPEC),
'text': (
'Reporter PID: |8442| (pam_unix(cron:session): session\n '
'closed for user root)'),
'timestamp': timelib.Timestamp.CopyFromString('2012-06-27 18:17:01'),
'timestamp_desc': definitions.TIME_DESCRIPTION_UNKNOWN,
'username': 'root'}]
def testWriteEventBody(self):
"""Tests the WriteEventBody function."""
output_mediator = self._CreateOutputMediator()
output_writer = cli_test_lib.TestOutputWriter()
output_module = rawpy.NativePythonOutputModule(output_mediator)
output_module.SetOutputWriter(output_writer)
def testParseFile(self):
"""Tests the Parse function on a stand-alone $MFT file."""
parser = ntfs.NTFSMFTParser()
test_file_path = self._GetTestFilePath(['MFT'])
self._SkipIfPathNotExists(test_file_path)
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
storage_writer = self._ParseFileByPathSpec(os_path_spec, parser)
self.assertEqual(storage_writer.number_of_warnings, 0)
self.assertEqual(storage_writer.number_of_events, 126352)
events = list(storage_writer.GetEvents())
# A distributed link tracking event.
event = events[3684]
self.CheckTimestamp(event.timestamp, '2007-06-30 12:58:40.500004')
self.assertEqual(
event.timestamp_desc, definitions.TIME_DESCRIPTION_CREATION)
from tests.output import test_lib
class JSONLinesOutputTest(test_lib.OutputModuleTestCase):
"""Tests for the JSON lines output module."""
_OS_PATH_SPEC = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location='{0:s}{1:s}'.format(
os.path.sep, os.path.join('cases', 'image.dd')))
_TEST_EVENTS = [
{'data_type': 'test:output',
'display_name': 'OS: /var/log/syslog.1',
'hostname': 'ubuntu',
'inode': 12345678,
'pathspec': path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, inode=15,
location='/var/log/syslog.1', parent=_OS_PATH_SPEC),
'text': (
'Reporter PID: |8442| (pam_unix(cron:session): session\n '
'closed for user root)'),
'timestamp': timelib.Timestamp.CopyFromString('2012-06-27 18:17:01'),
'timestamp_desc': definitions.TIME_DESCRIPTION_UNKNOWN,
'username': 'root'}]
def setUp(self):
"""Makes preparations before running an individual test."""
output_mediator = self._CreateOutputMediator()
self._output_writer = cli_test_lib.TestOutputWriter()
self._output_module = json_line.JSONLineOutputModule(output_mediator)
self._output_module.SetOutputWriter(self._output_writer)
def testPrintAPFSVolumeIdentifiersOverview(self):
"""Tests the _PrintAPFSVolumeIdentifiersOverview function."""
test_file_path = self._GetTestFilePath(['apfs.dmg'])
self._SkipIfPathNotExists(test_file_path)
test_os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
test_raw_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec)
test_tsk_partition_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK_PARTITION, location='/p1',
parent=test_raw_path_spec)
test_apfs_container_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_APFS_CONTAINER, location='/',
parent=test_tsk_partition_path_spec)
volume_system = apfs_volume_system.APFSVolumeSystem()
volume_system.Open(test_apfs_container_path_spec)
file_object = io.BytesIO()
test_output_writer = tools.FileObjectOutputWriter(file_object)
test_tool = storage_media_tool.StorageMediaTool(
output_writer=test_output_writer)
test_tool._PrintAPFSVolumeIdentifiersOverview(volume_system, ['apfs1'])
file_object.seek(0, os.SEEK_SET)
output_data = file_object.read()