Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def scan(self, data, file, options, expire_at):
self.event['total'] = {'streams': 0, 'extracted': 0}
try:
ole = olefile.OleFileIO(data)
ole_streams = ole.listdir(streams=True)
self.event['total']['streams'] = len(ole_streams)
for stream in ole_streams:
file = ole.openstream(stream)
extract_data = file.read()
extract_name = f'{"_".join(stream)}'
extract_name = re.sub(r'[\x00-\x1F]', '', extract_name)
if extract_name.endswith('Ole10Native'):
native_stream = oletools.oleobj.OleNativeStream(
bindata=extract_data,
)
if native_stream.filename:
extract_name = extract_name + f'_{str(native_stream.filename)}'
else:
extract_name = extract_name + '_native_data'
extract_file = strelka.File(
name=extract_name,
source=self.name,
)
for c in strelka.chunk_string(native_stream.data):
self.upload_to_coordinator(
extract_file.pointer,
c,
object_data = binascii.unhexlify(hexdata)
rtfobj.rawdata = object_data
rtfobj.rawdata_md5 = hashlib.md5(object_data).hexdigest()
# TODO: check if all hex data is extracted properly
obj = oleobj.OleObject()
try:
obj.parse(object_data)
rtfobj.format_id = obj.format_id
rtfobj.class_name = obj.class_name
rtfobj.oledata_size = obj.data_size
rtfobj.oledata = obj.data
rtfobj.oledata_md5 = hashlib.md5(obj.data).hexdigest()
rtfobj.is_ole = True
if obj.class_name.lower() == b'package':
opkg = oleobj.OleNativeStream(bindata=obj.data,
package=True)
rtfobj.filename = opkg.filename
rtfobj.src_path = opkg.src_path
rtfobj.temp_path = opkg.temp_path
rtfobj.olepkgdata = opkg.data
rtfobj.olepkgdata_md5 = hashlib.md5(opkg.data).hexdigest()
rtfobj.is_package = True
else:
if olefile.isOleFile(obj.data):
ole = olefile.OleFileIO(obj.data)
rtfobj.clsid = ole.root.clsid
rtfobj.clsid_desc = clsid.KNOWN_CLSIDS.get(rtfobj.clsid,
'unknown CLSID (please report at https://github.com/decalage2/oletools/issues)')
except:
pass
log.debug('*** Not an OLE 1.0 Object')
async def scan(self, payload: Payload, request: Request) -> WorkerResponse:
extracted: List[ExtractedPayload] = []
errors: List[Error] = []
ole_object = olefile.OleFileIO(payload.content)
streams = ole_object.listdir(streams=True)
for stream in streams:
try:
stream_buffer = ole_object.openstream(stream).read()
name = ''.join(
filter(lambda x: x in string.printable, '_'.join(stream))
)
if stream_buffer.endswith(b'\x01Ole10Native'):
ole_native = oleobj.OleNativeStream(stream_buffer)
if ole_native.filename:
name = f'{name}_{str(ole_native.filename)}'
else:
name = f'{name}_olenative'
meta = PayloadMeta(
should_archive=False,
extra_data={'index': streams.index(stream), 'name': name},
)
extracted.append(ExtractedPayload(ole_native.data, meta))
else:
meta = PayloadMeta(
should_archive=False,
extra_data={'index': streams.index(stream), 'name': name},
)
extracted.append(ExtractedPayload(stream_buffer, meta))
except Exception as err:
def handle_yara(self, filepath, match):
    """Push the payload of every ``\\x01Ole10Native`` stream found in an OLE file.

    Opens ``filepath`` as an OLE container, walks the entries returned by
    ``listdir()``, and for each entry whose last path component is
    ``"\\x01Ole10Native"`` parses the stream content with
    ``oletools.oleobj.OleNativeStream`` and hands the parsed data to
    ``self.push_blob`` together with the decoded filename, source path and
    temp path.

    Args:
        filepath: Path (or other input accepted by ``OleFileIO``) of the
            OLE file to inspect.
        match: Not used by this method.
    """
    ole = olefile.olefile.OleFileIO(filepath)
    try:
        for stream_path in ole.listdir():
            # Only embedded "OLE native" package streams are of interest.
            if stream_path[-1] != "\x01Ole10Native":
                continue
            content = ole.openstream(stream_path).read()
            # Use a separate name so the directory entry being iterated is
            # not rebound to the parsed object.
            native = oletools.oleobj.OleNativeStream(content)
            self.push_blob(native.data, "binaries", None, {
                "filename": native.filename.decode("latin-1"),
                "src_path": native.src_path.decode("latin-1"),
                "temp_path": native.temp_path.decode("latin-1"),
            })
    finally:
        # Release the underlying file handle even if parsing or pushing
        # a blob raises; the original never closed the container.
        ole.close()
# are closed in find_ole
for ole in find_ole(filename, data, xml_parser):
if ole is None: # no ole file found
continue
for path_parts in ole.listdir():
stream_path = '/'.join(path_parts)
log.debug('Checking stream %r', stream_path)
if path_parts[-1] == '\x01Ole10Native':
stream = None
try:
stream = ole.openstream(path_parts)
print('extract file embedded in OLE object from stream %r:'
% stream_path)
print('Parsing OLE Package')
opkg = OleNativeStream(stream)
# leave stream open until dumping is finished
except Exception:
log.warning('*** Not an OLE 1.0 Object')
err_stream = True
if stream is not None:
stream.close()
continue
# print info
if opkg.is_link:
log.debug('Object is not embedded but only linked to '
'- skip')
continue
print(u'Filename = "%s"' % opkg.filename)
print(u'Source path = "%s"' % opkg.src_path)
print(u'Temp path = "%s"' % opkg.temp_path)