Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@bottle.post('/collections/')
@auth.restricted
def collections_process():
    """ Process URLs that were passed through the collection form.

    Reads the ``urls`` form field (one URL per line), trims each entry,
    drops blank lines and duplicates, and starts a batch for what remains.

    Returns the plain-text message "no URLs given" when the field is absent
    or contains no non-blank entry; otherwise redirects to the page of the
    newly created batch.
    """
    raw_urls = request.forms.get('urls')
    if raw_urls is None:
        return "no URLs given"

    # De-duplicate while keeping submission order. Blank lines (an empty
    # textarea, or empty lines between URLs) are not URLs and are skipped so
    # that an empty string is never handed to the batch processor.
    seen = set()
    urls = []
    for line in raw_urls.split('\n'):
        url = line.strip()
        if url and url not in seen:
            seen.add(url)
            urls.append(url)
    if not urls:
        return "no URLs given"

    batch = Batch.process_urls(
        urls,
        base_dir=request.app.config['artex.directory'],
        max_procs=int(request.app.config['artex.processes']))
    bottle.redirect('/batches/%s' % batch.id)
# POST /build handler. When a 'build-pelican' worker is already in progress it
# emits EVENT_ALREADY_BUILDING and redirects to '/'; otherwise it collects the
# build settings from the 'henet' section of the application config.
# NOTE(review): this excerpt is cut off after the nested done_building header,
# so how the build is actually launched is not visible here.
@post("/build", no_i18n=True)
@csrf_protect
def build():
# Guard: do not start a second build while one is running.
if app.workers.in_progress('build-pelican'):
emit(EVENT_ALREADY_BUILDING)
redirect('/')
return
# Build settings read from the 'henet' config section.
cache_dir = app._config['henet']['cache_dir']
content_dir = app._config['henet']['pelican_content_path']
cmd = app._config['henet']['build_command']
# 'build_working_dir' is optional; falls back to the process's current directory.
cmd_dir = app._config['henet'].get('build_working_dir', os.getcwd())
# The requester's remote address is used as the client identifier.
client_id = request.remote_addr
# Nested callback taking the client id plus extra positional arguments;
# its body is not visible in this excerpt.
def done_building(client_id, *args):
# POST /login/mastodon handler: logs in to a Mastodon instance with the
# credentials submitted in the form and stores the resulting access token
# through the `user` object.
# NOTE(review): `user` is a callback parameter but the route has no wildcard;
# presumably it is injected by a plugin/decorator not shown here — confirm.
@post('/login/mastodon')
def login_mastodon(user):
"""
Mastodon OAuth authentication process.
:return: redirect to city page.
"""
# get app tokens
# Instance URL and account credentials are taken from the submitted form fields.
instance_url = request.forms.get('instance_url')
masto_email = request.forms.get('email')
masto_pass = request.forms.get('pass')
# Application credentials for this instance are obtained via the user object.
client_id, client_secret = user.get_mastodon_app_keys(instance_url)
m = Mastodon(client_id=client_id, client_secret=client_secret,
api_base_url=instance_url)
try:
# Log in with email/password, then persist the returned token together
# with the instance URL it belongs to.
access_token = m.log_in(masto_email, masto_pass)
user.save_masto_token(access_token, instance_url)
return city_page(user.get_city(), info='Thanks for supporting decentralized social networks!')
# NOTE(review): the except/finally clause of this try block is not visible
# in this excerpt, so the failure path cannot be documented from here.
# Handler for POST requests to `login_path`, decorated with
# bottle.view(login_view). Reads the login form and authenticates the user.
# NOTE(review): the excerpt ends after the failure branch; the success path
# and the handler's return value are not visible here.
@bottle.post(login_path)
@bottle.view(login_view)
def login():
""" Handle login request """
errors = {}
# Get form data
forms = request.forms
# Every field has a fallback when missing from the form:
# 'redir' -> redir_path, 'remember' -> 's', 'email' and 'password' -> ''.
redir = forms.get('redir', redir_path)
email = forms.get('email', '')
remember = forms.get('remember', 's')
password = forms.get('password', '')
# Process authentication request
user = User.authenticate(email, password)
# A None result is treated as a failed login; the message is stored under
# the '_' key of the errors dict.
if user is None:
errors = {'_': 'Invalid email or password'}
@post('/goto/')
def goto(where):
    """Forward ``where`` to ``controller.goto`` and acknowledge with "OK".

    NOTE(review): the route '/goto/' declares no ``<where>`` wildcard, so the
    source of the ``where`` argument is not visible here -- presumably it is
    supplied by a plugin or wrapper; confirm against the route setup.
    """
    controller.goto(where)
    return "OK"
@bottle.post('/sendmail')
def sendEmail():
    """Export every document in ``db`` to ``static/OUTPUT.csv``.

    Writes a fixed header row followed by one fully-quoted row per document,
    taking the columns ``twitname``, ``mcount``, ``fcount``, ``fscore``,
    ``rtcount``, ``rtscore`` and ``totalscore`` from each document.

    Raises:
        KeyError: if a document lacks one of the exported fields.
    """
    header = ["Twitter Name", "Mentions", "Followers", "Follower Score",
              "Retweets", "Retweet Score", "Total Score"]
    fields = ('twitname', 'mcount', 'fcount', 'fscore',
              'rtcount', 'rtscore', 'totalscore')

    # The context manager guarantees the file is closed even when the
    # database iteration or a row write raises part-way through.
    # NOTE(review): binary mode is what the Python 2 csv module expects; on
    # Python 3 this needs mode 'w' with newline='' -- confirm the target
    # interpreter before changing it.
    with open('static/OUTPUT.csv', 'wb') as fp:
        writer = csv.writer(fp, delimiter=',', quoting=csv.QUOTE_ALL)
        writer.writerow(header)
        # get all the documents
        view = db.all_docs()
        for row in view.iter(params={'include_docs': True}):
            doc = row['doc']
            writer.writerow([doc[name] for name in fields])
# POST /updatealertschedules/ handler: parses the request body as JSON and
# iterates over its items as (alert name, alert schedule) pairs.
# NOTE(review): the docstring says "return", but the visible code calls
# remove() on the collection and then loops over the submitted schedules --
# presumably a replace-all update; confirm. The loop body is not visible in
# this excerpt.
@post('/updatealertschedules/', methods=['POST'])
@enable_cors
def update_alert_schedules():
'''an endpoint to return alerts schedules'''
# Without a request body the handler answers with status 503.
if not request.body:
response.status = 503
return response
# The parsed JSON must be an object: .items() is called on it below.
alert_schedules = json.loads(request.body.read())
request.body.close()
response.content_type = "application/json"
# Open the 'alertschedules' collection of the 'meteor' database using
# CodecOptions(tz_aware=True).
mongoclient = MongoClient(options.mongohost, options.mongoport)
schedulers_db = mongoclient.meteor['alertschedules'].with_options(codec_options=CodecOptions(tz_aware=True))
# NOTE(review): remove() is called without a filter -- presumably this clears
# all existing schedules before the submitted ones are processed; confirm.
schedulers_db.remove()
for alert_name, alert_schedule in alert_schedules.items():
@post("/port/up")
@required_parameters("switch", "port")
def port_up(p):
    """Bring a switch port up and report the outcome.

    ``p`` is the parameter mapping; its ``switch`` and ``port`` entries are
    passed to ``port_mod`` with the action ``'up'``. A result of 0 counts as
    success. The response is built by ``respond`` from the success flag and
    the success/failure messages.
    """
    switch = p['switch']
    port = p['port']
    succeeded = port_mod(switch, port, 'up') == 0
    ok_message = "Switch %s port %s up\n" % (switch, port)
    fail_message = "Fail switch %s port %s up\n" % (switch, port)
    return respond(succeeded, ok_message, fail_message)
@bottle.post("/meta/logging-level")
def set_log_level():
    """Set the level of ``logger`` from the ``level`` POST field.

    The field is a standard logging level name, matched case-insensitively:
    DEBUG, INFO, WARNING, ERROR or CRITICAL.

    Responds with HTTP 400 when the field is missing or empty, or when the
    logging module does not recognise the name.
    """
    level = request.POST.get("level")
    if not level:
        # Previously a missing field crashed with AttributeError on None.
        bottle.abort(400, "Missing 'level' parameter")
    try:
        logger.setLevel(level.strip().upper())
    except ValueError:
        # setLevel raises ValueError for a name it does not know.
        bottle.abort(400, "Unknown logging level: %s" % level)
@post(siteconfig["rootPath"] + "/entrydelete.json")
@authDict(["canEdit"])
def entrydelete(dictID, user, dictDB, configs):
    """Delete the entry named by the ``id`` form field.

    The deletion is performed by ``ops.deleteEntry`` on ``dictDB`` and is
    passed the requesting user's email. Returns a dict with ``success`` set
    to True and the ``id`` that was submitted.
    """
    entry_id = request.forms.id
    ops.deleteEntry(dictDB, entry_id, user["email"])
    return {"success": True, "id": entry_id}