Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_attribute(self):
first = self.create_simple_event()
second = self.create_simple_event()
a = second.add_attribute('ip-src', '11.11.11.11')
a.add_tag('testtag_admin_created')
second.distribution = Distribution.all_communities
try:
first = self.user_misp_connector.add_event(first)
second = self.admin_misp_connector.add_event(second, pythonify=True)
# Get attribute
attribute = self.user_misp_connector.get_attribute(first.attributes[0])
self.assertEqual(first.attributes[0].uuid, attribute.uuid)
# Add attribute
new_attribute = MISPAttribute()
new_attribute.value = '1.2.3.4'
new_attribute.type = 'ip-dst'
new_attribute = self.user_misp_connector.add_attribute(first, new_attribute)
self.assertTrue(isinstance(new_attribute, MISPAttribute), new_attribute)
self.assertEqual(new_attribute.value, '1.2.3.4', new_attribute)
# Test attribute already in event
# new_attribute.uuid = str(uuid4())
# new_attribute = self.user_misp_connector.add_attribute(first, new_attribute)
new_similar = MISPAttribute()
new_similar.value = '1.2.3.4'
new_similar.type = 'ip-dst'
similar_error = self.user_misp_connector.add_attribute(first, new_similar)
self.assertEqual(similar_error['errors'][1]['errors']['value'][0], 'A similar attribute already exists for this event.')
# Test add multiple attributes at once
attr1 = MISPAttribute()
first = self.user_misp_connector.get_event(first)
self.assertEqual(first.attributes[-1].value, '5.2.3.4')
# Accept attribute proposal - Attribute update
response = self.user_misp_connector.accept_attribute_proposal(new_proposal_update)
self.assertEqual(response['message'], 'Proposed change accepted.')
attribute = self.user_misp_connector.get_attribute(new_attribute)
self.assertEqual(attribute.to_ids, False)
# Discard attribute proposal
new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': True})
response = self.user_misp_connector.discard_attribute_proposal(new_proposal_update)
self.assertEqual(response['message'], 'Proposal discarded.')
attribute = self.user_misp_connector.get_attribute(new_attribute)
self.assertEqual(attribute.to_ids, False)
# Test fallback to proposal if the user doesn't own the event
prop_attr = MISPAttribute()
prop_attr.from_dict(**{'type': 'ip-dst', 'value': '123.43.32.21'})
# Add attribute on event owned by someone else
attribute = self.user_misp_connector.add_attribute(second.id, prop_attr)
self.assertTrue(isinstance(attribute, MISPShadowAttribute), attribute)
# Test if add proposal without category works - https://github.com/MISP/MISP/issues/4868
attribute = self.user_misp_connector.add_attribute(second.id, {'type': 'ip-dst', 'value': '123.43.32.22'})
self.assertTrue(isinstance(attribute, MISPShadowAttribute))
# Add attribute with the same value as an existing proposal
prop_attr.uuid = str(uuid4())
attribute = self.admin_misp_connector.add_attribute(second, prop_attr, pythonify=True)
prop_attr.uuid = str(uuid4())
# Add a duplicate attribute (same value)
attribute = self.admin_misp_connector.add_attribute(second, prop_attr, pythonify=True)
self.assertTrue('errors' in attribute)
# Update attribute owned by someone else
attribute = self.user_misp_connector.update_attribute({'comment': 'blah'}, second.attributes[0].id)
attr4.type = 'ip-dst'
attr4.add_tag('tlp:amber___test_unique_not_created')
attr4.add_tag('testtag_admin_created')
response = self.user_misp_connector.add_attribute(first, [attr1, attr2, attr3, attr4])
time.sleep(5)
self.assertTrue(isinstance(response['attributes'], list), response['attributes'])
self.assertEqual(response['attributes'][0].value, '1.2.3.5')
self.assertEqual(response['attributes'][1].value, '1.2.3.6')
self.assertTrue(isinstance(response['attributes'][1].tags, list), response['attributes'][1].to_json())
self.assertTrue(len(response['attributes'][1].tags), response['attributes'][1].to_json())
self.assertEqual(response['attributes'][1].tags[0].name, 'testtag_admin_created')
self.assertEqual(response['errors']['attribute_0']['value'][0], 'A similar attribute already exists for this event.')
self.assertEqual(response['errors']['attribute_2']['value'][0], 'A similar attribute already exists for this event.')
# Add attribute as proposal
new_proposal = MISPAttribute()
new_proposal.value = '5.2.3.4'
new_proposal.type = 'ip-dst'
new_proposal.category = 'Network activity'
new_proposal = self.user_misp_connector.add_attribute_proposal(first.id, new_proposal)
self.assertEqual(new_proposal.value, '5.2.3.4')
# Update attribute
new_attribute.value = '5.6.3.4'
new_attribute = self.user_misp_connector.update_attribute(new_attribute)
self.assertEqual(new_attribute.value, '5.6.3.4')
# Update attribute as proposal
new_proposal_update = self.user_misp_connector.update_attribute_proposal(new_attribute.id, {'to_ids': False})
self.assertEqual(new_proposal_update.to_ids, False)
# Delete attribute as proposal
proposal_delete = self.user_misp_connector.delete_attribute_proposal(new_attribute)
self.assertTrue(proposal_delete['saved'])
# Get attribute proposal
new_attribute.value = '1.2.3.4'
new_attribute.type = 'ip-dst'
new_attribute = self.user_misp_connector.add_attribute(first, new_attribute)
self.assertTrue(isinstance(new_attribute, MISPAttribute), new_attribute)
self.assertEqual(new_attribute.value, '1.2.3.4', new_attribute)
# Test attribute already in event
# new_attribute.uuid = str(uuid4())
# new_attribute = self.user_misp_connector.add_attribute(first, new_attribute)
new_similar = MISPAttribute()
new_similar.value = '1.2.3.4'
new_similar.type = 'ip-dst'
similar_error = self.user_misp_connector.add_attribute(first, new_similar)
self.assertEqual(similar_error['errors'][1]['errors']['value'][0], 'A similar attribute already exists for this event.')
# Test add multiple attributes at once
attr1 = MISPAttribute()
attr1.value = '1.2.3.4'
attr1.type = 'ip-dst'
attr2 = MISPAttribute()
attr2.value = '1.2.3.5'
attr2.type = 'ip-dst'
attr3 = MISPAttribute()
attr3.value = first.attributes[0].value
attr3.type = first.attributes[0].type
attr4 = MISPAttribute()
attr4.value = '1.2.3.6'
attr4.type = 'ip-dst'
attr4.add_tag('tlp:amber___test_unique_not_created')
attr4.add_tag('testtag_admin_created')
response = self.user_misp_connector.add_attribute(first, [attr1, attr2, attr3, attr4])
time.sleep(5)
self.assertTrue(isinstance(response['attributes'], list), response['attributes'])
new_similar.value = '1.2.3.4'
new_similar.type = 'ip-dst'
similar_error = self.user_misp_connector.add_attribute(first, new_similar)
self.assertEqual(similar_error['errors'][1]['errors']['value'][0], 'A similar attribute already exists for this event.')
# Test add multiple attributes at once
attr1 = MISPAttribute()
attr1.value = '1.2.3.4'
attr1.type = 'ip-dst'
attr2 = MISPAttribute()
attr2.value = '1.2.3.5'
attr2.type = 'ip-dst'
attr3 = MISPAttribute()
attr3.value = first.attributes[0].value
attr3.type = first.attributes[0].type
attr4 = MISPAttribute()
attr4.value = '1.2.3.6'
attr4.type = 'ip-dst'
attr4.add_tag('tlp:amber___test_unique_not_created')
attr4.add_tag('testtag_admin_created')
response = self.user_misp_connector.add_attribute(first, [attr1, attr2, attr3, attr4])
time.sleep(5)
self.assertTrue(isinstance(response['attributes'], list), response['attributes'])
self.assertEqual(response['attributes'][0].value, '1.2.3.5')
self.assertEqual(response['attributes'][1].value, '1.2.3.6')
self.assertTrue(isinstance(response['attributes'][1].tags, list), response['attributes'][1].to_json())
self.assertTrue(len(response['attributes'][1].tags), response['attributes'][1].to_json())
self.assertEqual(response['attributes'][1].tags[0].name, 'testtag_admin_created')
self.assertEqual(response['errors']['attribute_0']['value'][0], 'A similar attribute already exists for this event.')
self.assertEqual(response['errors']['attribute_2']['value'][0], 'A similar attribute already exists for this event.')
# Add attribute as proposal
files = [p]
elif p.is_dir():
files = [f for f in p.glob('**/*') if f.is_file()]
else:
print('invalid upload path (must be file or dir)')
exit(0)
if args.is_malware:
arg_type = 'malware-sample'
else:
arg_type = 'attachment'
# Create attributes
attributes = []
for f in files:
a = MISPAttribute()
a.type = arg_type
a.value = f.name
a.data = f
a.comment = args.comment
a.distribution = args.distrib
if args.expand and arg_type == 'malware-sample':
a.expand = 'binary'
attributes.append(a)
if args.event:
for a in attributes:
misp.add_attribute(args.event, a)
else:
m = MISPEvent()
m.info = args.info
m.distribution = args.distrib
def form_attr_obj(self, type, value, file=None):
    """Create a MISPAttribute and append it to ``self.attributes``.

    The attribute's ``type`` and ``value`` are set from the arguments.
    When ``file`` is given, it is wrapped in a ``Path`` and stored as the
    attribute's ``data``.

    Any exception raised along the way is caught and reported on stdout
    (module name, function name, failing line number and message); it is
    not re-raised, and the method returns ``None`` in every case.
    """
    try:
        new_attr = MISPAttribute()
        new_attr.type = type
        new_attr.value = value
        if file is not None:
            new_attr.data = Path(file)
        self.attributes.append(new_attr)
    except Exception as e:
        # Line (within this function) where the failure surfaced.
        failing_line = e.__traceback__.tb_lineno
        # Name of the currently executing function, looked up dynamically.
        current_function = sys._getframe().f_code.co_name
        print("ERROR: Error in {location}.{funct_name}() - line {line_no} : {error}"
              .format(location=__name__, funct_name=current_function, line_no=failing_line,
                      error=str(e)))
if self.type is None:
self.type = definition['misp-attribute']
self.disable_correlation = kwargs.pop('disable_correlation', None)
if self.disable_correlation is None:
# Correlation can be disabled by default in the object definition.
# Fall back to that default when no disable_correlation value was passed in kwargs.
self.disable_correlation = definition.get('disable_correlation')
self.to_ids = kwargs.pop('to_ids', None)
if self.to_ids is None:
# Same for the to_ids flag
self.to_ids = definition.get('to_ids')
# Initialise rest of the values
for k, v in kwargs.items():
self[k] = v
# FIXME: dirty hack to pick up the default values; needed until all the classes are ported to the new format
temp_attribute = MISPAttribute()
temp_attribute.set_all_values(**self)
# Update default values
self.from_dict(**temp_attribute.to_dict())