Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testGetDisplayName(self):
"""Tests the GetDisplayName function."""
session = sessions.Session()
storage_writer = fake_writer.FakeStorageWriter(session)
parsers_mediator = self._CreateParserMediator(storage_writer)
with self.assertRaises(ValueError):
parsers_mediator.GetDisplayName(file_entry=None)
test_file_path = self._GetTestFilePath(['syslog.gz'])
self._SkipIfPathNotExists(test_file_path)
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(os_path_spec)
display_name = parsers_mediator.GetDisplayName(file_entry=file_entry)
expected_display_name = 'OS:{0:s}'.format(test_file_path)
self.assertEqual(display_name, expected_display_name)
gzip_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_GZIP, parent=os_path_spec)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(gzip_path_spec)
display_name = parsers_mediator.GetDisplayName(file_entry=file_entry)
expected_display_name = 'GZIP:{0:s}'.format(test_file_path)
self.assertEqual(display_name, expected_display_name)
test_file_path = self._GetTestFilePath(['vsstest.qcow2'])
def testOpenFileObject(self):
"""Function to test mount point resolving."""
manager.MountPointManager.RegisterMountPoint('C', self._qcow_path_spec)
parent_path_spec = mount_path_spec.MountPathSpec(identifier='C')
path_spec = tsk_path_spec.TSKPathSpec(
location='/passwords.txt', parent=parent_path_spec)
file_object = resolver.Resolver.OpenFileObject(
path_spec, resolver_context=self._resolver_context)
self.assertIsNotNone(file_object)
self.assertEqual(file_object.get_size(), 116)
file_object.close()
parent_path_spec = mount_path_spec.MountPathSpec(identifier='D')
path_spec = tsk_path_spec.TSKPathSpec(
location='/passwords.txt', parent=parent_path_spec)
with self.assertRaises(errors.MountPointError):
file_object = resolver.Resolver.OpenFileObject(
path_spec, resolver_context=self._resolver_context)
manager.MountPointManager.DeregisterMountPoint('C')
def setUp(self):
"""Sets up the needed objects used throughout the test."""
super(BDEFileWithKeyChainTest, self).setUp()
test_file = self._GetTestFilePath(['bdetogo.raw'])
self._SkipIfPathNotExists(test_file)
self._os_path_spec = os_path_spec.OSPathSpec(location=test_file)
self._bde_path_spec = bde_path_spec.BDEPathSpec(parent=self._os_path_spec)
resolver.Resolver.key_chain.SetCredential(
self._bde_path_spec, 'password', self._BDE_PASSWORD)
test_filter_file = yaml_filter_file.YAMLFilterFile()
test_path_filters = test_filter_file._ReadFromFileObject(
io.StringIO(self._YAML_FILTER_FILE_DATA))
environment_variable = artifacts.EnvironmentVariableArtifact(
case_sensitive=False, name='SystemRoot', value='C:\\Windows')
test_helper = path_filters.PathCollectionFiltersHelper()
test_helper.BuildFindSpecs(
test_path_filters, environment_variables=[environment_variable])
self.assertEqual(len(test_helper.included_file_system_find_specs), 5)
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location='.')
file_system = path_spec_resolver.Resolver.OpenFileSystem(path_spec)
searcher = file_system_searcher.FileSystemSearcher(
file_system, path_spec)
path_spec_generator = searcher.Find(
find_specs=test_helper.included_file_system_find_specs)
self.assertIsNotNone(path_spec_generator)
path_specs = list(path_spec_generator)
file_system.Close()
# Two evtx, one symbolic link to evtx, one AUTHORS, two filter_*.txt files,
# total 6 path specifications.
self.assertEqual(len(path_specs), 6)
data_stream.name for data_stream in self._GetDataStreams()]
if data_stream_name and data_stream_name not in data_stream_names:
return None
path_spec = copy.deepcopy(self.path_spec)
if data_stream_name:
# For HFS DECOMP fork name is exposed however libtsk 4.6.0 seems to handle
# these differently when opened and the correct behavior seems to be
# treating this as the default (nameless) fork instead. For context libtsk
# 4.5.0 is unable to read the data steam and yields an error.
if self._file_system.IsHFS() and data_stream_name == 'DECOMP':
data_stream_name = ''
setattr(path_spec, 'data_stream', data_stream_name)
return resolver.Resolver.OpenFileObject(
path_spec, resolver_context=self._resolver_context)
def _GetDecrypter(self):
"""Retrieves a decrypter.
Returns:
Decrypter: decrypter.
Raises:
IOError: if the decrypter cannot be initialized.
OSError: if the decrypter cannot be initialized.
"""
resolver.Resolver.key_chain.ExtractCredentialsFromPathSpec(self._path_spec)
try:
credentials = resolver.Resolver.key_chain.GetCredentials(self._path_spec)
return encryption_manager.EncryptionManager.GetDecrypter(
self._encryption_method, **credentials)
except ValueError as exception:
raise IOError(exception)
def Open(self, path_spec):
"""Opens a volume defined by path specification.
Args:
path_spec (PathSpec): a path specification.
Raises:
VolumeSystemError: if the LVM virtual file system could not be resolved.
"""
self._file_system = resolver.Resolver.OpenFileSystem(path_spec)
if self._file_system is None:
raise errors.VolumeSystemError('Unable to resolve path specification.')
type_indicator = self._file_system.type_indicator
if type_indicator != definitions.TYPE_INDICATOR_LVM:
raise errors.VolumeSystemError('Unsupported type indicator.')
gzip file entry's path spec, which is different from trying to retrieve it
from the gzip file entry's parent file entry.
It would be preferable to retrieve the modification time from the metadata
in the gzip file itself, but it appears to not be set when the file is
written by fseventsd.
Args:
gzip_file_entry (dfvfs.FileEntry): file entry of the gzip file containing
the fseventsd data.
Returns:
dfdatetime.DateTimeValues: parent modification time, or None if not
available.
"""
parent_file_entry = path_spec_resolver.Resolver.OpenFileEntry(
gzip_file_entry.path_spec.parent)
if not parent_file_entry:
return None
return parent_file_entry.modification_time
Args:
path_spec (PathSpec): path specification.
mode (Optional[str]): file access mode. The default is 'rb'
read-only binary.
Raises:
AccessError: if the access to open the file was denied.
IOError: if the file system could not be opened.
PathSpecError: if the path specification is incorrect.
ValueError: if the path specification is invalid.
"""
if not path_spec.HasParent():
raise errors.PathSpecError(
'Unsupported path specification without parent.')
resolver.Resolver.key_chain.ExtractCredentialsFromPathSpec(path_spec)
bde_volume = pybde.volume()
file_object = resolver.Resolver.OpenFileObject(
path_spec.parent, resolver_context=self._resolver_context)
try:
bde.BDEVolumeOpen(
bde_volume, path_spec, file_object, resolver.Resolver.key_chain)
except:
file_object.close()
raise
self._bde_volume = bde_volume
self._file_object = file_object