Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_complex_event(self):
event = MISPEvent()
event.info = 'Complex Event'
event.distribution = Distribution.all_communities
event.add_tag('tlp:white')
event.add_attribute('ip-src', '8.8.8.8')
event.add_attribute('ip-dst', '8.8.8.9')
event.add_attribute('domain', 'google.com')
event.add_attribute('md5', '3c656da41f4645f77e3ec3281b63dd43')
event.attributes[0].distribution = Distribution.your_organisation_only
event.attributes[1].distribution = Distribution.this_community_only
event.attributes[2].distribution = Distribution.connected_communities
event.attributes[0].add_tag('tlp:red')
event.attributes[1].add_tag('tlp:amber')
event.attributes[2].add_tag('tlp:green')
obj = MISPObject('file')
obj.distribution = Distribution.connected_communities
obj.add_attribute('filename', 'testfile')
obj.add_attribute('md5', '3c656da41f4645f77e3ec3281b63dd44')
obj.attributes[0].distribution = Distribution.your_organisation_only
event.add_object(obj)
return event
def test_sync_community(self):
'''Simple event, this community only, pull from member of the community'''
event = MISPEvent()
event.info = 'Event created on first instance - test_sync_community'
event.distribution = Distribution.this_community_only
event.add_attribute('ip-src', '1.1.1.1')
try:
source = self.instances[0]
dest = self.instances[1]
event = source.org_admin_connector.add_event(event)
source.org_admin_connector.publish(event)
dest.site_admin_connector.server_pull(dest.sync_servers[0])
time.sleep(10)
dest_event = dest.org_admin_connector.get_event(event.uuid)
self.assertEqual(dest_event.distribution, 0)
finally:
source.org_admin_connector.delete_event(event)
dest.site_admin_connector.delete_event(dest_event)
def environment(self):
first_event = MISPEvent()
first_event.info = 'First event - org only - low - completed'
first_event.distribution = Distribution.your_organisation_only
first_event.threat_level_id = ThreatLevel.low
first_event.analysis = Analysis.completed
first_event.set_date("2017-12-31")
first_event.add_attribute('text', 'FIRST_EVENT' + str(uuid4()))
first_event.attributes[0].add_tag('admin_only')
first_event.attributes[0].add_tag('tlp:white___test')
first_event.add_attribute('text', str(uuid4()))
first_event.attributes[1].add_tag('unique___test')
second_event = MISPEvent()
second_event.info = 'Second event - org only - medium - ongoing'
second_event.distribution = Distribution.your_organisation_only
second_event.threat_level_id = ThreatLevel.medium
second_event.analysis = Analysis.ongoing
second_event.set_date("Aug 18 2018")
second_event.add_attribute('text', 'SECOND_EVENT' + str(uuid4()))
def create_simple_event(self, force_timestamps=False):
mispevent = MISPEvent(force_timestamps=force_timestamps)
mispevent.info = 'This is a super simple test'
mispevent.distribution = Distribution.your_organisation_only
mispevent.threat_level_id = ThreatLevel.low
mispevent.analysis = Analysis.completed
mispevent.add_attribute('text', str(uuid4()))
return mispevent
def test_attribute(self):
first = self.create_simple_event()
second = self.create_simple_event()
a = second.add_attribute('ip-src', '11.11.11.11')
a.add_tag('testtag_admin_created')
second.distribution = Distribution.all_communities
try:
first = self.user_misp_connector.add_event(first)
second = self.admin_misp_connector.add_event(second, pythonify=True)
# Get attribute
attribute = self.user_misp_connector.get_attribute(first.attributes[0])
self.assertEqual(first.attributes[0].uuid, attribute.uuid)
# Add attribute
new_attribute = MISPAttribute()
new_attribute.value = '1.2.3.4'
new_attribute.type = 'ip-dst'
new_attribute = self.user_misp_connector.add_attribute(first, new_attribute)
self.assertTrue(isinstance(new_attribute, MISPAttribute), new_attribute)
self.assertEqual(new_attribute.value, '1.2.3.4', new_attribute)
# Test attribute already in event
# new_attribute.uuid = str(uuid4())
# new_attribute = self.user_misp_connector.add_attribute(first, new_attribute)
def create_complex_event(self):
event = MISPEvent()
event.info = 'Complex Event'
event.distribution = Distribution.all_communities
event.add_tag('tlp:white')
event.add_attribute('ip-src', '8.8.8.8')
event.add_attribute('ip-dst', '8.8.8.9')
event.add_attribute('domain', 'google.com')
event.add_attribute('md5', '3c656da41f4645f77e3ec3281b63dd43')
event.attributes[0].distribution = Distribution.your_organisation_only
event.attributes[1].distribution = Distribution.this_community_only
event.attributes[2].distribution = Distribution.connected_communities
event.attributes[0].add_tag('tlp:red')
event.attributes[1].add_tag('tlp:amber')
event.attributes[2].add_tag('tlp:green')
obj = MISPObject('file')
def create_complex_event(self):
event = MISPEvent()
event.info = 'Complex Event'
event.distribution = Distribution.all_communities
event.add_tag('tlp:white')
event.add_attribute('ip-src', '8.8.8.8')
event.add_attribute('ip-dst', '8.8.8.9')
event.add_attribute('domain', 'google.com')
event.add_attribute('md5', '3c656da41f4645f77e3ec3281b63dd43')
event.attributes[0].distribution = Distribution.your_organisation_only
event.attributes[1].distribution = Distribution.this_community_only
event.attributes[2].distribution = Distribution.connected_communities
event.attributes[0].add_tag('tlp:red')
event.attributes[1].add_tag('tlp:amber')
event.attributes[2].add_tag('tlp:green')
obj = MISPObject('file')
obj.distribution = Distribution.connected_communities
obj.add_attribute('filename', 'testfile')
obj.add_attribute('md5', '3c656da41f4645f77e3ec3281b63dd44')
obj.attributes[0].distribution = Distribution.your_organisation_only
event.add_object(obj)
return event
def test_simple_sync(self):
'''Test simple event, push to one server'''
event = MISPEvent()
event.info = 'Event created on first instance - test_simple_sync'
event.distribution = Distribution.all_communities
event.add_attribute('ip-src', '1.1.1.1')
try:
source = self.instances[0]
dest = self.instances[1]
event = source.org_admin_connector.add_event(event)
source.org_admin_connector.publish(event)
source.site_admin_connector.server_push(source.sync_servers[0], event)
time.sleep(10)
dest_event = dest.org_admin_connector.get_event(event.uuid)
self.assertEqual(event.attributes[0].value, dest_event.attributes[0].value)
finally:
source.org_admin_connector.delete_event(event)
dest.site_admin_connector.delete_event(dest_event)
apikey = self.options.get("apikey")
mode = shlex.split(self.options.get("mode") or "")
if not url or not apikey:
raise CuckooProcessingError(
"Please configure the URL and API key for your MISP instance."
)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import pymisp
self.misp = pymisp.PyMISP(url, apikey, False, "json")
event = self.misp.new_event(
distribution=pymisp.Distribution.all_communities.value,
threat_level_id=pymisp.ThreatLevel.undefined.value,
analysis=pymisp.Analysis.completed.value,
info="Cuckoo Sandbox analysis #%d" % self.task["id"],
)
if results.get("target", {}).get("category") == "file":
self.misp.upload_sample(
filename=os.path.basename(self.task["target"]),
filepath_or_bytes=self.task["target"],
event_id=event["Event"]["id"],
category="External analysis",
)
self.signature(results, event)
if "hashes" in mode:
description = pulse['description']
malware_families = pulse['malware_families']
references = pulse['references']
tlp = pulse['tlp']
try:
timestamp = dateparser.parse(pulse['created'])
except Exception as ex:
LOGGER.error('Cannot parse pulse creation date: {0}'.format(str(ex)))
timestamp = datetime.utcnow()
event_date = timestamp.strftime('%Y-%m-%d')
event.info = title
event.analysis = Analysis.completed
event.distribution = Distribution.your_organisation_only
event.threat_level_id = ThreatLevel.low
event.add_tag('otx-author:{0}'.format(author))
if adversary:
adversary_list = []
tag_list = []
if ',' in adversary:
adversary_list = [s.strip() for s in adversary.split(',')]
else:
adversary_list.append(adversary)
print(adversary_list)
for adversary in adversary_list: