Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def do_test_md5(self, args, test_fun=oleobj.main):
    """
    Helper for test_md5 and test_md5_args.

    Calls ``test_fun`` once per entry in SAMPLES and checks that the call
    returned oleobj.RETURN_DID_DUMP and that the embedded file exists in
    ``self.temp_dir`` with the expected md5 hash.

    :param args: list of leading arguments; the path of the current sample
                 is appended to a copy of this list for every call
    :param test_fun: callable that takes the argument list and returns a
                     return code (default: oleobj.main)
    """
    data_dir = join(DATA_BASE_DIR, 'oleobj')

    # SAMPLES entries: name of sample, extension of embedded file, md5 hash
    # of embedded file
    for sample_name, embedded_name, expect_hash in SAMPLES:
        ret_val = test_fun(args + [join(data_dir, sample_name)])
        self.assertEqual(ret_val, oleobj.RETURN_DID_DUMP)

        expect_name = join(self.temp_dir, sample_name + '_' + embedded_name)
        if not isfile(expect_name):
            # Record the failure on the instance before raising.
            # NOTE(review): presumably read elsewhere (e.g. during cleanup)
            # -- confirm against the rest of the test class.
            self.did_fail = True
            # self.fail() always raises, so nothing after it in this
            # branch can run.
            self.fail('{0} not created from {1}'.format(expect_name,
                                                        sample_name))

        md5_hash = calc_md5(expect_name)
        if md5_hash != expect_hash:
            self.did_fail = True
            self.fail('Wrong md5 {0} of {1} from {2}'
                      .format(md5_hash, expect_name, sample_name))
def preread_file(args):
    """
    Helper for TestOleObj.test_non_streamed: preread + call process_file.

    Reads the whole file into memory first and hands the bytes to
    oleobj.process_file instead of letting it open the file itself.

    :param args: sequence of exactly three items: the literal '-d', the
                 output directory and the name of the file to process
    :returns: oleobj.RETURN_DID_DUMP if something was dumped and neither
              error flag was set, oleobj.RETURN_NO_DUMP otherwise
    :raises ValueError: if the first item is not '-d'
    """
    flag, out_dir, path = args
    if flag != '-d':
        raise ValueError('ignore_arg not as expected!')

    with open(path, 'rb') as reader:
        content = reader.read()

    result = oleobj.process_file(path, content, output_dir=out_dir)
    stream_error, dump_error, dumped = result

    # success means: something was dumped and no error of either kind
    clean_dump = dumped and not (stream_error or dump_error)
    # RETURN_NO_DUMP stands in for "just anything else"
    return oleobj.RETURN_DID_DUMP if clean_dump else oleobj.RETURN_NO_DUMP
# Command line handling: parse the options, set up logging and process every
# file matched by the remaining arguments.
options, args = parser.parse_args()

# Without any file argument there is nothing to do: show the module
# documentation and the option help, then stop.
if not args:
    print(__doc__)
    parser.print_help()
    sys.exit()

# Console logging goes to stdout rather than the default stderr, so that the
# output can be redirected properly.
logging.basicConfig(
    level=LOG_LEVELS[options.loglevel],
    stream=sys.stdout,
    format='%(levelname)-8s %(message)s'
)
# Switch on logging in this module and in oleobj.
log.setLevel(logging.NOTSET)
oleobj.enable_logging()

found_files = xglob.iter_files(
    args,
    recursive=options.recursive,
    zip_password=options.zip_password,
    zip_fname=options.zip_fname
)
for container, filename, data in found_files:
    # Directory names stored in zip files carry no data of their own.
    is_zip_dir_entry = container and filename.endswith('/')
    if not is_zip_dir_entry:
        process_file(container, filename, data,
                     output_dir=options.output_dir,
                     save_object=options.save_object)
# Fragment (enclosing definition not visible): prints a short header for one
# file and builds a text description of each object found by RtfObjParser.
# NOTE(review): indentation is missing in this excerpt, so the exact nesting
# of the branches below must be confirmed against the original file.
# When no data was passed in, the whole file is read from disk.
# NOTE(review): the handle returned by open() is not closed explicitly here;
# a with-statement would release it deterministically.
if data is None:
data = open(filename, 'rb').read()
print('='*79)
print('File: %r - size: %d bytes' % (filename, len(data)))
# Table with three columns (id, index, OLE Object); it is created here but
# not written to within the visible lines.
tstream = tablestream.TableStream(
column_width=(3, 10, 63),
header_row=('id', 'index', 'OLE Object'),
style=tablestream.TableStyleSlim
)
# Parse the data; the objects found are then read from rtfp.objects.
rtfp = RtfObjParser(data)
rtfp.parse()
for rtfobj in rtfp.objects:
# ole_color is reset for every object; it is not used in the visible lines.
ole_color = None
if rtfobj.is_ole:
# ole_column accumulates a multi-line text cell describing the OLE object:
# format id with its meaning, class name, data size and package details.
ole_column = 'format_id: %d ' % rtfobj.format_id
if rtfobj.format_id == oleobj.OleObject.TYPE_EMBEDDED:
ole_column += '(Embedded)\n'
elif rtfobj.format_id == oleobj.OleObject.TYPE_LINKED:
ole_column += '(Linked)\n'
else:
ole_column += '(Unknown)\n'
ole_column += 'class name: %r\n' % rtfobj.class_name
# if the object is linked and not embedded, data_size=None:
if rtfobj.oledata_size is None:
ole_column += 'data size: N/A'
else:
ole_column += 'data size: %d' % rtfobj.oledata_size
# For package objects, also list the file name and the two stored paths.
if rtfobj.is_package:
ole_column += '\nOLE Package object:'
ole_column += '\nFilename: %r' % rtfobj.filename
ole_column += '\nSource path: %r' % rtfobj.src_path
ole_column += '\nTemp path = %r' % rtfobj.temp_path
# Fragment (enclosing definition not visible): parses the data with
# RtfObjParser and describes each OLE object as a dictionary.
# NOTE(review): indentation is missing in this excerpt, so the exact nesting
# of the branches below must be confirmed against the original file.
rtfp = RtfObjParser(data)
rtfp.parse()
# out_data and tags start empty; only tags is appended to in the visible
# lines.
out_data = []
tags = []
# Captures the "<4 digits>-<digits>" part that follows " CVE-".
# NOTE(review): the pattern uses \d inside a non-raw string literal; an r''
# prefix would avoid the invalid-escape warning on recent Python versions.
cve_regex = re.compile(' CVE-(\d{4}-\d+)')
for rtfobj in rtfp.objects:
if rtfobj.is_ole:
# One 'ole' tag is appended per OLE object, so the list may hold duplicates.
tags.append('ole')
# ole_column maps field names to values: format_id, format_type, class_name,
# data_size and, for package objects, a nested 'package' dictionary.
ole_column = {'format_id': rtfobj.format_id}
if rtfobj.format_id == oleobj.OleObject.TYPE_EMBEDDED:
ole_column['format_type'] = 'embedded'
elif rtfobj.format_id == oleobj.OleObject.TYPE_LINKED:
ole_column['format_type'] = 'linked'
else:
ole_column['format_type'] = 'unknown'
ole_column['class_name'] = rtfobj.class_name
# if the object is linked and not embedded, data_size=None:
# here -1 is stored in that case instead of None
if rtfobj.oledata_size is None:
ole_column['data_size'] = -1
else:
ole_column['data_size'] = rtfobj.oledata_size
if rtfobj.is_package:
ole_column['package'] = {}
ole_column['package']['filename'] = rtfobj.filename
ole_column['package']['source_path'] = rtfobj.src_path
ole_column['package']['temp_path'] = rtfobj.temp_path
# check if the file extension is executable:
# (only the extension is split off here; the check itself is not visible)
_, ext = os.path.splitext(rtfobj.filename)
# Fragment (enclosing definition not visible): turns the hex text hexdata1
# into binary data, stores it on rtfobj and tries to parse it as an OLE
# object.
# NOTE(review): indentation is missing in this excerpt and the try block
# below has no visible handler, so the nesting and the error handling must
# be confirmed against the original file.
# Drop every byte that is not a hex digit.
hexdata = re.sub(b'[^a-fA-F0-9]', b'', hexdata1)
if len(hexdata) < len(hexdata1):
# this is only for debugging:
# (collects exactly the characters that were removed above)
nonhex = re.sub(b'[a-fA-F0-9]', b'', hexdata1)
log.debug('Found non-hex chars in hexdata: %r' % nonhex)
# MS Word accepts an extra hex digit, so we need to trim it if present:
# (the slice below removes one hex digit, i.e. half a byte, although the log
# message says "byte"; unhexlify needs an even number of digits)
if len(hexdata) & 1:
log.debug('Odd length, trimmed last byte.')
hexdata = hexdata[:-1]
rtfobj.hexdata = hexdata
# Decode the cleaned hex digits and keep the raw bytes with their md5.
object_data = binascii.unhexlify(hexdata)
rtfobj.rawdata = object_data
rtfobj.rawdata_md5 = hashlib.md5(object_data).hexdigest()
# TODO: check if all hex data is extracted properly
obj = oleobj.OleObject()
try:
# Copy the parsed fields onto rtfobj; is_ole is only set after obj.parse()
# and the md5 computation have run without raising.
obj.parse(object_data)
rtfobj.format_id = obj.format_id
rtfobj.class_name = obj.class_name
rtfobj.oledata_size = obj.data_size
rtfobj.oledata = obj.data
rtfobj.oledata_md5 = hashlib.md5(obj.data).hexdigest()
rtfobj.is_ole = True
# Objects whose class name is b'package' (compared case-insensitively) are
# parsed further as an OLE native stream to get file name, paths and payload.
if obj.class_name.lower() == b'package':
opkg = oleobj.OleNativeStream(bindata=obj.data,
package=True)
rtfobj.filename = opkg.filename
rtfobj.src_path = opkg.src_path
rtfobj.temp_path = opkg.temp_path
rtfobj.olepkgdata = opkg.data
rtfobj.olepkgdata_md5 = hashlib.md5(opkg.data).hexdigest()