Handler for /api/v1/sketches//aggregation/explore/
Args:
sketch_id: Integer primary key for a sketch database model
Returns:
JSON with aggregation results
"""
form = AggregationExploreForm.build(request)
if not form.validate_on_submit():
abort(
HTTP_STATUS_CODE_BAD_REQUEST,
'Not able to run aggregation, unable to validate form data.')
sketch = Sketch.query.get_with_acl(sketch_id)
sketch_indices = {
t.searchindex.index_name
for t in sketch.timelines
}
aggregation_dsl = form.aggregation_dsl.data
aggregator_name = form.aggregator_name.data
if aggregator_name:
if isinstance(form.aggregator_parameters.data, dict):
aggregator_parameters = form.aggregator_parameters.data
else:
aggregator_parameters = json.loads(
form.aggregator_parameters.data)
agg_class = aggregator_manager.AggregatorManager.get_aggregator(
abort(
HTTP_STATUS_CODE_BAD_REQUEST,
'Failed to create timeline, upload not enabled')
form = CreateTimelineForm()
if not form.validate_on_submit():
abort(
HTTP_STATUS_CODE_BAD_REQUEST,
'Failed to create timeline, form data not validated')
sketch_id = form.sketch_id.data
timeline_name = form.name.data
sketch = None
if sketch_id:
sketch = Sketch.query.get_with_acl(sketch_id)
# We do not need a human readable filename or
# datastore index name, so we use UUIDs here.
index_name = uuid.uuid4().hex
if not isinstance(index_name, six.text_type):
index_name = codecs.decode(index_name, 'utf-8')
# Create the search index in the Timesketch database
searchindex = SearchIndex.get_or_create(
name=timeline_name,
description=timeline_name,
user=current_user,
index_name=index_name)
searchindex.grant_permission(permission='read', user=current_user)
searchindex.grant_permission(permission='write', user=current_user)
searchindex.grant_permission(
def post(self, sketch_id, timeline_id):
"""Handles GET request to the resource.
Args:
sketch_id: Integer primary key for a sketch database model
timeline_id: Integer primary key for a timeline database model
"""
sketch = Sketch.query.get_with_acl(sketch_id)
timeline = Timeline.query.get(timeline_id)
form = TimelineForm.build(request)
# Check that this timeline belongs to the sketch
if timeline.sketch_id != sketch.id:
abort(
HTTP_STATUS_CODE_NOT_FOUND,
'The sketch ID ({0:d}) does not match with the timeline '
'sketch ID ({1:d})'.format(sketch.id, timeline.sketch_id))
if not sketch.has_permission(user=current_user, permission='write'):
abort(
HTTP_STATUS_CODE_FORBIDDEN,
'The user does not have write permission on the sketch.')
if not form.validate_on_submit():
def post(self, sketch_id):
"""Handles POST request to the resource.
Returns:
A string with the response from running the analyzer.
"""
sketch = Sketch.query.get_with_acl(sketch_id)
if not sketch.has_permission(current_user, 'read'):
return abort(
HTTP_STATUS_CODE_FORBIDDEN,
'User does not have read permission on the sketch.')
form = request.json
if not form:
form = request.data
if not form:
return abort(
HTTP_STATUS_CODE_FORBIDDEN,
'Unable to run an analyzer without any data submitted.')
timeline_id = form.get('timeline_id')
if not timeline_id:
def get(self, sketch_id):
    """Return the named views of a sketch as JSON.

    Args:
        sketch_id: Integer primary key for a sketch database model

    Returns:
        Views in JSON (instance of flask.wrappers.Response)
    """
    # The ACL-aware lookup is the only access gate visible in this handler;
    # presumably it rejects callers without permission -- confirm in the model.
    named_views = Sketch.query.get_with_acl(sketch_id).get_named_views
    return self.to_json(named_views)
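def story(sketch_id, story_id=None):
    """Generates the story list template.

    Args:
        sketch_id: Integer primary key for a sketch database model
        story_id: Optional integer primary key for a story database model

    Returns:
        Template with context.
    """
    sketch = Sketch.query.get_with_acl(sketch_id)
    graphs_enabled = current_app.config['GRAPH_BACKEND_ENABLED']
    current_story = None
    if story_id:
        candidate = Story.query.get(story_id)
        # The story is fetched by primary key alone, with no ACL of its own.
        # Only pass it to the template when it belongs to the sketch the
        # caller was just authorized for; otherwise a story ID from another
        # sketch could be rendered under this one. A missing or foreign
        # story is treated the same way as "no story selected".
        if candidate is not None and candidate.sketch_id == sketch.id:
            current_story = candidate
    return render_template(
        'sketch/stories.html', sketch=sketch, story=current_story,
        graphs_enabled=graphs_enabled)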
def get(self, sketch_id):
    """Report how many events the sketch's active timelines hold.

    Args:
        sketch_id: Integer primary key for a sketch database model

    Returns:
        Number of events in JSON (instance of flask.wrappers.Response)
    """
    sketch = Sketch.query.get_with_acl(sketch_id)
    # Only indices reachable through the ACL-checked sketch are counted; no
    # index name is taken from the request.
    index_names = []
    for timeline in sketch.active_timelines:
        index_names.append(timeline.searchindex.index_name)
    event_total = self.datastore.count(index_names)
    return jsonify({'meta': {'count': event_total}, 'objects': []})
def post(self, sketch_id):
"""Handles POST request to the resource.
Args:
sketch_id: Integer primary key for a sketch database model
"""
sketch = Sketch.query.get_with_acl(sketch_id)
form = request.json
# TODO: Add granular ACL controls.
# https://github.com/google/timesketch/issues/1016
if not sketch.has_permission(user=current_user, permission='write'):
abort(
HTTP_STATUS_CODE_FORBIDDEN,
'The user does not have write permission on the sketch.')
for username in form.get('users', []):
# Try the username with any potential @domain preserved.
user = User.query.filter_by(username=username).first()
# If no hit, then try to strip the domain.
if not user:
def get(self, sketch_id):
"""Handles GET request to the resource.
Handler for /api/v1/sketches/:sketch_id/event/
Args:
sketch_id: Integer primary key for a sketch database model
Returns:
JSON of the datastore event
"""
args = self.parser.parse_args()
sketch = Sketch.query.get_with_acl(sketch_id)
searchindex_id = args.get('searchindex_id')
searchindex = SearchIndex.query.filter_by(
index_name=searchindex_id).first()
event_id = args.get('event_id')
indices = [t.searchindex.index_name for t in sketch.timelines]
# Check if the requested searchindex is part of the sketch
if searchindex_id not in indices:
abort(
HTTP_STATUS_CODE_BAD_REQUEST,
'Search index ID ({0!s}) does not belong to the list '
'of indices'.format(searchindex_id))
result = self.datastore.get_event(searchindex_id, event_id)
event = Event.query.filter_by(