Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def to_dict(self):
    """Return the parent class's dictionary with date/timestamp fields converted.

    Starts from ``super(MISPEvent, self).to_dict()`` and, for each of the
    fields below that is present and truthy in that dictionary, replaces
    the value:

    * ``date`` -- ``self.date`` rendered with ``isoformat()``; when
      ``self.date`` is a ``datetime.datetime`` only its calendar date is
      exported.
    * ``publish_timestamp`` / ``sighting_timestamp`` -- the matching
      attribute of ``self`` passed through ``self._datetime_to_timestamp``.

    The object itself is left untouched: exporting must not rewrite
    ``self.date`` as a side effect.

    :returns: the converted dictionary.
    """
    to_return = super(MISPEvent, self).to_dict()

    if to_return.get('date'):
        event_date = self.date
        if isinstance(event_date, datetime.datetime):
            # Narrow to a plain date on a local copy instead of assigning
            # back to self.date, so serializing never mutates the event.
            event_date = event_date.date()
        to_return['date'] = event_date.isoformat()

    for field in ('publish_timestamp', 'sighting_timestamp'):
        if to_return.get(field):
            to_return[field] = self._datetime_to_timestamp(getattr(self, field))

    return to_return
methods in ExpandedPyMISP/PyMISP.
"""
super(AbstractMISP, self).__init__()
self.__edited = True # As we create a new object, we assume it is edited
self.__not_jsonable = []
self.__self_defined_describe_types = None
if kwargs.get('force_timestamps') is not None:
# Ignore the edited objects and keep the timestamps.
self.__force_timestamps = True
else:
self.__force_timestamps = False
# List of classes having tags
from .mispevent import MISPAttribute, MISPEvent
self.__has_tags = (MISPAttribute, MISPEvent)
if isinstance(self, self.__has_tags):
self.Tag = []
setattr(AbstractMISP, 'add_tag', AbstractMISP.__add_tag)
setattr(AbstractMISP, 'tags', property(AbstractMISP.__get_tags, AbstractMISP.__set_tags))
def buildEvent(pkg, **kwargs):
# Start building a MISPEvent from the STIX package `pkg`.
# Keyword arguments read in the visible part: "distribution" (default 0),
# "threat_level_id" (default 3) and "analysis" (default 0).
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the loop at the bottom; the rest of the function (including whatever it
# returns) is not visible here.
log.info("Building Event...")
# Use the package's STIX header title when there is one; otherwise fall
# back to the generic title "STIX Import".
if not pkg.stix_header:
title = "STIX Import"
else:
if not pkg.stix_header.title:
title = "STIX Import"
else:
title = pkg.stix_header.title
log.info("Using title %s", title)
log.debug("Seting up MISPEvent...")
event = mispevent.MISPEvent()
event.distribution = kwargs.get("distribution", 0)
event.threat_level_id = kwargs.get("threat_level_id", 3)
event.analysis = kwargs.get("analysis", 0)
# The chosen title becomes the event's info field.
event.info = title
# When the package exposes a `description` attribute, attach it to the
# event as a "comment" attribute.
# NOTE(review): hasattr() is also true when the attribute exists but is
# None, in which case None is what gets passed on -- confirm that is intended.
if hasattr(pkg, "description"):
log.debug("Found description %s", pkg.description)
event.add_attribute("comment", pkg.description)
log.debug("Beginning to Lint_roll...")
# `ids` remembers observable ids already seen; `to_process` is created
# here but not used in the visible part of the function.
ids = []
to_process = []
# Walk everything lintRoll(pkg) yields and record the id_ of each cybox
# Observable the first time it is encountered.
for obj in lintRoll(pkg):
if isinstance(obj, cybox.core.observable.Observable):
if obj.id_ not in ids:
ids.append(obj.id_)
def MISPtoSTIX(mispJSON):
"""
Function to convert from a MISP JSON to a STIX stix
:param mispJSON: A dict (json) containing a misp Event.
:returns stix: A STIX stix with as much of the original
data as we could convert.
"""
# NOTE(review): this excerpt has lost its indentation and is cut off after
# `backupID`; the conversion itself and the return statement are not visible.
# Accept either a ready-made MISPEvent or raw data that MISPEvent.load()
# is asked to parse.
if isinstance(mispJSON, mispevent.MISPEvent):
misp_event = mispJSON
else:
misp_event = mispevent.MISPEvent()
misp_event.load(mispJSON)
# We should now have a proper MISP JSON loaded.
# Create a base stix
stix = STIXPackage()
# Carry the MISP event id over onto the package as `MISPID`.
# NOTE(review): the lookup indexes the original `mispJSON` argument, not
# `misp_event`; any exception it raises is caught by the broad `except`
# below and replaced with a random id.
try:
stix.MISPID = mispJSON["Event"]["id"]
except Exception:
# We don't have an ID?
# Generate a random number and use that
# random.randint(1, 9000) includes both end points.
# NOTE(review): ids drawn from such a small range can collide between
# packages -- confirm that is acceptable.
stix.MISPID = random.randint(1, 9000)
# it's being silly
# backup the ID
# Keep a copy of the id in a local; how it is used later is not visible here.
backupID = stix.MISPID
def get_event(self, event: Union[MISPEvent, int, str, UUID], deleted: Union[bool, int, list]=False, pythonify: bool=False):
    '''Get an event from a MISP instance

    :param event: the event to fetch, given as a MISPEvent, an ID or a UUID;
        it is resolved to an identifier before the request is built.
    :param deleted: when truthy, the value is sent as the ``deleted`` field of
        a POST request to ``events/view/<id>``; otherwise a plain GET is used.
    :param pythonify: return a MISPEvent instead of the raw response. The
        instance-wide ``global_pythonify`` flag has the same effect.
    :returns: the checked response as-is when no conversion was requested or
        when it contains an ``errors`` key; otherwise a MISPEvent loaded from it.
    '''
    event_id = self.__get_uuid_or_id_from_abstract_misp(event)
    if deleted:
        data = {'deleted': deleted}
        response = self._prepare_request('POST', f'events/view/{event_id}', data=data)
    else:
        response = self._prepare_request('GET', f'events/view/{event_id}')
    # Separate names for the raw response and the parsed payload, so the
    # `event` argument is never rebound to something that is not an event.
    event_response = self._check_response(response, expect_json=True)
    if not (self.global_pythonify or pythonify) or 'errors' in event_response:
        return event_response
    e = MISPEvent()
    e.load(event_response)
    return e
def MISPtoSTIX(mispJSON):
"""
Function to convert from a MISP JSON to a STIX stix
:param mispJSON: A dict (json) containing a misp Event.
:returns stix: A STIX stix with as much of the original
data as we could convert.
"""
# NOTE(review): this excerpt has lost its indentation and is cut off right
# after the random-id fallback; the conversion itself and the return
# statement are not visible.
# Accept either a ready-made MISPEvent or raw data that MISPEvent.load()
# is asked to parse.
if isinstance(mispJSON, mispevent.MISPEvent):
misp_event = mispJSON
else:
misp_event = mispevent.MISPEvent()
misp_event.load(mispJSON)
# We should now have a proper MISP JSON loaded.
# Create a base stix
stix = STIXPackage()
# Carry the MISP event id over onto the package as `MISPID`.
# NOTE(review): the lookup indexes the original `mispJSON` argument, not
# `misp_event`; any exception it raises is caught by the broad `except`
# below and replaced with a random id.
try:
stix.MISPID = mispJSON["Event"]["id"]
except Exception:
# We don't have an ID?
# Generate a random number and use that
# random.randint(1, 9000) includes both end points.
# NOTE(review): ids drawn from such a small range can collide between
# packages -- confirm that is acceptable.
stix.MISPID = random.randint(1, 9000)
# it's being silly
if return_format == 'json' and self.global_pythonify or pythonify:
# The response is in json, we can convert it to a list of pythonic MISP objects
to_return = []
if controller == 'events':
for e in normalized_response:
me = MISPEvent()
me.load(e)
to_return.append(me)
elif controller == 'attributes':
# FIXME: obvs, this is hurting my soul. We need something generic.
for a in normalized_response.get('Attribute'):
ma = MISPAttribute()
ma.from_dict(**a)
if 'Event' in ma:
me = MISPEvent()
me.from_dict(**ma.Event)
ma.Event = me
if 'RelatedAttribute' in ma:
related_attributes = []
for ra in ma.RelatedAttribute:
r_attribute = MISPAttribute()
r_attribute.from_dict(**ra)
if 'Event' in r_attribute:
me = MISPEvent()
me.from_dict(**r_attribute.Event)
r_attribute.Event = me
related_attributes.append(r_attribute)
ma.RelatedAttribute = related_attributes
if 'Sighting' in ma:
sightings = []
for sighting in ma.Sighting:
if topic != 'misp_json':
log.info("Ignoring " + topic + "...")
continue
# Process the JSON payload
log.debug("Processing...")
payload = message[len(topic)+1:]
# Load the message JSON
msg = json.loads(payload)
log.debug(msg)
# Load it as a misp object for easy conversion to STIX
ev = pymisp.mispevent.MISPEvent()
ev.load(msg)
# Convert to STIX
pkg = pymisp.tools.stix.make_stix_package(ev)
log.debug("Loaded successfully!")
# Push the package to TAXII
for version in config.get("stix_versions", ["1.1.1"]):
# Convert to that version
objs = lint_roller.lintRoll(pkg)
for i in objs:
# Set the object's version
if hasattr(i, "version"):
i.version = version
# Set the top-level
pkg.version = version
:Example:
>>> misp.search_sightings(publish_timestamp='30d') # search sightings for the last 30 days on the instance
[ ... ]
>>> misp.search_sightings(context='attribute', context_id=6, include_attribute=True) # return list of sighting for attribute 6 along with the attribute itself
[ ... ]
>>> misp.search_sightings(context='event', context_id=17, include_event_meta=True, org=2) # return list of sighting for event 17 filtered with org id 2
'''
query = {'returnFormat': 'json'}
if context is not None:
if context not in ['attribute', 'event']:
raise ValueError('context has to be in {}'.format(', '.join(['attribute', 'event'])))
url_path = f'sightings/restSearch/{context}'
else:
url_path = 'sightings/restSearch'
if isinstance(context_id, (MISPEvent, MISPAttribute)):
context_id = self.__get_uuid_or_id_from_abstract_misp(context_id)
query['id'] = context_id
query['type'] = type_sighting
query['from'] = date_from
query['to'] = date_to
query['last'] = publish_timestamp
query['org_id'] = org
query['source'] = source
query['includeAttribute'] = include_attribute
query['includeEvent'] = include_event_meta
url = urljoin(self.root_url, url_path)
response = self._prepare_request('POST', url, data=query)
normalized_response = self._check_response(response, expect_json=True)
if not (self.global_pythonify or pythonify) or 'errors' in normalized_response:
return normalized_response
def _prepare_full_event(self, distribution, threat_level_id, analysis, info, date=None, published=False, orgc_id=None, org_id=None, sharing_group_id=None):
    """Create a MISPEvent and populate it from the given field values.

    The event is constructed with ``self.describe_types``, filled through
    ``from_dict`` and, when ``published`` is truthy, ``publish()`` is called
    on it before it is returned.
    """
    event_fields = {
        'info': info,
        'distribution': distribution,
        'threat_level_id': threat_level_id,
        'analysis': analysis,
        'date': date,
        'orgc_id': orgc_id,
        'org_id': org_id,
        'sharing_group_id': sharing_group_id,
    }
    new_event = MISPEvent(self.describe_types)
    new_event.from_dict(**event_fields)
    if not published:
        return new_event
    new_event.publish()
    return new_event