Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _parse_basic_auth(value):
    """Decode a base64 ``username:password`` credential string.

    :param value: base64 text (``str``) that encodes ``username:password``.
    :returns: a ``(username, password)`` tuple; only the first ``:`` is
        treated as the separator, so the password may itself contain ``:``.
    :raises BadRequest: if ``value`` is not valid base64, does not decode
        to text, or the decoded text contains no ``:`` separator.
    """
    try:
        # ``base64.decodestring`` was removed in Python 3.9; ``b64decode``
        # performs the same non-validating decode and exists on every
        # Python version.
        _decoded = base64.b64decode(value.encode()).decode()
        username, password = _decoded.split(':', 1)
    except ValueError:
        # Covers malformed base64 (binascii.Error), undecodable bytes
        # (UnicodeDecodeError) and a missing ':' (tuple-unpack failure);
        # all of these are ValueError subclasses.
        raise BadRequest('Invalid basic authorization')
    return username, password
k = full_k.split(':')[-1] # filter out selector
else:
k = full_k
if k.startswith('_') or type(defaults.get(k, None)) is MethodType:
del args[full_k]
elif k in defaults.keys():
default_type = type(defaults[k])
if default_type is not TupleType and type(value) is TupleType:
args[k] = value = value[-1]
elif default_type is TupleType and type(value) is ListType:
value = tuple(value)
if type(value) is not default_type:
try:
args[full_k] = corr_func.get(default_type, lambda x: x)(value)
except ValueError as e:
raise werkzeug.exceptions.BadRequest(
description='Failed to process parameter "{0}": {1}'.format(full_k, e))
else:
if del_nondef:
del args[full_k]
return args
@app.route("%s/flagstolen/" % prefix, methods=['POST'])
def flagstolen_set(flag_id):
    """
    Record that a player stole the flag identified by ``flag_id``.

    The JSON request body must contain ``player_id`` and ``activity``. On
    success a ``FlagsStolen`` row is committed and an empty JSON object is
    returned with status 201.

    Raises BadRequest when no JSON body is sent, when the flag or player
    cannot be found, or when ``player_id`` / ``activity`` is missing.

    curl -i -H "Content-Type: application/json" -X POST -d '{"player_id":"1", "activity":"dns"}' http://localhost:5000/scorebot/api/v1.0/flagstolen/
    """
    # NOTE(review): the route rule above declares no ``flag_id`` URL
    # variable, so it is unclear how this argument gets supplied -- confirm
    # the intended rule.
    payload = request.json
    if not payload:
        raise BadRequest("No data sent!")

    flag = Flags.query.get(flag_id)
    if not flag:
        # %s instead of %d: ids may arrive as strings, and %d would raise
        # TypeError instead of producing the error response.
        raise BadRequest("Flag with id of %s not found" % flag_id)

    # A missing player_id previously fell through to a NameError.
    if 'player_id' not in payload:
        raise BadRequest("Player id not sent!")
    player_id = payload['player_id']
    # Look the player up by its own id (the flag id was used here before).
    player = Players.query.get(player_id)
    if not player:
        raise BadRequest("Player with id of %s not found" % player_id)

    if 'activity' not in payload:
        raise BadRequest("Activity not sent!")
    activity = payload['activity']

    date = datetime.utcnow()
    flagstolen = FlagsStolen(playerID=player_id, flagID=flag_id,
                             activity=activity, datetime=date)
    db.session.add(flagstolen)
    db.session.commit()
    return jsonify({}), 201
else:
raise BadRequest("Blueteam %s not found!" % name)
if 'game' in request.json:
name = request.json['game']
else:
raise BadRequest("Game name not sent!")
game = Games.query.filter_by(name=name).first()
if game:
gameID = game.gameID
else:
raise BadRequest("Game %s not found!" % name)
print btID, gameID
if 'hostname' in request.json:
hostname = request.json['hostname']
else:
raise BadRequest("Hostname not sent!")
if 'value' in request.json:
value = request.json['value']
else:
raise BadRequest("Value not sent!")
host = Hosts(blueteamID=btID, gameID=gameID, hostname=hostname, value=value)
db.session.add(host)
db.session.commit()
return jsonify({}), 201
# TODO: I need a root node. I don't know if I can serialize anonymous
# root lists, so I'm serializing, adding a root node, then encoding back
# to a JSON str. Sorry for the waste.
# status = response.status
if status == 401:
raise exceptions.Unauthorized('''Error with Github api request:
401, user is not authorized: %s.''' % result)
elif status == 403:
raise exceptions.MethodNotAllowed('''Error with Github api request:
403, user is authenticated, but doesn't have permissions.''')
elif status == 404:
raise exceptions.NotFound('''Error with Github api request:
404, data not found: %s''' % result)
elif status > 299:
raise exceptions.BadRequest(
'Error with github api request: %s, %s' % (status, result))
return '{"response": ' + result + '}'
for age in item.get("workspaceDemographic").get('age'):
try:
age_array.append(int(WorkbenchWorkspaceAge(age)))
except TypeError:
raise BadRequest("WorkspaceID:{} Invalid age for workspaceDemographic: {}"
.format(item.get('workspaceId'), age))
item['age'] = age_array
try:
if item.get("workspaceDemographic").get('sexAtBirth') is None:
item['sexAtBirth'] = 'UNSET'
else:
item["sexAtBirth"] = item.get("workspaceDemographic").get("sexAtBirth")
WorkbenchWorkspaceSexAtBirth(item['sexAtBirth'])
except TypeError:
raise BadRequest("WorkspaceID:{} Invalid sexAtBirth for workspaceDemographic: {}"
.format(item.get('workspaceId'), item.get('sexAtBirth')))
try:
if item.get("workspaceDemographic").get('genderIdentity') is None:
item['genderIdentity'] = 'UNSET'
else:
item["genderIdentity"] = item.get("workspaceDemographic").get("genderIdentity")
WorkbenchWorkspaceGenderIdentity(item['genderIdentity'])
except TypeError:
raise BadRequest("WorkspaceID:{} Invalid genderIdentity for workspaceDemographic: {}"
.format(item.get('workspaceId'), item.get('genderIdentity')))
try:
if item.get("workspaceDemographic").get('sexualOrientation') is None:
item['sexualOrientation'] = 'UNSET'
else:
def _get_contribution_updates(self, data):
    """Build the update dict used to schedule a contribution.

    Looks up the contribution named by ``data['contribution_id']`` within
    ``self.event`` and, if ``data['session_block_id']`` is set, the session
    block it should be placed in.

    :param data: mapping with ``contribution_id`` and optionally
        ``session_block_id`` and ``force``.
    :returns: dict with ``parent`` (the session block's timetable entry, or
        ``None`` when no session block is given) and ``object`` (the
        contribution).
    :raises BadRequest: if the contribution or session block is unknown,
        the contribution is already scheduled, the session block is not
        scheduled, or the contribution belongs to a different session and
        ``force`` is not set.
    """
    contrib = (Contribution.query
               .with_parent(self.event)
               .filter_by(id=data['contribution_id'])
               .first())
    if contrib is None:
        raise BadRequest('Invalid contribution id')
    if contrib.timetable_entry is not None:
        raise BadRequest('The contribution is already scheduled')

    result = {'parent': None, 'object': contrib}

    # Without a session block the contribution is detached from any session.
    if not data.get('session_block_id'):
        contrib.session = None
        return result

    block = self.event.get_session_block(data['session_block_id'])
    if block is None:
        raise BadRequest('Invalid session block id')
    if block.timetable_entry is None:
        raise BadRequest('The session block is not scheduled')

    # Moving a contribution across sessions requires an explicit ``force``.
    in_other_session = contrib.session and block.session != contrib.session
    if in_other_session and not data.get('force'):
        raise BadRequest('Contribution is assigned to another session')

    result['parent'] = block.timetable_entry
    contrib.session = block.session
    return result
def _get_hpo_id(obj):
    """Return the HPO id for ``obj``.

    The HPO name comes from ``_get_hpo_name_from_participant(obj)``. When
    that name is empty, ``UNSET_HPO_ID`` is returned; otherwise the name is
    looked up through ``HPODao`` and the matching record's ``hpoId`` is
    returned.

    :raises BadRequest: if a name is present but no HPO matches it.
    """
    hpo_name = _get_hpo_name_from_participant(obj)
    if not hpo_name:
        return UNSET_HPO_ID

    record = HPODao().get_by_name(hpo_name)
    if record:
        return record.hpoId
    raise BadRequest(f"No HPO found with name {hpo_name}")
def _process_POST(self):
    """Create new timetable entry.

    Reads the JSON request body, which must contain ``start_dt`` and may
    contain ``contribution_id``, ``session_block_id`` and ``force``.

    :returns: JSON response with the new entry's ``start_dt`` (ISO format)
        and ``id``.
    :raises BadRequest: if the body is not a JSON object, contains keys
        outside the allowed set, lacks a required key, or names no object
        to schedule (no ``contribution_id``).
    """
    data = request.json
    if not isinstance(data, dict):
        # A missing or non-object body would otherwise crash below.
        raise BadRequest('Expected a JSON object')
    required_keys = {'start_dt'}
    allowed_keys = {'start_dt', 'contribution_id', 'session_block_id', 'force'}
    # ``set(data)`` works on both Python 2 and 3 (``viewkeys`` is 2-only).
    keys = set(data)
    # Set differences instead of strict-superset comparisons: the old
    # ``keys > allowed_keys`` test let payloads such as
    # {'start_dt', 'bogus'} through, and ``required_keys > keys`` only
    # fired when the payload was empty.
    if keys - allowed_keys:
        raise BadRequest('Invalid keys found')
    elif required_keys - keys:
        raise BadRequest('Required keys missing')
    updates = {'start_dt': dateutil.parser.parse(data['start_dt'])}
    if 'contribution_id' in data:
        updates.update(self._get_contribution_updates(data))
    # TODO: breaks & session blocks
    else:
        raise BadRequest('No object specified')
    entry = create_timetable_entry(self.event, updates)
    return jsonify(start_dt=entry.start_dt.isoformat(), id=entry.id)