Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testRemoveScanNode(self):
"""Test the RemoveScanNode function."""
test_fake_path_spec = fake_path_spec.FakePathSpec(location='/')
test_context = source_scanner.SourceScannerContext()
parent_node = test_context.RemoveScanNode(test_fake_path_spec)
self.assertIsNone(parent_node)
self.assertEqual(len(test_context._scan_nodes), 0)
test_context.AddScanNode(test_fake_path_spec, None)
self.assertEqual(len(test_context._scan_nodes), 1)
parent_node = test_context.RemoveScanNode(test_fake_path_spec)
self.assertIsNone(parent_node)
self.assertEqual(len(test_context._scan_nodes), 0)
def testHasFileSystemScanNodes(self):
"""Test the HasFileSystemScanNodes function."""
test_fake_path_spec = fake_path_spec.FakePathSpec(location='/')
test_context = source_scanner.SourceScannerContext()
self.assertFalse(test_context.HasFileSystemScanNodes())
test_context.AddScanNode(test_fake_path_spec, None)
self.assertTrue(test_context.HasFileSystemScanNodes())
def testScanVolume(self):
"""Tests the _ScanVolume function."""
test_mediator = TestVolumeScannerMediator()
test_scanner = volume_scanner.VolumeScanner(mediator=test_mediator)
test_options = volume_scanner.VolumeScannerOptions()
scan_context = source_scanner.SourceScannerContext()
# Test error conditions.
with self.assertRaises(errors.ScannerError):
test_scanner._ScanVolume(scan_context, None, test_options, [])
scan_node = source_scanner.SourceScanNode(None)
with self.assertRaises(errors.ScannerError):
test_scanner._ScanVolume(scan_context, scan_node, test_options, [])
def testIsSourceTypeDirectory(self):
"""Test the IsSourceTypeDirectory function."""
test_context = source_scanner.SourceScannerContext()
self.assertIsNone(test_context.IsSourceTypeDirectory())
test_context.source_type = definitions.SOURCE_TYPE_DIRECTORY
self.assertTrue(test_context.IsSourceTypeDirectory())
test_context.source_type = definitions.SOURCE_TYPE_FILE
self.assertFalse(test_context.IsSourceTypeDirectory())
def testScanEncryptedVolumeOnEncryptedAPFS(self):
"""Tests the _ScanEncryptedVolume function on an encrypted APFS image."""
test_file_path = self._GetTestFilePath(['apfs_encrypted.dmg'])
self._SkipIfPathNotExists(test_file_path)
resolver.Resolver.key_chain.Empty()
test_tool = storage_media_tool.StorageMediaTool()
test_tool._credentials = [('password', self._APFS_PASSWORD)]
scan_context = source_scanner.SourceScannerContext()
scan_context.OpenSourcePath(test_file_path)
test_tool._source_scanner.Scan(scan_context)
scan_node = self._GetTestScanNode(scan_context)
apfs_container_scan_node = scan_node.sub_nodes[4].sub_nodes[0]
# Test on volume system sub node.
test_tool._ScanEncryptedVolume(
scan_context, apfs_container_scan_node.sub_nodes[0])
def testScanVolumeSystemRootOnVSSDisabled(self):
"""Tests the _ScanVolumeSystemRoot function on VSS with VSS turned off."""
test_file_path = self._GetTestFilePath(['vsstest.qcow2'])
self._SkipIfPathNotExists(test_file_path)
test_tool = storage_media_tool.StorageMediaTool()
test_tool._process_vss = False
scan_context = source_scanner.SourceScannerContext()
scan_context.OpenSourcePath(test_file_path)
test_tool._source_scanner.Scan(scan_context)
scan_node = self._GetTestScanNode(scan_context)
vss_scan_node = scan_node.sub_nodes[0]
base_path_specs = []
test_tool._ScanVolumeSystemRoot(
scan_context, vss_scan_node, base_path_specs)
self.assertEqual(len(base_path_specs), 0)
source_path: the source path.
Returns:
A list of path specifications (instances of dfvfs.PathSpec).
Raises:
RuntimeError: if the source path does not exists, or if the source path
is not a file or directory, or if the format of or within
the source file is not supported.
"""
self.initialized = 0
if (not source_path.startswith(u'\\\\.\\') and
not os.path.exists(source_path)):
raise RuntimeError(
u'No such device, file or directory: {0:s}.'.format(source_path))
scan_context = source_scanner.SourceScannerContext()
scan_context.OpenSourcePath(source_path)
try:
self._source_scanner.Scan(scan_context)
except (errors.BackEndError, ValueError) as exception:
raise RuntimeError(
u'Unable to scan source with error: {0:s}.'.format(exception))
if scan_context.source_type not in [
definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE]:
scan_node = scan_context.GetRootScanNode()
return [scan_node.path_spec]
# Get the first node where where we need to decide what to process.
scan_node = scan_context.GetRootScanNode()
source_path: the source path.
Returns:
A list of path specifications (instances of dfvfs.PathSpec).
Raises:
RuntimeError: if the source path does not exists, or if the source path
is not a file or directory, or if the format of or within
the source file is not supported.
"""
if (not source_path.startswith(u'\\\\.\\') and
not os.path.exists(source_path)):
raise RuntimeError(
u'No such device, file or directory: {0:s}.'.format(source_path))
scan_context = source_scanner.SourceScannerContext()
scan_context.OpenSourcePath(source_path)
try:
self._source_scanner.Scan(scan_context)
except (errors.BackEndError, ValueError) as exception:
raise RuntimeError(
u'Unable to scan source with error: {0:s}.'.format(exception))
if scan_context.source_type not in [
definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE]:
scan_node = scan_context.GetRootScanNode()
return [scan_node.path_spec]
# Get the first node where where we need to decide what to process.
scan_node = scan_context.GetRootScanNode()
def Analyze(self, source_path, output_writer):
"""Analyzes the source.
Args:
source_path (str): the source path.
output_writer (StdoutWriter): the output writer.
Raises:
RuntimeError: if the source path does not exists, or if the source path
is not a file or directory, or if the format of or within the source
file is not supported.
"""
if not os.path.exists(source_path):
raise RuntimeError('No such source: {0:s}.'.format(source_path))
scan_context = source_scanner.SourceScannerContext()
scan_path_spec = None
scan_step = 0
scan_context.OpenSourcePath(source_path)
while True:
self._source_scanner.Scan(
scan_context, auto_recurse=self._auto_recurse,
scan_path_spec=scan_path_spec)
if not scan_context.updated:
break
if not self._auto_recurse:
output_writer.WriteScanContext(scan_context, scan_step=scan_step)
scan_step += 1