Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, params):
    """Connect as the initial user and prepare the instance.

    ``params`` must supply the ``'url'`` and ``'key'`` entries used to build
    the connector, which is stored on ``self.initial_user_connector``. The
    constructor then updates MISP, sets the default role, restarts the
    workers and, unless the external ``fast_mode`` flag is set, refreshes
    the bundled submodule data.
    """
    misp = self.initial_user_connector = ExpandedPyMISP(
        params['url'], params['key'], ssl=False, debug=False)

    misp.update_misp()        # Git pull
    misp.set_default_role(3)  # id 3 on the VM is normal user
    misp.restart_workers()

    if not fast_mode:
        # Load submodules
        misp.update_object_templates()
        misp.update_galaxies()
        misp.update_noticelists()
        misp.update_warninglists()
        misp.update_taxonomies()

    # NOTE(review): the original indentation was lost; this call is assumed to
    # run regardless of fast_mode because it is not a submodule update -
    # confirm against the upstream file.
    misp.toggle_global_pythonify()
def create_sync_user(self, organisation):
    """Create ``organisation`` and a sync user for it, then record both.

    The organisation and the user are added through
    ``self.site_admin_connector``. A second connector, authenticated with the
    new user's auth key, fetches that user's sync config. The tuple
    ``(organisation, user, sync config)`` is appended to ``self.sync``;
    nothing is returned.
    """
    admin = self.site_admin_connector
    sync_org = admin.add_organisation(organisation)

    # Lower-cased organisation name with spaces turned into dashes, used as
    # the mail domain of the generated account.
    short_org_name = '-'.join(sync_org.name.lower().split(' '))

    account = MISPUser()
    account.email = f"sync_user@{short_org_name}.local"
    account.org_id = sync_org.id
    # The original author labelled role id 5 "Org admin".
    # TODO confirm against the role table of the target instance.
    account.role_id = 5
    sync_user = admin.add_user(account)

    user_connector = ExpandedPyMISP(
        admin.root_url, sync_user.authkey, ssl=False, debug=False)
    sync_server_config = user_connector.get_sync_config(pythonify=True)
    self.sync.append((sync_org, sync_user, sync_server_config))
def init(url, key, verifycert):
    """Return an ``ExpandedPyMISP`` connector for the given MISP instance.

    :param url: URL of the MISP instance.
    :param key: API key used to authenticate.
    :param verifycert: forwarded as the connector's ``ssl`` argument.
    :return: the ``ExpandedPyMISP`` instance.

    The former trailing ``'json'`` argument is gone: it was a leftover
    output-type selector from the legacy client, but the fourth positional
    parameter of ``ExpandedPyMISP`` is ``debug``, so the non-empty string
    was silently switching debug mode on.
    """
    return ExpandedPyMISP(url, key, ssl=verifycert)
def init(url, key, verifycert):
    """Build the ``ExpandedPyMISP`` client used by the rest of the script.

    :param url: URL of the MISP instance.
    :param key: API key used to authenticate.
    :param verifycert: forwarded as the client's ``ssl`` argument.
    :return: the ``ExpandedPyMISP`` instance.

    No fourth positional argument is passed any more. The old ``'json'``
    value landed in the ``debug`` slot of ``ExpandedPyMISP`` and, being a
    truthy string, enabled debug mode by accident.
    """
    return ExpandedPyMISP(url, key, ssl=verifycert)
def __init__(self):
    """Open the MISP connection and keep it on ``self.misp``.

    The URL, API key and certificate setting come from the externally
    defined ``misp_url``, ``misp_key`` and ``misp_verifycert`` names.
    """
    connector = ExpandedPyMISP(misp_url, misp_key, misp_verifycert)
    self.misp = connector
def __init__(self):
    """Load the DXL client config, connect to MISP and reset working state.

    Reads the externally defined ``dxl_config``, ``misp_url``, ``misp_key``
    and ``misp_verify`` names. After construction the instance holds the DXL
    config, the MISP connector, the result of its ``tags()`` call, an empty
    attribute list and a cleared ``found`` flag.
    """
    # Plain state first; it does not depend on any external service.
    self.found = False
    self.attributes = []

    self.config = DxlClientConfig.create_dxl_config_from_file(dxl_config)

    misp = ExpandedPyMISP(misp_url, misp_key, misp_verify)
    self.misp = misp
    self.tags = misp.tags()
# Capture default distribution and sharing group settings. Backwards compatibility and empty string check
self.distribution = cfg.misp.get("misp_distribution", None)
self.distribution = None if self.distribution == "" else self.distribution
if type(self.distribution) not in (type(None), int):
self.distribution = None
self.log('info', "The distribution stored in viper config is not an integer, setting to None")
self.sharinggroup = cfg.misp.get("misp_sharinggroup", None)
self.sharinggroup = None if self.sharinggroup == "" else self.sharinggroup
if type(self.sharinggroup) not in (type(None), int):
self.sharinggroup = None
self.log('info', "The sharing group stored in viper config is not an integer, setting to None")
if not self.offline_mode:
try:
self.misp = ExpandedPyMISP(self.url, self.key, ssl=verify, proxies=cfg.misp.proxies, cert=cfg.misp.cert)
except PyMISPError as e:
self.log('error', e.message)
return
# Require an open MISP session
if self.args.subname in ['add_hashes', 'add', 'show', 'publish'] and not __sessions__.is_attached_misp():
return
# Require an open file session
if self.args.subname in ['upload'] and not __sessions__.is_attached_file():
return
try:
if self.args.subname == 'upload':
self.upload()
elif self.args.subname == 'search':
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymisp import ExpandedPyMISP
from keys import misp_url, misp_key, misp_verifycert
import argparse
if __name__ == '__main__':
    # Command-line entry point: delete the MISP user whose id is passed with
    # -i/--user_id and print whatever the API call returns.
    parser = argparse.ArgumentParser(description='Delete the user with the given id. Keep in mind that disabling users (by setting the disabled flag via an edit) is always prefered to keep user associations to events intact.')
    # The id is mandatory: when it was optional, running the script without
    # it went on to call delete_user(None).
    parser.add_argument("-i", "--user_id", required=True, help="The id of the user you want to delete.")
    args = parser.parse_args()

    misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert)

    print(misp.delete_user(args.user_id))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pymisp import ExpandedPyMISP, MISPEvent
from keys import misp_url, misp_key, misp_verifycert
import argparse
if __name__ == '__main__':
    # Command-line entry point: load an event definition from a file and use
    # it to update the existing MISP event whose id is given with -e/--event.
    parser = argparse.ArgumentParser(description="Update a MISP event.")
    parser.add_argument("-e", "--event", required=True, help="Event ID to update.")
    parser.add_argument("-i", "--input", required=True, help="Input file")
    args = parser.parse_args()

    misp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert)

    me = MISPEvent()
    me.load_file(args.input)

    # ExpandedPyMISP.update_event takes the event object first and the target
    # id second; the previous call had the two swapped, so the id string was
    # sent as the event payload.
    result = misp.update_event(me, event_id=args.event)
    # Show the server's answer, as the sibling delete-user script does,
    # instead of discarding it.
    print(result)
# Command-line entry point of the Fail2ban reporter: parse the options,
# connect to MISP, then decide whether to reuse an existing event (event_id)
# or to start a new one (me).
# NOTE(review): the code that consumes `me` and `event_id` is not part of this
# excerpt, and create_new_event, parse and date are defined/imported elsewhere.
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Add Fail2ban object.')
# Required options: banned IP, attack type and the tag used for the search.
parser.add_argument("-b", "--banned_ip", required=True, help="Banned IP address.")
parser.add_argument("-a", "--attack_type", required=True, help="Type of attack.")
parser.add_argument("-t", "--tag", required=True, help="Tag to search on MISP.")
# Optional details; none of them is read in the visible part of the script.
parser.add_argument("-p", "--processing_timestamp", help="Processing timestamp.")
parser.add_argument("-f", "--failures", help="Amount of failures that lead to the ban.")
parser.add_argument("-s", "--sensor", help="Sensor identifier.")
parser.add_argument("-v", "--victim", help="Victim identifier.")
parser.add_argument("-l", "--logline", help="Logline (base64 encoded).")
parser.add_argument("-F", "--logfile", help="Path to a logfile to attach.")
# Two switches that steer event selection below; both default to False.
parser.add_argument("-n", "--force_new", action='store_true', default=False, help="Force new MISP event.")
parser.add_argument("-d", "--disable_new", action='store_true', default=False, help="Do not create a new Event.")
args = parser.parse_args()
# Connector created with debug=True.
pymisp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert, debug=True)
# Placeholders: -1 / None mean "nothing selected yet".
event_id = -1
me = None
if args.force_new:
# --force_new skips the search entirely.
me = create_new_event()
else:
# Search the event index for the given tag with timestamp='1h'; the results
# are read below through their .id, .date and .attribute_count attributes.
response = pymisp.search_index(tag=args.tag, timestamp='1h', pythonify=True)
# NOTE(review): when the search returns nothing, the visible code leaves
# both me and event_id at their placeholder values.
if response:
if args.disable_new:
# --disable_new: always reuse the first hit, whatever its age or size.
event_id = response[0].id
else:
last_event_date = parse(response[0].date).date()
nb_attr = response[0].attribute_count
# Start a fresh event when the first hit is dated before today or already
# holds more than 1000 attributes; otherwise keep adding to it.
if last_event_date < date.today() or int(nb_attr) > 1000:
me = create_new_event()
else:
event_id = response[0].id