Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if 'errors' in normalized_response:
return normalized_response
if return_format == 'json' and self.global_pythonify or pythonify:
# The response is in json, we can convert it to a list of pythonic MISP objects
to_return = []
if controller == 'events':
for e in normalized_response:
me = MISPEvent()
me.load(e)
to_return.append(me)
elif controller == 'attributes':
# FIXME: obvs, this is hurting my soul. We need something generic.
for a in normalized_response.get('Attribute'):
ma = MISPAttribute()
ma.from_dict(**a)
if 'Event' in ma:
me = MISPEvent()
me.from_dict(**ma.Event)
ma.Event = me
if 'RelatedAttribute' in ma:
related_attributes = []
for ra in ma.RelatedAttribute:
r_attribute = MISPAttribute()
r_attribute.from_dict(**ra)
if 'Event' in r_attribute:
me = MISPEvent()
me.from_dict(**r_attribute.Event)
r_attribute.Event = me
related_attributes.append(r_attribute)
ma.RelatedAttribute = related_attributes
In that case, the pythonified response is the following: {'attributes': [MISPAttribute], 'errors': {errors by attributes}}'''
event_id = self.__get_uuid_or_id_from_abstract_misp(event)
new_attribute = self._prepare_request('POST', f'attributes/add/{event_id}', data=attribute)
new_attribute = self._check_response(new_attribute, expect_json=True)
if isinstance(attribute, list):
# Multiple attributes were passed at once, the handling is totally different
if self._old_misp((2, 4, 113), '2020-01-01', sys._getframe().f_code.co_name):
return new_attribute
if not (self.global_pythonify or pythonify):
return new_attribute
to_return = {'attributes': []}
if 'errors' in new_attribute:
to_return['errors'] = new_attribute['errors']
for new_attr in new_attribute['Attribute']:
a = MISPAttribute()
a.from_dict(**new_attr)
to_return['attributes'].append(a)
return to_return
if ('errors' in new_attribute and new_attribute['errors'][0] == 403
and new_attribute['errors'][1]['message'] == 'You do not have permission to do that.'):
# At this point, we assume the user tried to add an attribute on an event they don't own
# Re-try with a proposal
return self.add_attribute_proposal(event_id, attribute, pythonify)
if not (self.global_pythonify or pythonify) or 'errors' in new_attribute:
return new_attribute
a = MISPAttribute()
a.from_dict(**new_attribute)
return a
def attributes(self, pythonify: bool=False):
attributes = self._prepare_request('GET', f'attributes/index')
attributes = self._check_response(attributes, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in attributes:
return attributes
to_return = []
for attribute in attributes:
a = MISPAttribute()
a.from_dict(**attribute)
to_return.append(a)
return to_return
def add_attribute(self, type, value, **kwargs):
"""Add an attribute. type and value are required but you can pass all
other parameters supported by MISPAttribute"""
attr_list = []
if isinstance(value, list):
attr_list = [self.add_attribute(type=type, value=a, **kwargs) for a in value]
else:
attribute = MISPAttribute(describe_types=self.describe_types)
attribute.from_dict(type=type, value=value, **kwargs)
self.attributes.append(attribute)
self.edited = True
if attr_list:
return attr_list
return attribute
def _prepare_full_attribute(self, category, type_value, value, to_ids, comment=None, distribution=None, **kwargs):
"""Initialize a new MISPAttribute from scratch"""
misp_attribute = MISPAttribute(self.describe_types)
misp_attribute.from_dict(type=type_value, value=value, category=category,
to_ids=to_ids, comment=comment, distribution=distribution, **kwargs)
return misp_attribute
me = MISPEvent()
me.load(e)
to_return.append(me)
elif controller == 'attributes':
# FIXME: obvs, this is hurting my soul. We need something generic.
for a in normalized_response.get('Attribute'):
ma = MISPAttribute()
ma.from_dict(**a)
if 'Event' in ma:
me = MISPEvent()
me.from_dict(**ma.Event)
ma.Event = me
if 'RelatedAttribute' in ma:
related_attributes = []
for ra in ma.RelatedAttribute:
r_attribute = MISPAttribute()
r_attribute.from_dict(**ra)
if 'Event' in r_attribute:
me = MISPEvent()
me.from_dict(**r_attribute.Event)
r_attribute.Event = me
related_attributes.append(r_attribute)
ma.RelatedAttribute = related_attributes
if 'Sighting' in ma:
sightings = []
for sighting in ma.Sighting:
s = MISPSighting()
s.from_dict(**sighting)
sightings.append(s)
ma.Sighting = sightings
to_return.append(ma)
elif controller == 'objects':
'''Update an attribute on a MISP instance'''
if attribute_id is None:
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute)
else:
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute_id)
updated_attribute = self._prepare_request('POST', f'attributes/edit/{attribute_id}', data=attribute)
updated_attribute = self._check_response(updated_attribute, expect_json=True)
if 'errors' in updated_attribute:
if (updated_attribute['errors'][0] == 403
and updated_attribute['errors'][1]['message'] == 'You do not have permission to do that.'):
# At this point, we assume the user tried to update an attribute on an event they don't own
# Re-try with a proposal
return self.update_attribute_proposal(attribute_id, attribute, pythonify)
if not (self.global_pythonify or pythonify) or 'errors' in updated_attribute:
return updated_attribute
a = MISPAttribute()
a.from_dict(**updated_attribute)
return a
def update_attribute(self, attribute_id, attribute):
"""Update an attribute
:param attribute_id: Attribute id/uuid to update
:param attribute: Attribute as JSON object / string to add
"""
url = urljoin(self.root_url, 'attributes/{}'.format(attribute_id))
if isinstance(attribute, MISPAttribute):
attribute = attribute.to_json()
elif not isinstance(attribute, basestring):
attribute = json.dumps(attribute)
response = self._prepare_request('POST', url, attribute)
return self._check_response(response)
def get_attribute(self, attribute: Union[MISPAttribute, int, str, UUID], pythonify: bool=False):
'''Get an attribute from a MISP instance'''
attribute_id = self.__get_uuid_or_id_from_abstract_misp(attribute)
attribute = self._prepare_request('GET', f'attributes/view/{attribute_id}')
attribute = self._check_response(attribute, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in attribute:
return attribute
a = MISPAttribute()
a.from_dict(**attribute)
return a