Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_insert_participant_withdrawn(self):
    # A participant stored with withdrawalStatus NO_USE must not accept a
    # questionnaire response: the DAO insert is expected to raise Forbidden.
    self.insert_codes()
    withdrawn_participant = Participant(
        participantId=1,
        biobankId=2,
        withdrawalStatus=WithdrawalStatus.NO_USE,
    )
    self.participant_dao.insert(withdrawn_participant)
    self._setup_questionnaire()

    # Response that references the withdrawn participant (participantId=1).
    response_fields = {
        'questionnaireResponseId': 1,
        'questionnaireId': 1,
        'questionnaireVersion': 1,
        'questionnaireSemanticVersion': 'V1',
        'participantId': 1,
        'resource': QUESTIONNAIRE_RESPONSE_RESOURCE,
    }
    response = QuestionnaireResponse(**response_fields)
    response.answers.extend(self._names_and_email_answers())

    with self.assertRaises(Forbidden):
        self.questionnaire_response_dao.insert(response)
def test_cannot_validate_without_request(self):
    # Calling validate() here must raise Forbidden, and the exception text
    # must carry the REASON_NO_REQUEST explanation.
    expected_exception_message = '403 Forbidden: {0}'.format(REASON_NO_REQUEST)

    with self.assertRaises(Forbidden) as caught:
        self.csrf.validate()

    self.assertEqual(str(caught.exception), expected_exception_message)
def download_private_file(path):
    """Send back a private file after verifying the caller may access it.

    ``check_file_permission(path)`` is consulted first; a
    ``frappe.PermissionError`` from it is converted into ``Forbidden``.
    On success, the part of ``path`` following the first ``"/private"``
    is handed to ``send_private_file`` and its result is returned.
    """
    try:
        check_file_permission(path)
    except frappe.PermissionError:
        raise Forbidden(_("You don't have permission to access this file"))
    else:
        # Everything after the first "/private" marker identifies the file.
        path_after_private = path.split("/private", 1)[1]
        return send_private_file(path_after_private)
def update_ghost_participant(self, session, pid):
    """Flag the participant identified by ``pid`` as a ghost.

    Raises ``Forbidden`` when ``pid`` is falsy. When no participant is found
    for ``pid`` a warning is logged and nothing is updated. Otherwise the
    ghost flag and timestamp are set, and the change is passed to
    ``_update_history`` and the parent class's ``_do_update``.
    """
    if not pid:
        raise Forbidden("Can not update participant without id")

    ghost = self.get_for_update(session, pid)
    if ghost is None:
        logging.warning(
            f"Tried to mark participant with id: [{pid}] as ghost "
            "but participant does not exist. Wrong environment?"
        )
        return

    ghost.isGhostId = 1
    ghost.dateAddedGhost = clock.CLOCK.now()
    # The same object is passed as both arguments after the session.
    self._update_history(session, ghost, ghost)
    super(ParticipantDao, self)._do_update(session, ghost, ghost)
# NOTE(review): this is the interior of a PATCH handler whose enclosing `def`
# is not visible here; `patch`, `pid` and `current_user` come from that
# missing context -- confirm against the full file.
# Only the 'undelete' operation is accepted; anything else is logged and
# rejected with BadRequest.
op = patch.get('op', '')
if op != 'undelete':
log.warning('User %s sent unsupported PATCH op %r to project %s: %s',
current_user, op, pid, patch)
raise wz_exceptions.BadRequest(f'unsupported operation {op!r}')
# Get the project to find the user's permissions.
proj_coll = current_app.db('projects')
proj = proj_coll.find_one({'_id': pid})
if not proj:
raise wz_exceptions.NotFound(f'project {pid} not found')
# Undeleting requires 'PUT' to be among the methods allowed on this project.
allowed = authorization.compute_allowed_methods('projects', proj)
if 'PUT' not in allowed:
log.warning('User %s tried to undelete project %s but only has permissions %r',
current_user, pid, allowed)
raise wz_exceptions.Forbidden(f'no PUT access to project {pid}')
# A project that is not marked as deleted cannot be undeleted.
if not proj.get('_deleted', False):
raise wz_exceptions.BadRequest(f'project {pid} was not deleted, unable to undelete')
# Undelete the files. We cannot do this via Eve, as it doesn't support
# PATCHing collections, so direct MongoDB modification is used to set
# _deleted=False and provide new _etag and _updated values.
new_etag = random_etag()
log.debug('undeleting files before undeleting project %s', pid)
files_coll = current_app.db('files')
# Every file document whose 'project' equals pid gets the same new etag and
# update timestamp.
update_result = files_coll.update_many(
{'project': pid},
{'$set': {'_deleted': False,
'_etag': new_etag,
'_updated': utcnow()}})
def _checkProtection(self):
    """Reject users who are not allowed to manage this event.

    Raises ``Forbidden`` when there is no session user. When the user cannot
    manage the event, raises ``Forbidden`` carrying a redirect to the first
    (sorted) management URL returned by the ``management_url`` signal, or no
    response at all when the signal yields none. Otherwise defers to
    ``RHManageEventBase._checkProtection``.
    """
    if not session.user:
        raise Forbidden()

    # If the user cannot manage the whole event see if anything gives them
    # limited management access.
    if not self.event_new.can_manage(session.user):
        signal_results = signals.event_management.management_url.send(self.event_new)
        management_urls = sorted(values_from_signal(signal_results, single_value=True))
        if management_urls:
            fallback_response = redirect(management_urls[0])
        else:
            fallback_response = None
        raise Forbidden(response=fallback_response)

    # mainly to trigger the legacy "event locked" check
    RHManageEventBase._checkProtection(self)
def post(self, name):
    """Remove the stored item ``name`` and redirect afterwards.

    Raises ``Forbidden`` when the caller lacks the DELETE permission, or when
    the item is locked and the caller lacks ADMIN. An incomplete upload is
    answered with the error template and status 409 unless the caller has
    ADMIN. An ``OSError``/``IOError`` with errno ``ENOENT`` becomes
    ``NotFound``; any other such error is re-raised unchanged.
    """
    if not may(DELETE):
        raise Forbidden()

    try:
        with current_app.storage.open(name) as item:
            # Incomplete uploads may only be removed by admins.
            if not (item.meta[COMPLETE] or may(ADMIN)):
                page = render_template(
                    'error.html',
                    heading=item.meta[FILENAME],
                    body='Upload incomplete. Try again later.',
                )
                return page, 409

            # Locked items may only be removed by admins.
            if item.meta[LOCKED] and not may(ADMIN):
                raise Forbidden()

        current_app.storage.remove(name)
    except (OSError, IOError) as exc:
        if exc.errno == errno.ENOENT:
            raise NotFound()
        raise

    return redirect_next_referrer('bepasty.index')
def handle_error(response):
    """Raise the exception that corresponds to the response's status code.

    The status code is compared, in order, with the ``code`` attribute of
    each known exception class; on a match that class is raised with the
    message extracted by ``_get_response_message``. When nothing matches,
    ``response.raise_for_status()`` is called.
    """
    # Same order of comparison as an if/elif chain over these classes.
    known_errors = (
        BadRequest,
        Forbidden,
        InternalServerError,
        NotFound,
        ServiceUnavailable,
        Unauthorized,
    )
    for error_cls in known_errors:
        if response.status_code == error_cls.code:
            raise error_cls(_get_response_message(response))

    response.raise_for_status()
# NOTE(review): this is the interior of an access-check method whose enclosing
# `def` is not visible here; `access_controllers`, `state`, `blueprint`,
# `endpoint`, `user` and `user_roles` are set up in that missing context.
# Add the controllers registered for this blueprint, if any.
if blueprint and blueprint in state.bp_access_controllers:
access_controllers.extend(state.bp_access_controllers[blueprint])
# Add the controllers registered for this endpoint, if any.
if endpoint and endpoint in state.endpoint_access_controllers:
access_controllers.extend(state.endpoint_access_controllers[endpoint])
# Controllers are consulted in reverse list order; the first verdict that is
# not None decides the outcome.
for access_controller in reversed(access_controllers):
verdict = access_controller(user=user, roles=user_roles)
if verdict is None:
# No opinion from this controller: ask the next one.
continue
elif verdict is True:
# Explicitly allowed.
return None
else:
# Any other verdict is a denial: anonymous users get the result of
# redirect_to_login(), everyone else gets Forbidden.
if user.is_anonymous:
return self.redirect_to_login()
raise Forbidden()
# default policy
# Reached only when no controller returned a verdict: anonymous users are
# redirected to login when PRIVATE_SITE is set, otherwise access is allowed.
if current_app.config.get("PRIVATE_SITE") and user.is_anonymous:
return self.redirect_to_login()
return None
def view_job_depsgraph(project, job_id):
    """Render the dependency-graph page for a job.

    Raises ``wz_exceptions.Forbidden`` unless the current user has the
    ``'flamenco-view'`` capability. The optional ``t`` query argument is
    passed to the template as ``focus_task_id`` (``None`` when absent).
    """
    # Job list is public, job details are not.
    if not current_user.has_cap('flamenco-view'):
        raise wz_exceptions.Forbidden()

    template_context = {
        'job_id': job_id,
        'project': project,
        'focus_task_id': request.args.get('t', None),
    }
    return render_template('flamenco/jobs/depsgraph.html', **template_context)