Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _CreateStorageWriter(self):
    """Builds a fake storage writer and opens it.

    Returns:
      FakeStorageWriter: storage writer on which Open() has been called.
    """
    new_session = sessions.Session()
    writer = fake_writer.FakeStorageWriter(new_session)
    # Hand the writer back already opened so callers can use it directly.
    writer.Open()
    return writer
def _ParseESEDBFileWithPlugin(
        self, path_segments, plugin, knowledge_base_values=None):
    """Parses a file as an ESE database file with a specific plugin.

    Args:
      path_segments (list[str]): path segments inside the test data directory.
      plugin (ESEDBPlugin): ESE database plugin.
      knowledge_base_values (Optional[dict[str, object]]): knowledge base
          values.

    Returns:
      FakeStorageWriter: storage writer.
    """
    session = sessions.Session()
    storage_writer = fake_writer.FakeStorageWriter(session)
    storage_writer.Open()

    file_entry = self._GetTestFileEntry(path_segments)
    parser_mediator = self._CreateParserMediator(
        storage_writer, file_entry=file_entry,
        knowledge_base_values=knowledge_base_values)

    file_object = file_entry.GetFileObject()

    esedb_file = pyesedb.file()
    # Open outside the try block so that close() is only attempted on a
    # database that was actually opened.
    esedb_file.open_file_object(file_object)
    try:
        cache = esedb.ESEDBCache()
        plugin.Process(parser_mediator, cache=cache, database=esedb_file)
    finally:
        # Release the database handle even when the plugin raises.
        esedb_file.close()

    return storage_writer
# NOTE(review): the enclosing definition of this fragment is not visible in
# this excerpt and its indentation was lost; statements are kept unchanged.
# Create the multi-process task engine, capped at 100 tasks.
test_engine = task_engine.TaskMultiProcessEngine(
maximum_number_of_tasks=100)
# Resolve the test image path; presumably the test is skipped when the file
# is missing (behavior of _SkipIfPathNotExists is not shown here).
test_file_path = self._GetTestFilePath(['ímynd.dd'])
self._SkipIfPathNotExists(test_file_path)
# Describe the source as a TSK file system root ('/') layered on top of the
# image file on the operating system.
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
source_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
parent=os_path_spec)
# NOTE(review): `registry` is not defined in the visible code; confirm it is
# provided by the enclosing scope.
test_engine.PreprocessSources(registry, [source_path_spec])
session = sessions.Session()
# Restrict processing to the 'filestat' parser filter expression and use the
# SQLite format for task storage.
configuration = configurations.ProcessingConfiguration()
configuration.parser_filter_expression = 'filestat'
configuration.task_storage_format = definitions.STORAGE_FORMAT_SQLITE
# Write the results to a storage file inside a temporary directory.
with shared_test_lib.TempDirectory() as temp_directory:
temp_file = os.path.join(temp_directory, 'storage.plaso')
storage_writer = sqlite_writer.SQLiteStorageFileWriter(session, temp_file)
test_engine.ProcessSources(
session.identifier, [source_path_spec], storage_writer, configuration)
def testOpenClose(self):
    """Exercises the Open and Close functions of the fake storage writer."""
    test_session = sessions.Session()

    # A writer with the default storage type is opened and closed twice.
    writer = fake_writer.FakeStorageWriter(test_session)
    for _ in range(2):
        writer.Open()
        writer.Close()

    # A writer with the task storage type is opened and closed once.
    writer = fake_writer.FakeStorageWriter(
        test_session, storage_type=definitions.STORAGE_TYPE_TASK)
    writer.Open()
    writer.Close()

    # Opening a writer that is already open is expected to raise IOError.
    writer.Open()
    with self.assertRaises(IOError):
        writer.Open()
def testGetDisplayNameForPathSpec(self):
    """Checks the display name produced for an OS path specification."""
    test_session = sessions.Session()
    writer = fake_writer.FakeStorageWriter(test_session)
    test_knowledge_base = self._SetUpKnowledgeBase()
    test_mediator = mediator.AnalysisMediator(writer, test_knowledge_base)

    syslog_path = self._GetTestFilePath(['syslog.gz'])
    path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=syslog_path)

    # The expected name is the test file path prefixed with 'OS:'.
    expected = 'OS:{0:s}'.format(syslog_path)
    actual = test_mediator.GetDisplayNameForPathSpec(path_spec)
    self.assertEqual(actual, expected)
def testGetEvents(self):
    """Checks the number of events returned by the GetEvents function."""
    test_session = sessions.Session()
    writer = fake_writer.FakeStorageWriter(test_session)
    writer.Open()

    event_pairs = containers_test_lib.CreateEventsFromValues(
        self._TEST_EVENTS)
    for test_event, test_event_data in event_pairs:
        # Store the event data first so its identifier can be linked to the
        # event before the event itself is stored.
        writer.AddEventData(test_event_data)
        test_event.SetEventDataIdentifier(test_event_data.GetIdentifier())
        writer.AddEvent(test_event)

    stored_events = list(writer.GetEvents())
    self.assertEqual(len(stored_events), 4)

    writer.Close()
def testProcessSources(self):
"""Tests the ProcessSources function."""
# NOTE(review): this excerpt ends inside the `with` block; only the setup is
# visible and no ProcessSources call appears before it is cut off.
session = sessions.Session()
test_front_end = extraction_frontend.ExtractionFrontend()
test_file = self._GetTestFilePath([u'ímynd.dd'])
# Describe the source as a TSK file system root (u'/') layered on top of the
# image file on the operating system.
volume_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file)
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location=u'/',
parent=volume_path_spec)
source_type = dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE
configuration = configurations.ProcessingConfiguration()
# The storage file path lives inside a temporary directory.
with shared_test_lib.TempDirectory() as temp_directory:
storage_file_path = os.path.join(temp_directory, u'storage.plaso')
Raises:
IOError: if there is a mismatch in session identifiers between the
session start and completion attribute containers.
OSError: if there is a mismatch in session identifiers between the
session start and completion attribute containers.
"""
# NOTE(review): the definition header for this body is not visible in this
# excerpt and its indentation was lost; statements are kept unchanged.
# Session start and session completion containers are read through two
# separate generators and consumed in lockstep, one pair per session index.
session_start_generator = self._GetAttributeContainers(
self._CONTAINER_TYPE_SESSION_START)
session_completion_generator = self._GetAttributeContainers(
self._CONTAINER_TYPE_SESSION_COMPLETION)
for session_index in range(0, self._last_session):
# NOTE(review): next() is called without a default. If either generator is
# exhausted early, StopIteration is raised inside this generator body, which
# Python 3.7+ converts to RuntimeError (PEP 479) - confirm this is intended.
session_start = next(session_start_generator) # pylint: disable=stop-iteration-return
session_completion = next(session_completion_generator) # pylint: disable=stop-iteration-return
session = sessions.Session()
session.CopyAttributesFromSessionStart(session_start)
# Completion attributes are only copied when a completion container is
# present; a ValueError from the copy is reported as an IOError.
if session_completion:
try:
session.CopyAttributesFromSessionCompletion(session_completion)
except ValueError:
raise IOError(
'Session identifier mismatch for session: {0:d}'.format(
session_index))
yield session
# NOTE(review): the enclosing definition is not visible in this excerpt and
# its indentation was lost. `stream_name` and `stream_number` are used here
# without a visible definition - presumably set earlier in the enclosing
# scope; confirm.
# Read the session start container from its serialized data stream.
data_stream = _SerializedDataStream(
self._zipfile, self._temporary_path, stream_name)
session_start = self._ReadAttributeContainerFromStreamEntry(
data_stream, 'session_start')
# The session completion stream is optional: it is only read when a stream
# with the expected name exists, otherwise session_completion stays None.
session_completion = None
stream_name = 'session_completion.{0:06d}'.format(stream_number)
if self._HasStream(stream_name):
data_stream = _SerializedDataStream(
self._zipfile, self._temporary_path, stream_name)
session_completion = self._ReadAttributeContainerFromStreamEntry(
data_stream, 'session_completion')
session = sessions.Session()
session.CopyAttributesFromSessionStart(session_start)
# A ValueError raised while copying the completion attributes is reported as
# an IOError that names the stream.
if session_completion:
try:
session.CopyAttributesFromSessionCompletion(session_completion)
except ValueError:
raise IOError(
'Session identifier mismatch in stream: {0:s}'.format(
stream_name))
yield session