Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_userdefined_object(self):
self.init_event()
self.mispevent.add_object(name='test_object_template', strict=True, misp_objects_path_custom='tests/mispevent_testfiles')
with self.assertRaises(InvalidMISPObject) as e:
# Fail on required
self.mispevent.to_json(sort_keys=True, indent=2)
if sys.version_info >= (3, ):
self.assertEqual(e.exception.message, '{\'member3\'} are required.')
else:
# Python2 bullshit
self.assertEqual(e.exception.message, 'set([u\'member3\']) are required.')
a = self.mispevent.objects[0].add_attribute('member3', value='foo')
del a.uuid
with self.assertRaises(InvalidMISPObject) as e:
# Fail on requiredOneOf
self.mispevent.to_json(sort_keys=True, indent=2)
self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2')
a = self.mispevent.objects[0].add_attribute('member1', value='bar')
else:
# Python2 bullshit
self.assertEqual(e.exception.message, 'set([u\'member3\']) are required.')
a = self.mispevent.objects[0].add_attribute('member3', value='foo')
del a.uuid
with self.assertRaises(InvalidMISPObject) as e:
# Fail on requiredOneOf
self.mispevent.to_json(sort_keys=True, indent=2)
self.assertEqual(e.exception.message, 'At least one of the following attributes is required: member1, member2')
a = self.mispevent.objects[0].add_attribute('member1', value='bar')
del a.uuid
a = self.mispevent.objects[0].add_attribute('member1', value='baz')
del a.uuid
with self.assertRaises(InvalidMISPObject) as e:
# member1 is not a multiple
self.mispevent.to_json(sort_keys=True, indent=2)
self.assertEqual(e.exception.message, 'Multiple occurrences of member1 is not allowed')
self.mispevent.objects[0].attributes = self.mispevent.objects[0].attributes[:2]
self.mispevent.objects[0].uuid = 'a'
with open('tests/mispevent_testfiles/misp_custom_obj.json', 'r') as f:
ref_json = json.load(f)
self.assertEqual(self.mispevent.to_json(sort_keys=True, indent=2), json.dumps(ref_json, sort_keys=True, indent=2))
def add_object(self, obj=None, **kwargs):
"""Add an object to the Event, either by passing a MISPObject, or a dictionary"""
if isinstance(obj, MISPObject):
misp_obj = obj
elif isinstance(obj, dict):
misp_obj = MISPObject(name=obj.pop('name'), strict=obj.pop('strict', False),
default_attributes_parameters=obj.pop('default_attributes_parameters', {}),
**obj)
misp_obj.from_dict(**obj)
elif kwargs:
misp_obj = MISPObject(name=kwargs.pop('name'), strict=kwargs.pop('strict', False),
default_attributes_parameters=kwargs.pop('default_attributes_parameters', {}),
**kwargs)
misp_obj.from_dict(**kwargs)
else:
raise InvalidMISPObject("An object to add to an existing Event needs to be either a MISPObject, or a plain python dictionary")
self.Object.append(misp_obj)
self.edited = True
return misp_obj
def _validate(self):
"""Make sure the object we're creating has the required fields"""
if self._definition.get('required'):
required_missing = set(self._definition.get('required')) - set(self._fast_attribute_access.keys())
if required_missing:
raise InvalidMISPObject('{} are required.'.format(required_missing))
if self._definition.get('requiredOneOf'):
if not set(self._definition['requiredOneOf']) & set(self._fast_attribute_access.keys()):
# We ecpect at least one of the object_relation in requiredOneOf, and it isn't the case
raise InvalidMISPObject('At least one of the following attributes is required: {}'.format(', '.join(self._definition['requiredOneOf'])))
for rel, attrs in self._fast_attribute_access.items():
if len(attrs) == 1:
# object_relation's here only once, everything's cool, moving on
continue
if not self._definition['attributes'][rel].get('multiple'):
# object_relation's here more than once, but it isn't allowed in the template.
raise InvalidMISPObject('Multiple occurrences of {} is not allowed'.format(rel))
return True
def __init__(self, filepath=None, pseudofile=None, filename=None, standalone=True, **kwargs):
# PY3 way:
# super().__init__('file')
super(FileObject, self).__init__('file', standalone=standalone, **kwargs)
if not HAS_PYDEEP:
logger.warning("Please install pydeep: pip install git+https://github.com/kbandla/pydeep.git")
if not HAS_MAGIC:
logger.warning("Please install python-magic: pip install python-magic.")
if filename:
# Useful in case the file is copied with a pre-defined name by a script but we want to keep the original name
self.__filename = filename
elif filepath:
self.__filename = os.path.basename(filepath)
else:
raise InvalidMISPObject('A file name is required (either in the path, or as a parameter).')
if filepath:
with open(filepath, 'rb') as f:
self.__pseudofile = BytesIO(f.read())
elif pseudofile and isinstance(pseudofile, BytesIO):
# WARNING: lief.parse requires a path
self.__pseudofile = pseudofile
else:
raise InvalidMISPObject('File buffer (BytesIO) or a path is required.')
self.__data = self.__pseudofile.getvalue()
self.generate_attributes()
def get_object_by_id(self, object_id):
"""Get an object by ID (the ID is the one set by the server when creating the new object)"""
for obj in self.objects:
if hasattr(obj, 'id') and int(obj.id) == int(object_id):
return obj
raise InvalidMISPObject('Object with {} does not exist in this event'.format(object_id))
def __init__(self, filepath=None, pseudofile=None, attach_original_email=True, standalone=True, **kwargs):
# PY3 way:
# super().__init__('file')
super(EMailObject, self).__init__('email', standalone=standalone, **kwargs)
if filepath:
with open(filepath, 'rb') as f:
self.__pseudofile = BytesIO(f.read())
elif pseudofile and isinstance(pseudofile, BytesIO):
self.__pseudofile = pseudofile
else:
raise InvalidMISPObject('File buffer (BytesIO) or a path is required.')
self.__email = message_from_bytes(self.__pseudofile.getvalue(), policy=policy.default)
if attach_original_email:
self.add_attribute('eml', value='Full email.eml', data=self.__pseudofile)
self.generate_attributes()
if filename:
# Useful in case the file is copied with a pre-defined name by a script but we want to keep the original name
self.__filename = filename
elif filepath:
self.__filename = os.path.basename(filepath)
else:
raise InvalidMISPObject('A file name is required (either in the path, or as a parameter).')
if filepath:
with open(filepath, 'rb') as f:
self.__pseudofile = BytesIO(f.read())
elif pseudofile and isinstance(pseudofile, BytesIO):
# WARNING: lief.parse requires a path
self.__pseudofile = pseudofile
else:
raise InvalidMISPObject('File buffer (BytesIO) or a path is required.')
self.__data = self.__pseudofile.getvalue()
self.generate_attributes()