Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_culprit(data):
mgr = EventManager(data)
mgr.normalize()
return mgr.get_culprit()
def test_invalid_transaction(self):
dict_input = {"messages": "foo"}
manager = EventManager(make_event(transaction=dict_input))
manager.normalize()
event = manager.save(1)
assert event.transaction is None
def test_similar_message_prefix_doesnt_group(self):
# we had a regression which caused the default hash to just be
# 'event.message' instead of '[event.message]' which caused it to
# generate a hash per letter
manager = EventManager(make_event(event_id="a", message="foo bar"))
manager.normalize()
event1 = manager.save(1)
manager = EventManager(make_event(event_id="b", message="foo baz"))
manager.normalize()
event2 = manager.save(1)
assert event1.group_id != event2.group_id
def test_event_saved_signal(self):
mock_event_saved = mock.Mock()
event_saved.connect(mock_event_saved)
manager = EventManager(make_event(message="foo"))
manager.normalize()
event = manager.save(1)
assert_mock_called_once_with_partial(
mock_event_saved, project=event.group.project, sender=EventManager, signal=event_saved
)
def inner(data):
mgr = EventManager(data={"debug_meta": data})
mgr.normalize()
evt = eventstore.create_event(data=mgr.get_data())
interface = evt.interfaces.get("debug_meta")
insta_snapshot(
{"errors": evt.data.get("errors"), "to_json": interface and interface.to_json()}
)
def test_userreport(default_project, monkeypatch):
"""
Test that user_report-type kafka messages end up in a user report being
persisted. We additionally test some logic around upserting data in
eventuser which is also present in the legacy endpoint.
"""
event_id = uuid.uuid4().hex
start_time = time.time() - 3600
mgr = EventManager(data={"event_id": event_id, "user": {"email": "markus+dontatme@sentry.io"}})
mgr.normalize()
mgr.save(default_project.id)
evtuser, = EventUser.objects.all()
assert not evtuser.name
assert not UserReport.objects.all()
assert process_userreport(
{
"type": "user_report",
"start_time": start_time,
"payload": json.dumps(
{
"name": "Hans Gans",
def inner(data):
mgr = EventManager(data={"spans": data})
mgr.normalize()
evt = eventstore.create_event(data=mgr.get_data())
interface = evt.interfaces.get("spans")
insta_snapshot({"errors": evt.data.get("errors"), "to_json": interface.to_json()})
def inner(data):
mgr = EventManager(data={"logentry": data})
mgr.normalize()
evt = eventstore.create_event(data=mgr.get_data())
interface = evt.interfaces.get("logentry")
insta_snapshot(
{"errors": evt.data.get("errors"), "to_json": interface and interface.to_json()}
)
def inner(data):
mgr = EventManager(data={"breadcrumbs": data})
mgr.normalize()
evt = eventstore.create_event(data=mgr.get_data())
breadcrumbs = evt.interfaces.get("breadcrumbs")
insta_snapshot(
{"errors": evt.data.get("errors"), "to_json": breadcrumbs and breadcrumbs.to_json()}
)
def get_normalized_event(data, project):
mgr = EventManager(data, project=project)
mgr.normalize()
return dict(mgr.get_data())