Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def abort_request(request):
    """Handle ``request`` by calling ``abort`` with HTTP status 401."""
    unauthorized = 401
    abort(unauthorized)
field = 'title'
h = hateoas([b for b in motw.books.values()
if unq(q).lower() in b[field].lower()],
request,
title)
return rs.raw(h, content_type='application/json')
elif field in ['authors', 'languages', 'tags', '']:
h = hateoas([b for b in motw.books.values()
if unq(q).lower() in " ".join(b[field]).lower()],
request,
title)
return rs.raw(h, content_type='application/json')
except Exception as e:
abort(404, e)
library_secret = (request.headers.get('Library-Secret') or
request.headers.get('library-secret'))
r = re.match("[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}",
library_secret)
if not r or verb not in ['add', 'remove', 'bookids']:
abort(422, "Wrong verb, ha!")
if verb == 'add':
if library_uuid not in motw.library['collectionids']:
motw.library['collectionids'] = {library_uuid: library_secret}
with open("motw_cache/{}".format(library_uuid), 'wb') as f:
pickle.dump([], f)
motw.dump_collections(motw.library['collectionids'])
return text("{} added. Let's share books...".format(library_uuid))
else:
abort(422, "Library already added.")
elif verb == 'remove' and library_uuid in motw.library['collectionids']:
if check_library_secret(library_uuid, library_secret):
del motw.library['collectionids'][library_uuid]
motw.dump_collections(motw.library['collectionids'])
return text("{} removed.".format(library_uuid))
elif verb == 'bookids' and library_uuid in motw.library['collectionids']:
if check_library_secret(library_uuid, library_secret):
bookids = ["{}___{}".format(book['_id'], book['last_modified'])
for book in motw.books.values()
if library_uuid == book['library_uuid']]
if bookids == []:
try:
with (open("motw_cache/{}".format(library_uuid), 'rb')) as f:
bookids = ["{}___{}".format(book['_id'], book['last_modified'])
for book in pickle.load(f)]
except Exception as e:
async def upload_payload_code(request, user, ptype):
if user['auth'] not in ['access_token', 'apitoken']:
abort(status_code=403, message="Cannot access via Cookies. Use CLI or access via JS in browser")
payload_type = unquote_plus(ptype)
try:
query = await db_model.payloadtype_query()
payloadtype = await db_objects.get(query, ptype=payload_type)
except Exception as e:
print(e)
return json({'status': 'error', 'error': 'failed to find payload'})
if request.files:
code = request.files['upload_file'][0].body
code_file = open("./app/payloads/{}/payload/{}".format(payloadtype.ptype, request.files['upload_file'][0].name),
"wb")
code_file.write(code)
code_file.close()
for i in range(1, int(request.form.get('file_length'))):
code = request.files['upload_file_' + str(i)][0].body
code_file = open(
async def remove_browserscript(request, user, bid):
    """Delete the browser script ``bid`` owned by the calling operator.

    ``abort`` is called with status 403 when ``user['auth']`` is neither
    ``'access_token'`` nor ``'apitoken'``. The script is looked up by
    ``bid`` together with the operator whose username equals
    ``user['username']``.

    Returns a JSON response: ``status`` set to ``success`` merged with the
    deleted script's ``to_json()`` fields, or ``status`` set to ``error``
    with a message that includes the exception text when anything in the
    lookup/delete sequence raises.
    """
    permitted_auth = ('access_token', 'apitoken')
    if user['auth'] not in permitted_auth:
        abort(status_code=403, message="Cannot access via Cookies. Use CLI or access via JS in browser")
    try:
        operator_q = await db_model.operator_query()
        owner = await db_objects.get(operator_q, username=user['username'])
        script_q = await db_model.browserscript_query()
        script = await db_objects.get(script_q, id=bid, operator=owner)
        # Capture the serialized fields before deleting so they can be returned.
        deleted_fields = script.to_json()
        await db_objects.delete(script)
        return json({'status': 'success', **deleted_fields})
    except Exception as err:
        reason = str(err)
        print(reason)
        return json({"status": "error", 'error': 'failed to find information: ' + reason})
async def remove_apitokens(request, user, tid):
    """Delete the API token ``tid`` owned by the calling operator.

    ``abort`` is called with status 403 when ``user['auth']`` is neither
    ``'access_token'`` nor ``'apitoken'``. The token is looked up by
    ``tid`` together with the operator whose username equals
    ``user['username']``.

    Returns a JSON response: ``status`` set to ``success`` merged with the
    deleted token's ``to_json()`` fields, or a fixed ``error`` payload when
    anything in the lookup/delete sequence raises (the exception text is
    printed, not returned).
    """
    permitted_auth = ('access_token', 'apitoken')
    if user['auth'] not in permitted_auth:
        abort(status_code=403, message="Cannot access via Cookies. Use CLI or access via JS in browser")
    try:
        operator_q = await db_model.operator_query()
        owner = await db_objects.get(operator_q, username=user['username'])
        token_q = await db_model.apitokens_query()
        token = await db_objects.get(token_q, id=tid, operator=owner)
        # Capture the serialized fields before deleting so they can be returned.
        deleted_fields = token.to_json()
        await db_objects.delete(token)
        return json({'status': 'success', **deleted_fields})
    except Exception as err:
        print(str(err))
        failure = {"status": "error", 'error': 'failed to find user or tokens'}
        return json(failure)
async def post(request):
    """
    Mattermost new post event handler

    Loads ``request.form`` through ``BotSchema``; calls ``abort`` with
    status 400 and the reported errors when the load result has any.
    Otherwise responds with a JSON body whose ``text`` field echoes the
    loaded ``text`` and ``user_name`` values.
    """
    loaded = BotSchema().load(request.form)
    if loaded.errors:
        abort(400, loaded.errors)
    fields = loaded.data
    reply = "I received \"{}\" from @{}".format(fields['text'],
                                                 fields['user_name'])
    body = {"text": reply}
    return response.json(body)
async def get_all_mitre_attack_ids(request, user):
    """Return every entry produced by the attack query as JSON.

    ``abort`` is called with status 403 when ``user['auth']`` is neither
    ``'access_token'`` nor ``'apitoken'``.

    Returns a JSON response with ``status`` set to ``success`` and an
    ``attack`` list holding each entry's ``to_json()`` result, or ``status``
    set to ``error`` with the exception text when the query or the
    serialization raises.
    """
    permitted_auth = ('access_token', 'apitoken')
    if user['auth'] not in permitted_auth:
        abort(status_code=403, message="Cannot access via Cookies. Use CLI or access via JS in browser")
    try:
        attack_q = await db_model.attack_query()
        entries = await db_objects.execute(attack_q)
        serialized = []
        for entry in entries:
            serialized.append(entry.to_json())
        return json({'status': 'success', 'attack': serialized})
    except Exception as err:
        return json({'status': 'error', 'error': str(err)})
def book(request, book_id):
    """Return the entry stored under ``book_id`` in ``motw.books`` as JSON.

    The entry is serialized with ``rjson.dumps`` using the
    ``rjson.DM_ISO8601`` datetime mode, encoded to bytes and returned as a
    raw ``application/json`` response. Any exception raised along the way
    (including a missing ``book_id``) is passed to ``abort`` with status 404.
    """
    try:
        record = motw.books[book_id]
        serialized = rjson.dumps(record, datetime_mode=rjson.DM_ISO8601)
        return rs.raw(serialized.encode(), content_type='application/json')
    except Exception as err:
        abort(404, err)
async def update_command(request, user, id):
if user['auth'] not in ['access_token', 'apitoken']:
abort(status_code=403, message="Cannot access via Cookies. Use CLI or access via JS in browser")
updated_command = False
try:
query = await db_model.command_query()
command = await db_objects.get(query, id=id)
query = await db_model.operator_query()
operator = await db_objects.get(query, username=user['username'])
except Exception as e:
print(e)
return json({'status': 'error', 'error': 'failed to get command'})
if request.form:
data = js.loads(request.form.get('json'))
else:
data = request.json
if "description" in data and data['description'] != command.description:
command.description = data['description']
updated_command = True