How to use the dfvfs.helpers.source_scanner.SourceScannerContext class in dfvfs

To help you get started, we’ve selected a few dfvfs examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github log2timeline / dfvfs / tests / helpers / source_scanner.py View on Github external
def testRemoveScanNode(self):
    """Tests the RemoveScanNode function.

    Covers removing a path specification that was never added and removing
    one that was added beforehand. Both removals are expected to return None,
    and the number of scan nodes held by the context is checked at each step.
    """
    root_path_spec = fake_path_spec.FakePathSpec(location='/')
    scan_context = source_scanner.SourceScannerContext()

    # Removing from a freshly created context.
    self.assertIsNone(scan_context.RemoveScanNode(root_path_spec))
    self.assertEqual(len(scan_context._scan_nodes), 0)

    # Register the path specification so there is something to remove.
    scan_context.AddScanNode(root_path_spec, None)
    self.assertEqual(len(scan_context._scan_nodes), 1)

    # Removing the registered path specification leaves no scan nodes behind.
    self.assertIsNone(scan_context.RemoveScanNode(root_path_spec))
    self.assertEqual(len(scan_context._scan_nodes), 0)
github log2timeline / dfvfs / tests / helpers / source_scanner.py View on Github external
def testHasFileSystemScanNodes(self):
    """Tests the HasFileSystemScanNodes function.

    The result is expected to be false for a freshly created context and true
    once a fake path specification has been added as a scan node.
    """
    root_path_spec = fake_path_spec.FakePathSpec(location='/')
    scan_context = source_scanner.SourceScannerContext()

    result_before_add = scan_context.HasFileSystemScanNodes()
    self.assertFalse(result_before_add)

    scan_context.AddScanNode(root_path_spec, None)

    result_after_add = scan_context.HasFileSystemScanNodes()
    self.assertTrue(result_after_add)
github log2timeline / dfvfs / tests / helpers / volume_scanner.py View on Github external
def testScanVolume(self):
    """Tests the _ScanVolume function.

    Only error conditions are exercised: both calls are expected to raise
    errors.ScannerError.
    """
    mediator = TestVolumeScannerMediator()
    scanner = volume_scanner.VolumeScanner(mediator=mediator)
    options = volume_scanner.VolumeScannerOptions()

    context = source_scanner.SourceScannerContext()

    # Error condition: no scan node is passed at all.
    with self.assertRaises(errors.ScannerError):
      scanner._ScanVolume(context, None, options, [])

    # Error condition: a scan node that was created with None as its argument.
    empty_scan_node = source_scanner.SourceScanNode(None)
    with self.assertRaises(errors.ScannerError):
      scanner._ScanVolume(context, empty_scan_node, options, [])
github log2timeline / dfvfs / tests / helpers / source_scanner.py View on Github external
def testIsSourceTypeDirectory(self):
    """Tests the IsSourceTypeDirectory function.

    Checks the result before any source type is assigned, then with the
    directory source type and with the file source type.
    """
    scan_context = source_scanner.SourceScannerContext()

    # No source type has been assigned yet: the result is expected to be None.
    result = scan_context.IsSourceTypeDirectory()
    self.assertIsNone(result)

    scan_context.source_type = definitions.SOURCE_TYPE_DIRECTORY
    result = scan_context.IsSourceTypeDirectory()
    self.assertTrue(result)

    scan_context.source_type = definitions.SOURCE_TYPE_FILE
    result = scan_context.IsSourceTypeDirectory()
    self.assertFalse(result)
github log2timeline / plaso / tests / cli / storage_media_tool.py View on Github external
def testScanEncryptedVolumeOnEncryptedAPFS(self):
    """Tests the _ScanEncryptedVolume function on an encrypted APFS image.

    The test is skipped when the apfs_encrypted.dmg test file is not present.
    """
    image_path = self._GetTestFilePath(['apfs_encrypted.dmg'])
    self._SkipIfPathNotExists(image_path)

    # Empty the resolver key chain before scanning.
    resolver.Resolver.key_chain.Empty()

    tool = storage_media_tool.StorageMediaTool()
    tool._credentials = [('password', self._APFS_PASSWORD)]

    context = source_scanner.SourceScannerContext()
    context.OpenSourcePath(image_path)

    tool._source_scanner.Scan(context)
    test_scan_node = self._GetTestScanNode(context)

    apfs_container_scan_node = test_scan_node.sub_nodes[4].sub_nodes[0]
    volume_scan_node = apfs_container_scan_node.sub_nodes[0]

    # Test on volume system sub node.
    tool._ScanEncryptedVolume(context, volume_scan_node)
github log2timeline / plaso / tests / cli / storage_media_tool.py View on Github external
def testScanVolumeSystemRootOnVSSDisabled(self):
    """Tests the _ScanVolumeSystemRoot function on VSS with VSS turned off.

    The test is skipped when the vsstest.qcow2 test file is not present. With
    VSS processing disabled no base path specifications are expected.
    """
    image_path = self._GetTestFilePath(['vsstest.qcow2'])
    self._SkipIfPathNotExists(image_path)

    tool = storage_media_tool.StorageMediaTool()
    tool._process_vss = False

    context = source_scanner.SourceScannerContext()
    context.OpenSourcePath(image_path)

    tool._source_scanner.Scan(context)
    test_scan_node = self._GetTestScanNode(context)

    vss_scan_node = test_scan_node.sub_nodes[0]

    # The list is passed in and its length is checked after the call.
    collected_path_specs = []
    tool._ScanVolumeSystemRoot(context, vss_scan_node, collected_path_specs)
    self.assertEqual(len(collected_path_specs), 0)
github maurermj08 / vftools / dfvfs_utils / dfvfs_util.py View on Github external
source_path: the source path.
        Returns:
            A list of path specifications (instances of dfvfs.PathSpec).
        Raises:
            RuntimeError: if the source path does not exist, or if the source path
                                        is not a file or directory, or if the format of or within
                                        the source file is not supported.
        """
        self.initialized = 0

        if (not source_path.startswith(u'\\\\.\\') and
                not os.path.exists(source_path)):
            raise RuntimeError(
                u'No such device, file or directory: {0:s}.'.format(source_path))

        scan_context = source_scanner.SourceScannerContext()
        scan_context.OpenSourcePath(source_path)

        try:
            self._source_scanner.Scan(scan_context)
        except (errors.BackEndError, ValueError) as exception:
            raise RuntimeError(
                u'Unable to scan source with error: {0:s}.'.format(exception))

        if scan_context.source_type not in [
            definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
            definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE]:
            scan_node = scan_context.GetRootScanNode()
            return [scan_node.path_spec]

        # Get the first node where we need to decide what to process.
        scan_node = scan_context.GetRootScanNode()
github log2timeline / dfvfs / examples / recursive_hasher2.py View on Github external
source_path: the source path.

    Returns:
      A list of path specifications (instances of dfvfs.PathSpec).

    Raises:
      RuntimeError: if the source path does not exist, or if the source path
                    is not a file or directory, or if the format of or within
                    the source file is not supported.
    """
    if (not source_path.startswith(u'\\\\.\\') and
        not os.path.exists(source_path)):
      raise RuntimeError(
          u'No such device, file or directory: {0:s}.'.format(source_path))

    scan_context = source_scanner.SourceScannerContext()
    scan_context.OpenSourcePath(source_path)

    try:
      self._source_scanner.Scan(scan_context)
    except (errors.BackEndError, ValueError) as exception:
      raise RuntimeError(
          u'Unable to scan source with error: {0:s}.'.format(exception))

    if scan_context.source_type not in [
        definitions.SOURCE_TYPE_STORAGE_MEDIA_DEVICE,
        definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE]:
      scan_node = scan_context.GetRootScanNode()
      return [scan_node.path_spec]

    # Get the first node where we need to decide what to process.
    scan_node = scan_context.GetRootScanNode()
github log2timeline / dfvfs / examples / source_analyzer.py View on Github external
def Analyze(self, source_path, output_writer):
    """Analyzes the source.

    Opens the source path in a new scan context and repeatedly runs the source
    scanner on it until a scan leaves the context not updated.

    Args:
      source_path (str): the source path.
      output_writer (StdoutWriter): the output writer.

    Raises:
      RuntimeError: if the source path does not exist, or if the source path
          is not a file or directory, or if the format of or within the source
          file is not supported.
    """
    # Fail early when the path is missing on the local file system.
    if not os.path.exists(source_path):
      raise RuntimeError('No such source: {0:s}.'.format(source_path))

    scan_context = source_scanner.SourceScannerContext()
    scan_path_spec = None
    scan_step = 0

    scan_context.OpenSourcePath(source_path)

    # Scan in steps; the loop ends once a scan leaves the context not updated.
    while True:
      self._source_scanner.Scan(
          scan_context, auto_recurse=self._auto_recurse,
          scan_path_spec=scan_path_spec)

      if not scan_context.updated:
        break

      # The intermediate scan context is only written out when auto recursion
      # is turned off.
      if not self._auto_recurse:
        output_writer.WriteScanContext(scan_context, scan_step=scan_step)
      scan_step += 1
      # NOTE(review): scan_path_spec is never reassigned in the lines visible
      # here; this excerpt may be cut off before the end of the loop body.
      # Confirm against the full source file.