Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
possible for it to fail (just exceedingly unlikely).
"""
# TODO: docstring is not 100% correct
with get_session() as session:
errors, warnings = validate_request(request, credentials, session)
if errors:
# Fatal errors; cannot be bypassed, even with staff approval
return NewAccountResponse(
status=NewAccountResponse.REJECTED,
errors=(errors + warnings),
)
elif warnings:
# Non-fatal errors; the frontend can choose to create the account
# anyway, submit the account for staff approval, or get a response
# with a list of warnings for further inspection.
if request.handle_warnings == NewAccountRequest.WARNINGS_SUBMIT:
stored_request = StoredNewAccountRequest.from_request(request, str(warnings))
try:
with get_session() as session:
session.add(stored_request) # TODO: error handling
session.commit()
except sqlalchemy.exc.IntegrityError:
# If there's an integrity constraint, it's okay -- the
# account was already submitted, so we can still return a
# "pending" response.
pass
else:
dispatch_event(
'ocflib.account_submitted',
request=dict(request.to_dict(), reasons=warnings),
)
req = NewAccountRequest(
user_name=form.cleaned_data['ocf_login_name'],
real_name=real_name,
is_group=False,
calnet_uid=calnet_uid,
callink_oid=None,
email=form.cleaned_data['contact_email'],
encrypted_password=encrypt_password(
form.cleaned_data['password'],
RSA.importKey(CREATE_PUBLIC_KEY),
),
handle_warnings=NewAccountRequest.WARNINGS_WARN,
)
if 'warnings-submit' in request.POST:
req = req._replace(
handle_warnings=NewAccountRequest.WARNINGS_SUBMIT,
)
task = validate_then_create_account.delay(req)
task.wait(timeout=5)
if isinstance(task.result, NewAccountResponse):
if task.result.status == NewAccountResponse.REJECTED:
status = 'has_errors'
form.add_error(None, task.result.errors)
elif task.result.status == NewAccountResponse.FLAGGED:
status = 'has_warnings'
form.add_error(None, task.result.errors)
elif task.result.status == NewAccountResponse.PENDING:
return HttpResponseRedirect(reverse('account_pending'))
else:
raise AssertionError('Unexpected state reached')
input('Press enter to continue...')
continue
try:
password = prompt_for_new_password(
validator=lambda pwd: validate_password(
account['user_name'], pwd),
)
except KeyboardInterrupt:
# we want to allow cancelling during the "enter password" stage
# without completely exiting approve
print()
input('Press enter to start over (or ^C again to cancel)...')
continue
request = NewAccountRequest(
user_name=account['user_name'],
real_name=account['group_name'],
is_group=True,
calnet_uid=None,
callink_oid=account['callink_oid'],
email=account['email'],
encrypted_password=encrypt_password(
password,
RSA.importKey(CREATE_PUBLIC_KEY),
),
handle_warnings=NewAccountRequest.WARNINGS_WARN,
)
print()
print(bold('Pending account request:'))
print(dedent(
if not user_attrs_ucb(calnet_uid):
return render(
request,
'account/register/cant-find-in-ldap.html',
{
'calnet_uid': calnet_uid,
'title': 'Unable to read account information',
},
)
real_name = directory.name_by_calnet_uid(calnet_uid)
if request.method == 'POST':
form = ApproveForm(request.POST)
if form.is_valid():
req = NewAccountRequest(
user_name=form.cleaned_data['ocf_login_name'],
real_name=real_name,
is_group=False,
calnet_uid=calnet_uid,
callink_oid=None,
email=form.cleaned_data['contact_email'],
encrypted_password=encrypt_password(
form.cleaned_data['password'],
RSA.importKey(CREATE_PUBLIC_KEY),
),
handle_warnings=NewAccountRequest.WARNINGS_WARN,
)
if 'warnings-submit' in request.POST:
req = req._replace(
handle_warnings=NewAccountRequest.WARNINGS_SUBMIT,
)
def to_request(self, handle_warnings=NewAccountRequest.WARNINGS_CREATE):
"""Convert this object to a NewAccountRequest."""
return NewAccountRequest(**dict(
{
field: getattr(self, field)
for field in NewAccountRequest._fields
if field in self.__table__.columns._data.keys()
},
handle_warnings=handle_warnings,
))
except sqlalchemy.exc.IntegrityError:
# If there's an integrity constraint, it's okay -- the
# account was already submitted, so we can still return a
# "pending" response.
pass
else:
dispatch_event(
'ocflib.account_submitted',
request=dict(request.to_dict(), reasons=warnings),
)
return NewAccountResponse(
status=NewAccountResponse.PENDING,
errors=warnings,
)
elif request.handle_warnings == NewAccountRequest.WARNINGS_WARN:
return NewAccountResponse(
status=NewAccountResponse.FLAGGED,
errors=warnings,
)
return create_account.delay(request).id
def to_request(self, handle_warnings=NewAccountRequest.WARNINGS_CREATE):
"""Convert this object to a NewAccountRequest."""
return NewAccountRequest(**dict(
{
field: getattr(self, field)
for field in NewAccountRequest._fields
if field in self.__table__.columns._data.keys()
},
handle_warnings=handle_warnings,
))