Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_skip_space_after_bin_control_word(self):
    """Parse the sample 'rtfobj/issue_185.rtf.zip' and expect exactly one object."""
    data = testdata_reader.read_encrypted('rtfobj/issue_185.rtf.zip')
    rtfp = rtfobj.RtfObjParser(data)
    rtfp.parse()
    # assertEqual reports the actual object count when the check fails,
    # whereas assertTrue(len(...) == 1) only says "False is not true".
    self.assertEqual(len(rtfp.objects), 1)
def save_ole_objects(self, data, save_object, filename):
    '''
    Pick one object out of the parsed RTF by index and report its OLE Package details.

    The bulk of this function is taken from python-oletools: https://github.com/decalage2/oletools/blob/master/oletools/rtfobj.py
    See link for license

    :param data: RTF content handed to RtfObjParser.
    :param save_object: index of the wanted object; must be convertible with int().
    :param filename: not used in the part of the function visible here.
    :return: None. When save_object is not a usable index an error is logged
             and the function returns early.
    '''
    rtfp = RtfObjParser(data)
    rtfp.parse()
    try:
        i = int(save_object)
        objects = [rtfp.objects[i]]
    except (TypeError, ValueError, IndexError) as ex:
        # int() raises TypeError/ValueError, the list lookup raises IndexError.
        self.log('error', 'The -s option must be followed by an object index, such as "-s 2"\n{ex}'.format(ex=ex))
        return
    for rtfobj in objects:
        # `i` deliberately keeps the index requested by the caller. The previous
        # code recomputed it with objects.index(rtfobj), which is always 0 for
        # this one-element list and made every log line say "object #0".
        # NOTE(review): tmp is not used in the visible part of this function;
        # presumably the extracted data is written to it further down.
        tmp = tempfile.NamedTemporaryFile(delete=False)
        if rtfobj.is_package:
            self.log('info', 'Saving file from OLE Package in object #%d:' % i)
            self.log('info', ' Filename = %r' % rtfobj.filename)
            self.log('info', ' Source path = %r' % rtfobj.src_path)
            self.log('info', ' Temp path = %r' % rtfobj.temp_path)
def scan(self, data, file, options, expire_at):
file_limit = options.get('limit', 1000)
self.event['total'] = {'rtf_objects': 0, 'extracted': 0}
rtf = rtfobj.RtfObjParser(data)
rtf.parse()
self.event['total']['rtf_objects'] = len(rtf.rtf_objects)
for rtf_object in rtf.rtf_objects:
if self.event['total']['extracted'] >= file_limit:
break
index = rtf.server.index(rtf_object)
if rtf_object.is_package:
extract_file = strelka.File(
name=rtf_object.filename,
source=self.name,
)
for c in strelka.chunk_string(rtf_object.olepkgdata):
self.upload_to_coordinator(
# Excerpt: begins inside an if/else whose `if` branch is not shown and stops on
# a dangling `else:`. Indentation was lost, so nesting must be read from syntax.
else:
# Output-name prefix: directory of `filename` joined with the sanitized name.
base_dir = os.path.dirname(filename)
sane_fname = sanitize_filename(filename)
fname_prefix = os.path.join(base_dir, sane_fname)
# TODO: option to extract objects to files (false by default)
# Read the file from disk only when the caller did not supply its content.
if data is None:
# NOTE(review): the file object returned by open() is never closed
# explicitly; a `with` block would release the handle deterministically.
data = open(filename, 'rb').read()
print('='*79)
print('File: %r - size: %d bytes' % (filename, len(data)))
# Three-column table (id, index, OLE Object) with column widths 3/10/63.
tstream = tablestream.TableStream(
column_width=(3, 10, 63),
header_row=('id', 'index', 'OLE Object'),
style=tablestream.TableStyleSlim
)
rtfp = RtfObjParser(data)
rtfp.parse()
# For every parsed object, accumulate a multi-line description in ole_column.
for rtfobj in rtfp.objects:
# No colour chosen yet; presumably assigned further down (not shown).
ole_color = None
if rtfobj.is_ole:
ole_column = 'format_id: %d ' % rtfobj.format_id
# Append a readable label for the format id.
if rtfobj.format_id == oleobj.OleObject.TYPE_EMBEDDED:
ole_column += '(Embedded)\n'
elif rtfobj.format_id == oleobj.OleObject.TYPE_LINKED:
ole_column += '(Linked)\n'
else:
ole_column += '(Unknown)\n'
ole_column += 'class name: %r\n' % rtfobj.class_name
# if the object is linked and not embedded, data_size=None:
if rtfobj.oledata_size is None:
ole_column += 'data size: N/A'
else:
def __init__(self, data):
    """Initialise the parent parser with `data` and start with no objects found."""
    super(RtfObjParser, self).__init__(data)
    # RtfObjects found so far; empty until something is appended to it
    self.objects = []
def get_rtf_objects():
with open('/sample', 'rb') as f:
data = f.read()
rtfp = RtfObjParser(data)
rtfp.parse()
out_data = []
tags = []
cve_regex = re.compile(' CVE-(\d{4}-\d+)')
for rtfobj in rtfp.objects:
if rtfobj.is_ole:
tags.append('ole')
ole_column = {'format_id': rtfobj.format_id}
if rtfobj.format_id == oleobj.OleObject.TYPE_EMBEDDED:
ole_column['format_type'] = 'embedded'
elif rtfobj.format_id == oleobj.OleObject.TYPE_LINKED:
ole_column['format_type'] = 'linked'
else:
def parse_rtf(self, filename, data):
    '''
    Log the file name and size, then collect description lines for each OLE object
    found by RtfObjParser (only the first part of the function is visible here).

    The bulk of this function is taken from python-oletools: https://github.com/decalage2/oletools/blob/master/oletools/rtfobj.py
    See link for license
    '''
    size_text = hex(len(data))
    self.log('success', 'File: {name} - size: {size} bytes'.format(name=filename, size=size_text))
    table = []
    h = ['id', 'index', 'OLE Object']
    rtfp = RtfObjParser(data)
    rtfp.parse()
    for rtfobj in rtfp.objects:
        row = []
        obj_col = []
        if rtfobj.is_ole:
            format_line = 'format_id: {id} '.format(id=rtfobj.format_id)
            # Pick the readable label for the format id; embedded is checked first.
            if rtfobj.format_id == oleobj.OleObject.TYPE_EMBEDDED:
                format_label = '(Embedded)'
            elif rtfobj.format_id == oleobj.OleObject.TYPE_LINKED:
                format_label = '(Linked)'
            else:
                format_label = '(Unknown)'
            class_line = 'class name: {cls}'.format(cls=rtfobj.class_name)
            obj_col.extend([format_line, format_label, class_line])
            # if the object is linked and not embedded, data_size=None:
            if rtfobj.oledata_size is None:
                obj_col.append('data size: N/A')
async def scan(self, payload: Payload, request: Request) -> WorkerResponse:
    """Extract every object RtfObjParser finds in the payload content.

    Each object becomes one ExtractedPayload whose metadata records the
    object's position; OLE packages additionally record their filename.
    The data taken is `oledata` for OLE objects, `olepkgdata` for packages
    and `rawdata` for everything else.
    """
    parser = rtfobj.RtfObjParser(payload.content)
    parser.parse()
    results: List[ExtractedPayload] = []
    for position, found in enumerate(parser.objects):
        # Every object carries its index; packages get a filename on top.
        extra = {'index': position}
        if found.is_ole:
            blob = found.oledata
        elif found.is_package:
            blob = found.olepkgdata
            extra['filename'] = found.filename
        else:
            blob = found.rawdata
        results.append(ExtractedPayload(blob, PayloadMeta(extra_data=extra)))
    return WorkerResponse(extracted=results)