Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from tests.output import test_lib
class NativePythonOutputTest(test_lib.OutputModuleTestCase):
"""Tests for the "raw" (or native) Python output module."""
_OS_PATH_SPEC = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location='{0:s}{1:s}'.format(
os.path.sep, os.path.join('cases', 'image.dd')))
_TEST_EVENTS = [
{'data_type': 'test:output',
'display_name': 'OS: /var/log/syslog.1',
'hostname': 'ubuntu',
'inode': 12345678,
'pathspec': path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, inode=15,
location='/var/log/syslog.1', parent=_OS_PATH_SPEC),
'text': (
'Reporter PID: |8442| (pam_unix(cron:session): session\n '
'closed for user root)'),
'timestamp': timelib.Timestamp.CopyFromString('2012-06-27 18:17:01'),
'timestamp_desc': definitions.TIME_DESCRIPTION_UNKNOWN,
'username': 'root'}]
def testWriteEventBody(self):
"""Tests the WriteEventBody function."""
output_mediator = self._CreateOutputMediator()
output_writer = cli_test_lib.TestOutputWriter()
output_module = rawpy.NativePythonOutputModule(output_mediator)
output_module.SetOutputWriter(output_writer)
event_data = self._GetEventDataOfEvent(storage_writer, event)
self.assertEqual(event_data.file_size, 1247)
self.assertIsNone(event_data.inode)
expected_message = (
'TAR:/syslog '
'Type: file')
expected_short_message = '/syslog'
self._TestGetMessageStrings(
event_data, expected_message, expected_short_message)
test_file_path = self._GetTestFilePath(['syslog.tgz'])
self._SkipIfPathNotExists(test_file_path)
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
gzip_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_GZIP, parent=os_path_spec)
storage_writer = self._ParseFileByPathSpec(gzip_path_spec, parser)
self.assertEqual(storage_writer.number_of_warnings, 0)
self.assertEqual(storage_writer.number_of_events, 1)
events = list(storage_writer.GetEvents())
event = events[0]
self.CheckTimestamp(event.timestamp, '2012-07-28 16:44:43.000000')
self.assertEqual(
event.timestamp_desc, definitions.TIME_DESCRIPTION_MODIFICATION)
def testGetDisplayName(self):
"""Tests the GetDisplayName function."""
session = sessions.Session()
storage_writer = fake_writer.FakeStorageWriter(session)
parsers_mediator = self._CreateParserMediator(storage_writer)
with self.assertRaises(ValueError):
parsers_mediator.GetDisplayName(file_entry=None)
test_file_path = self._GetTestFilePath(['syslog.gz'])
self._SkipIfPathNotExists(test_file_path)
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(os_path_spec)
display_name = parsers_mediator.GetDisplayName(file_entry=file_entry)
expected_display_name = 'OS:{0:s}'.format(test_file_path)
self.assertEqual(display_name, expected_display_name)
gzip_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_GZIP, parent=os_path_spec)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(gzip_path_spec)
display_name = parsers_mediator.GetDisplayName(file_entry=file_entry)
expected_display_name = 'GZIP:{0:s}'.format(test_file_path)
self.assertEqual(display_name, expected_display_name)
from tests.output import test_lib
class TLNOutputModuleTest(test_lib.OutputModuleTestCase):
"""Tests for the TLN output module."""
_OS_PATH_SPEC = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location='{0:s}{1:s}'.format(
os.path.sep, os.path.join('cases', 'image.dd')))
_TEST_EVENTS = [
{'data_type': 'test:output',
'display_name': 'OS: /var/log/syslog.1',
'hostname': 'ubuntu',
'inode': 12345678,
'pathspec': path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, inode=15,
location='/var/log/syslog.1', parent=_OS_PATH_SPEC),
'text': (
'Reporter PID: |8442| (pam_unix(cron:session): session\n '
'closed for user root)'),
'timestamp': timelib.Timestamp.CopyFromString('2012-06-27 18:17:01'),
'timestamp_desc': definitions.TIME_DESCRIPTION_UNKNOWN,
'username': 'root'}]
def setUp(self):
"""Makes preparations before running an individual test."""
self._output_writer = cli_test_lib.TestOutputWriter()
output_mediator = self._CreateOutputMediator()
self._output_module = tln.TLNOutputModule(output_mediator)
self._output_module.SetOutputWriter(self._output_writer)
ValueError: when identifier is not set.
"""
if not identifier:
raise ValueError('Missing identifier value.')
super(MountPathSpec, self).__init__(parent=None, **kwargs)
self.identifier = identifier
@property
def comparable(self):
"""str: comparable representation of the path specification."""
sub_comparable_string = 'identifier: {0:s}'.format(self.identifier)
return self._GetComparable(sub_comparable_string=sub_comparable_string)
factory.Factory.RegisterPathSpec(MountPathSpec)
Raises:
ValueError: when parent is set.
"""
parent = None
if 'parent' in kwargs:
parent = kwargs['parent']
del kwargs['parent']
if parent:
raise ValueError('Parent value set.')
super(FakePathSpec, self).__init__(
location=location, parent=parent, **kwargs)
factory.Factory.RegisterPathSpec(FakePathSpec)
if not type_indicators:
return
logging.debug(u'Found file system type indicators: {0!s}'.format(
type_indicators))
if len(type_indicators) > 1:
raise errors.FileSystemScannerError(
u'Unsupported source found more than one file system types.')
if type_indicators[0] != dfvfs_definitions.TYPE_INDICATOR_TSK:
raise errors.FileSystemScannerError(
u'Unsupported file system type: {0:s}.'.format(type_indicators[0]))
return path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location=u'/',
parent=source_path_spec)
def __init__(self, file_system, mount_point):
"""Initializes a file system searcher.
Args:
file_system (FileSystem): file system.
mount_point (PathSpec): mount point path specification that refers
to the base location of the file system.
Raises:
PathSpecError: if the mount point path specification is incorrect.
ValueError: when file system or mount point is not set.
"""
if not file_system or not mount_point:
raise ValueError('Missing file system or mount point value.')
if path_spec_factory.Factory.IsSystemLevelTypeIndicator(
file_system.type_indicator):
if not hasattr(mount_point, 'location'):
raise errors.PathSpecError(
'Mount point path specification missing location.')
super(FileSystemSearcher, self).__init__()
self._file_system = file_system
self._mount_point = mount_point
Args:
location (Optional[str]): ZIP file internal location string prefixed
with a path separator character.
parent (Optional[PathSpec]): parent path specification.
Raises:
ValueError: when parent is not set.
"""
if not parent:
raise ValueError('Missing parent value.')
super(ZipPathSpec, self).__init__(
location=location, parent=parent, **kwargs)
factory.Factory.RegisterPathSpec(ZipPathSpec)
self.volume_index = volume_index
@property
def comparable(self):
"""str: comparable representation of the path specification."""
string_parts = []
if self.location is not None:
string_parts.append('location: {0:s}'.format(self.location))
if self.volume_index is not None:
string_parts.append('volume index: {0:d}'.format(self.volume_index))
return self._GetComparable(sub_comparable_string=', '.join(string_parts))
factory.Factory.RegisterPathSpec(LVMPathSpec)