Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testAPFSUnlockVolumeOnAPFS(self):
    """Checks that APFSUnlockVolume reports success for the apfs.raw image."""
    test_context = context.Context()

    image_path = self._GetTestFilePath(['apfs.raw'])
    self._SkipIfPathNotExists(image_path)

    # Path specification chain: OS file -> RAW image -> APFS container
    # entry at location /apfs1.
    os_spec = path_spec_factory.Factory.NewPathSpec(
        definitions.TYPE_INDICATOR_OS, location=image_path)
    raw_spec = path_spec_factory.Factory.NewPathSpec(
        definitions.TYPE_INDICATOR_RAW, parent=os_spec)
    container_spec = path_spec_factory.Factory.NewPathSpec(
        definitions.TYPE_INDICATOR_APFS_CONTAINER, location='/apfs1',
        parent=raw_spec)

    # The volume object is obtained through the container file entry.
    volume_entry = resolver.Resolver.OpenFileEntry(
        container_spec, resolver_context=test_context)
    volume = volume_entry.GetAPFSVolume()

    unlocked = apfs_helper.APFSUnlockVolume(
        volume, container_spec, resolver.Resolver.key_chain)
    self.assertTrue(unlocked)
"""Sets up the needed objects used throughout the test."""
# Fresh resolver context stored on the test case instance.
self._resolver_context = context.Context()
# Resolve the syslog.des3 test file; _SkipIfPathNotExists presumably skips
# the test when the file is missing (helper not visible here).
test_file = self._GetTestFilePath(['syslog.des3'])
self._SkipIfPathNotExists(test_file)
# Path specification chain: OS file wrapped in a DES3 encrypted stream.
self._os_path_spec = os_path_spec.OSPathSpec(location=test_file)
self._encrypted_stream_path_spec = (
encrypted_stream_path_spec.EncryptedStreamPathSpec(
encryption_method=definitions.ENCRYPTION_METHOD_DES3,
parent=self._os_path_spec))
# Register the DES3 key, initialization vector and cipher mode as
# credentials for the encrypted stream path specification on the resolver
# key chain.
resolver.Resolver.key_chain.SetCredential(
self._encrypted_stream_path_spec, 'key', self._DES3_KEY)
resolver.Resolver.key_chain.SetCredential(
self._encrypted_stream_path_spec, 'initialization_vector',
self._DES3_IV)
resolver.Resolver.key_chain.SetCredential(
self._encrypted_stream_path_spec, 'cipher_mode', self._DES3_MODE)
# NOTE(review): the meaning of padding_size is not shown here; presumably
# read by the shared test cases - confirm.
self.padding_size = 1
def testScanVolumeOnEncryptedAPFS(self):
    """Exercises _ScanVolume against the encrypted APFS test image."""
    dmg_path = self._GetTestFilePath(['apfs_encrypted.dmg'])
    self._SkipIfPathNotExists(dmg_path)

    # Clear the resolver key chain before the tool is set up.
    resolver.Resolver.key_chain.Empty()

    tool = storage_media_tool.StorageMediaTool()
    tool._credentials = [('password', self._APFS_PASSWORD)]

    source_context = source_scanner.SourceScannerContext()
    source_context.OpenSourcePath(dmg_path)
    tool._source_scanner.Scan(source_context)

    root_node = self._GetTestScanNode(source_context)
    container_node = root_node.sub_nodes[4].sub_nodes[0]

    # Test on volume system root node.
    collected_path_specs = []
    tool._ScanVolume(source_context, container_node, collected_path_specs)
BackEndError: if the scan node cannot be unlocked.
ValueError: if the scan context or scan node is invalid.
"""
# Determine whether the scan node is still locked. APFS containers go
# through the container file entry and apfs_helper.APFSUnlockVolume.
if scan_node.type_indicator == definitions.TYPE_INDICATOR_APFS_CONTAINER:
# TODO: consider changing this when upstream changes have been made.
# Currently pyfsapfs does not support reading from a volume as a device.
# Also see: https://github.com/log2timeline/dfvfs/issues/332
container_file_entry = resolver.Resolver.OpenFileEntry(
scan_node.path_spec, resolver_context=self._resolver_context)
fsapfs_volume = container_file_entry.GetAPFSVolume()
# TODO: unlocking the volume multiple times is inefficient; cache the volume
# object in the scan node and use is_locked = fsapfs_volume.is_locked()
try:
is_locked = not apfs_helper.APFSUnlockVolume(
fsapfs_volume, scan_node.path_spec, resolver.Resolver.key_chain)
except IOError as exception:
# Report unlock I/O failures as BackEndError, keeping the original error
# text in the message.
raise errors.BackEndError(
'Unable to unlock APFS volume with error: {0!s}'.format(exception))
# NOTE(review): presumably this else pairs with the APFS type check above
# rather than with the try; the indentation is not preserved here - confirm.
else:
file_object = resolver.Resolver.OpenFileObject(
scan_node.path_spec, resolver_context=self._resolver_context)
# NOTE(review): if file_object were falsy, the close() call below would
# raise - confirm OpenFileObject cannot return None.
is_locked = not file_object or file_object.is_locked
file_object.close()
# Record the locked state on the scan context.
if is_locked:
scan_context.LockScanNode(scan_node.path_spec)
# For BitLocker To Go add a scan node for the unencrypted part of
# the volume.
if scan_node.type_indicator == definitions.TYPE_INDICATOR_BDE:
"""Opens the file-like object defined by path specification.
Args:
path_spec (PathSpec): path specification.
Returns:
pybde.volume: BDE volume file-like object.
Raises:
PathSpecError: if the path specification is incorrect.
"""
# The volume is read from the parent path specification, so a parent is
# required.
if not path_spec.HasParent():
raise errors.PathSpecError(
'Unsupported path specification without parent.')
# Presumably copies credentials carried by the path specification into the
# key chain before the volume is opened - confirm against the key chain API.
resolver.Resolver.key_chain.ExtractCredentialsFromPathSpec(path_spec)
file_object = resolver.Resolver.OpenFileObject(
path_spec.parent, resolver_context=self._resolver_context)
bde_volume = pybde.volume()
# BDEVolumeOpen receives the new volume, the path specification, the parent
# file object and the key chain.
# NOTE(review): file_object is not closed here if BDEVolumeOpen raises -
# confirm whether the caller cleans up.
bde.BDEVolumeOpen(
bde_volume, path_spec, file_object, resolver.Resolver.key_chain)
return bde_volume
def FileEntryExistsByPathSpec(self, path_spec):
    """Checks whether a file entry can be opened for a path specification.

    Args:
      path_spec (PathSpec): path specification.

    Returns:
      bool: True if the file entry exists.
    """
    # Validation of the path specification is left to SQLiteBlobFile, so
    # a failed open is taken to mean the file entry does not exist.
    try:
        blob_file = resolver.Resolver.OpenFileObject(
            path_spec, resolver_context=self._resolver_context)
    except (IOError, ValueError, errors.AccessError, errors.PathSpecError):
        exists = False
    else:
        # Opened only to probe for existence; release it right away.
        blob_file.close()
        exists = True

    return exists
Args:
path_spec (PathSpec): a path specification.
mode (Optional[str]): file access mode. The default is 'rb' read-only
binary.
Raises:
AccessError: if the access to open the file was denied.
IOError: if the file system object could not be opened.
PathSpecError: if the path specification is incorrect.
ValueError: if the path specification is invalid.
"""
# The container is read from the parent path specification, so a parent is
# required.
if not path_spec.HasParent():
raise errors.PathSpecError(
'Unsupported path specification without parent.')
file_object = resolver.Resolver.OpenFileObject(
path_spec.parent, resolver_context=self._resolver_context)
try:
fsapfs_container = pyfsapfs.container()
fsapfs_container.open_file_object(file_object)
except:
# Cleanup only: close the parent file object, then re-raise the original
# error unchanged.
file_object.close()
raise
# Both objects are stored on the instance only after the container has been
# opened successfully.
self._file_object = file_object
self._fsapfs_container = fsapfs_container
# Note that we don't want to set the keyword arguments when not used
# because the path specification base class will check for unused
# keyword arguments and raise.
kwargs = path_spec_factory.Factory.GetProperties(path_spec)
# NOTE(review): parent_file_location is defined outside the lines visible
# here.
kwargs['location'] = parent_file_location
if path_spec.parent is not None:
kwargs['parent'] = path_spec.parent
# Build a path specification of the same type as path_spec for the parent
# file.
parent_file_path_spec = path_spec_factory.Factory.NewPathSpec(
path_spec.type_indicator, **kwargs)
# Return without opening anything when the file system reports that the
# parent file does not exist.
if not file_system.FileEntryExistsByPathSpec(parent_file_path_spec):
return
file_object = resolver.Resolver.OpenFileObject(
parent_file_path_spec, resolver_context=self._resolver_context)
vhdi_parent_file = pyvhdi.file()
vhdi_parent_file.open_file_object(file_object)
# A parent that has a parent identifier of its own is passed to
# _OpenParentFile before it is attached to vhdi_file.
if vhdi_parent_file.parent_identifier: # pylint: disable=using-constant-test
self._OpenParentFile(
file_system, parent_file_path_spec, vhdi_parent_file)
vhdi_file.set_parent(vhdi_parent_file)
# Keep track of the parent file and its file object on the instance.
self._parent_vhdi_files.append(vhdi_parent_file)
self._sub_file_objects.append(file_object)
path_spec (Optional[dfvfs.PathSpec]): a path specification.
mode (Optional[str]): file access mode. The default is 'rb' read-only
binary.
Raises:
AccessError: if the access to open the file was denied.
IOError: if the file system object could not be opened.
OSError: if the file system object could not be opened.
PathSpecError: if the path specification is incorrect.
ValueError: if the path specification is invalid.
"""
# The REGF file is read from the parent path specification, so a parent is
# required.
if not path_spec.HasParent():
    raise errors.PathSpecError(
        'Unsupported path specification without parent.')

file_object = resolver.Resolver.OpenFileObject(
    path_spec.parent, resolver_context=self._resolver_context)

try:
    regf_file = pyregf.file()
    regf_file.open_file_object(file_object)
    # Read the root key from the file that was just opened;
    # self._regf_file is only assigned further down, so it must not be
    # used here.
    regf_base_key = regf_file.get_root_key()
except BaseException:
    # Cleanup only: do not leave the parent file object open when opening
    # the REGF file fails. The original error is re-raised unchanged.
    file_object.close()
    raise

# Store the objects on the instance only after everything succeeded.
self._file_object = file_object
self._regf_file = regf_file
self._regf_base_key = regf_base_key