Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _GetTestFileEntry(self, path):
    """Opens the file entry of a test file on the operating system.

    Args:
        path: the path of the test file.

    Returns:
        The test file entry (instance of dfvfs.FileEntry).
    """
    os_path_spec = path_spec_factory.Factory.NewPathSpec(
        definitions.TYPE_INDICATOR_OS, location=path)
    test_file_entry = path_spec_resolver.Resolver.OpenFileEntry(os_path_spec)
    return test_file_entry
# NOTE(review): the enclosing test method header is not part of this
# excerpt; the statements below reference self.
# Read the path filters from the in-memory filter file data.
test_filter_file = filter_file.FilterFile()
test_path_filters = test_filter_file._ReadFromFileObject(
io.StringIO(self._FILTER_FILE_DATA))
# Environment variable handed to BuildFindSpecs below.
environment_variable = artifacts.EnvironmentVariableArtifact(
case_sensitive=False, name='SystemRoot', value='C:\\Windows')
# Build the find specifications from the path filters.
test_helper = path_filters.PathCollectionFiltersHelper()
test_helper.BuildFindSpecs(
test_path_filters, environment_variables=[environment_variable])
self.assertEqual(len(test_helper.included_file_system_find_specs), 5)
# Open the operating system file system at location '.' and search it with
# the included find specifications.
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location='.')
file_system = path_spec_resolver.Resolver.OpenFileSystem(path_spec)
searcher = file_system_searcher.FileSystemSearcher(
file_system, path_spec)
path_spec_generator = searcher.Find(
find_specs=test_helper.included_file_system_find_specs)
self.assertIsNotNone(path_spec_generator)
# Collect the results into a list before the file system is closed.
path_specs = list(path_spec_generator)
file_system.Close()
# Two evtx, one symbolic link to evtx, one AUTHORS, two filter_*.txt files,
# total 6 path specifications.
self.assertEqual(len(path_specs), 6)
def testParseV1(self):
    """Tests the Parse function on a version 1 file."""
    fseventsd_parser = fseventsd.FseventsdParser()
    test_file_path = self._GetTestFilePath(['fsevents-0000000002d89b58'])

    # The file is parsed through a GZIP path specification layered on top of
    # the operating system path specification.
    parent_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
    compressed_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_GZIP, parent=parent_path_spec)
    storage_writer = self._ParseFileByPathSpec(
        compressed_path_spec, fseventsd_parser)

    self.assertEqual(storage_writer.number_of_warnings, 0)
    self.assertEqual(storage_writer.number_of_events, 12)

    # Do not check the timestamp since it is derived from the file entry.
    fourth_event = list(storage_writer.GetEvents())[3]
    event_data = self._GetEventDataOfEvent(storage_writer, fourth_event)
# NOTE(review): the enclosing test method header is not part of this
# excerpt; the statements below reference self.
# Skip the test when the artifact definitions or the test image are missing.
test_artifacts_path = self._GetTestFilePath(['artifacts'])
self._SkipIfPathNotExists(test_artifacts_path)
test_file_path = self._GetTestFilePath(['ímynd.dd'])
self._SkipIfPathNotExists(test_file_path)
# Load the artifact definitions from the test artifacts directory.
registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
registry.ReadFromDirectory(reader, test_artifacts_path)
test_engine = single_process.SingleProcessEngine()
resolver_context = context.Context()
session = sessions.Session()
# The source is the root ('/') of a TSK path specification layered on top of
# the operating system path specification of the test file.
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
source_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
parent=os_path_spec)
test_engine.PreprocessSources(registry, [source_path_spec])
storage_writer = fake_writer.FakeStorageWriter(session)
# Process the source with the parser filter expression set to 'filestat'.
configuration = configurations.ProcessingConfiguration()
configuration.parser_filter_expression = 'filestat'
test_engine.ProcessSources(
[source_path_spec], storage_writer, resolver_context, configuration)
self.assertEqual(storage_writer.number_of_events, 15)
# NOTE(review): test_path and os_path_spec are used here before any visible
# assignment; presumably they are set earlier in the enclosing test method,
# which is not part of this excerpt - confirm.
# Display name of the operating system path specification.
expected_display_name = 'OS:{0:s}'.format(test_path)
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
os_path_spec)
self.assertEqual(display_name, expected_display_name)
# Display name of a GZIP path specification on top of the same parent.
gzip_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_GZIP, parent=os_path_spec)
expected_display_name = 'GZIP:{0:s}'.format(test_path)
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
gzip_path_spec)
self.assertEqual(display_name, expected_display_name)
# Build a layered path specification: OS -> QCOW -> VSHADOW -> TSK.
test_path = self._GetTestFilePath(['vsstest.qcow2'])
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_path)
qcow_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_QCOW, parent=os_path_spec)
vshadow_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_VSHADOW, location='/vss2',
store_index=1, parent=qcow_path_spec)
tsk_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, inode=35, location='/syslog.gz',
parent=vshadow_path_spec)
# Display name of the layered TSK path specification.
expected_display_name = 'VSS2:TSK:/syslog.gz'
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
tsk_path_spec)
self.assertEqual(display_name, expected_display_name)
expected_display_name = 'VSS2:TSK:C:/syslog.gz'
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
def _GetTestFileEntryFromPath(self, path_segments):
    """Opens the file entry of a file in the test data directory.

    Args:
        path_segments: the path segments inside the test data directory.

    Returns:
        A file entry object (instance of dfvfs.FileEntry).
    """
    test_file_path = self._GetTestFilePath(path_segments)
    os_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
    file_entry = path_spec_resolver.Resolver.OpenFileEntry(os_path_spec)
    return file_entry
def testMatches(self):
    """Tests the Matches function."""
    image_path = self._GetTestFilePath(['ímynd.dd'])
    self._SkipIfPathNotExists(image_path)

    image_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=image_path)
    names_filter = file_entry_filters.NamesFileEntryFilter(['passwords.txt'])

    # Test a filter non-match.
    non_matching_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK, inode=16,
        location='/a_directory/another_file', parent=image_path_spec)
    non_matching_file_entry = path_spec_resolver.Resolver.OpenFileEntry(
        non_matching_path_spec)
    self.assertFalse(names_filter.Matches(non_matching_file_entry))

    # Test a filter on a directory.
    # NOTE(review): this path specification is built but not used in the
    # visible part of the test.
    directory_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK, inode=12,
        location='/a_directory', parent=image_path_spec)
def testGetSourceFileSystem(self):
    """Tests the GetSourceFileSystem function."""
    test_engine = engine.BaseEngine()

    # Skip the test when the test image is not available.
    test_file_path = self._GetTestFilePath(['ímynd.dd'])
    self._SkipIfPathNotExists(test_file_path)

    # The source is the root ('/') of a TSK path specification layered on top
    # of the operating system path specification of the test image.
    os_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
    source_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
        parent=os_path_spec)

    resolver_context = context.Context()
    test_file_system, test_mount_point = test_engine.GetSourceFileSystem(
        source_path_spec, resolver_context=resolver_context)

    # Close the file system even when one of the assertions fails, so that a
    # failing test does not leave it open.
    try:
        self.assertIsNotNone(test_file_system)
        self.assertIsInstance(test_file_system, dfvfs_file_system.FileSystem)

        self.assertIsNotNone(test_mount_point)
        self.assertIsInstance(test_mount_point, path_spec.PathSpec)
    finally:
        # Nothing to close when no file system was returned.
        if test_file_system is not None:
            test_file_system.Close()
except OSError as exception:
if exception.errno == errno.EACCES:
raise errors.AccessError(
'Access to directory: {0:s} denied with error: {1!s}'.format(
location, exception))
raise errors.BackEndError(
'Unable to list directory: {0:s} with error: {1!s}'.format(
location, exception))
class OSFileEntry(file_entry.FileEntry):
"""File system file entry that uses os."""
TYPE_INDICATOR = definitions.TYPE_INDICATOR_OS
_OS_IS_WINDOWS = platform.system() == 'Windows'
def __init__(self, resolver_context, file_system, path_spec, is_root=False):
"""Initializes a file entry.
Args:
resolver_context (Context): resolver context.
file_system (FileSystem): file system.
path_spec (PathSpec): path specification.
is_root (Optional[bool]): True if the file entry is the root file entry
of the corresponding file system.
"""
location = getattr(path_spec, 'location', None)
# Windows does not support running os.stat on device files so we use
# -*- coding: utf-8 -*-
"""The operating system path specification resolver helper implementation."""
from __future__ import unicode_literals
from dfvfs.file_io import os_file_io
from dfvfs.lib import definitions
from dfvfs.resolver_helpers import manager
from dfvfs.resolver_helpers import resolver_helper
from dfvfs.vfs import os_file_system
class OSResolverHelper(resolver_helper.ResolverHelper):
"""Operating system resolver helper."""
TYPE_INDICATOR = definitions.TYPE_INDICATOR_OS
def NewFileObject(self, resolver_context):
    """Creates a new operating system file-like object.

    Args:
        resolver_context (Context): resolver context.

    Returns:
        FileIO: file-like object.
    """
    file_object = os_file_io.OSFile(resolver_context)
    return file_object
def NewFileSystem(self, resolver_context):
"""Creates a new file system object.
Args: