Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def edit_user_profile_page(id):
    """Handle query-string driven admin actions on the user with this id.

    The user is loaded with its roles eagerly joined.  A missing user, or
    one whose ``social_id`` starts with ``'disabled$'``, results in a 404.

    Recognised query arguments (each must convert to the integer 1):

    * ``disable_mfa`` -- clears ``otp_secret`` and redirects back here.
    * ``reset_email_confirmation`` -- clears ``confirmed_at`` and redirects
      back here.
    * ``delete_account`` / ``delete_account_complete`` -- only honoured when
      the ``admin can delete account`` config value is truthy (default True)
      and the target is not the current user; deletes the user's interviews
      (without / with shared ones, respectively) and the user's data.
    """
    user = UserModel.query.options(db.joinedload('roles')).filter_by(id=id).first()
    # Validate the lookup before reading any attribute of ``user``: touching
    # user.timezone first would raise AttributeError for an unknown id and
    # turn the intended 404 into a server error.
    if user is None or user.social_id.startswith('disabled$'):
        abort(404)
    the_tz = user.timezone if user.timezone else get_default_timezone()
    if 'disable_mfa' in request.args and int(request.args['disable_mfa']) == 1:
        user.otp_secret = None
        db.session.commit()
        #docassemble.webapp.daredis.clear_user_cache()
        return redirect(url_for('edit_user_profile_page', id=id))
    if 'reset_email_confirmation' in request.args and int(request.args['reset_email_confirmation']) == 1:
        user.confirmed_at = None
        db.session.commit()
        #docassemble.webapp.daredis.clear_user_cache()
        return redirect(url_for('edit_user_profile_page', id=id))
    # An administrator may never delete their own account through this page.
    if daconfig.get('admin can delete account', True) and user.id != current_user.id:
        if 'delete_account' in request.args and int(request.args['delete_account']) == 1:
            # Imported at call time rather than module level.
            from docassemble.webapp.server import user_interviews, r, r_user
            from docassemble.webapp.backend import delete_user_data
            user_interviews(user_id=id, secret=None, exclude_invalid=False, action='delete_all', delete_shared=False)
            delete_user_data(id, r, r_user)
            db.session.commit()
            flash(word('The user account was deleted.'), 'success')
            return redirect(url_for('user_list'))
        if 'delete_account_complete' in request.args and int(request.args['delete_account_complete']) == 1:
            from docassemble.webapp.server import user_interviews, r, r_user
            from docassemble.webapp.backend import delete_user_data
            # Same as above, but shared interviews are removed as well.
            user_interviews(user_id=id, secret=None, exclude_invalid=False, action='delete_all', delete_shared=True)
            delete_user_data(id, r, r_user)
            # NOTE(review): no commit/flash/redirect is visible after this
            # point -- confirm the remainder of the handler performs them.
def get_role(db, name):
    """Return the Role called *name*, creating and committing it if absent.

    ``db`` supplies the session used to add and commit a newly created role.
    """
    role = Role.query.filter_by(name=name).first()
    if not role:
        # No usable row was found: create one and persist it immediately.
        role = Role(name=name)
        db.session.add(role)
        db.session.commit()
    return role
def sql_delete(key):
    """Delete the GlobalObjectStorage rows stored under *key* and commit."""
    matching_rows = GlobalObjectStorage.query.filter_by(key=key)
    matching_rows.delete()
    db.session.commit()
social_id=new_social,
email=defaults['email'],
user_auth=user_auth,
first_name = defaults.get('first_name', ''),
last_name = defaults.get('last_name', ''),
country = defaults.get('country', ''),
subdivisionfirst = defaults.get('subdivisionfirst', ''),
subdivisionsecond = defaults.get('subdivisionsecond', ''),
subdivisionthird = defaults.get('subdivisionthird', ''),
organization = defaults.get('organization', ''),
confirmed_at = datetime.datetime.now()
)
the_user.roles.append(role)
db.session.add(user_auth)
db.session.add(the_user)
db.session.commit()
return the_user
def delete_by_key(self, key):
    """Delete this group's MachineLearning rows that carry *key*.

    Calls ``self._initialize()`` first, commits the deletion, and then
    calls ``self.reset()``.
    """
    self._initialize()
    doomed_rows = MachineLearning.query.filter_by(group_id=self.group_id, key=key)
    doomed_rows.delete()
    db.session.commit()
    self.reset()
def save(self):
def set_dependent_by_id(self, the_id, the_dependent):
    """Store *the_dependent* on the MachineLearning row with id *the_id*.

    The row is looked up within this object's group using
    ``with_for_update()``.  The dependent value is pickled and
    base64-encoded before being stored; the row's ``modtime`` is set to
    the current UTC time and ``active`` is set to True.

    Raises:
        Exception: if no row with that id exists in this group.
    """
    self._initialize()
    lookup = MachineLearning.query.filter_by(group_id=self.group_id, id=the_id)
    row = lookup.with_for_update().first()
    if row is None:
        # Commit before raising so the transaction opened by the
        # FOR UPDATE query is ended.
        db.session.commit()
        message = "There was no entry in the database for id " + str(the_id) + " with group id " + str(self.group_id)
        raise Exception(message)
    pickled = pickle.dumps(the_dependent)
    row.dependent = codecs.encode(pickled, 'base64').decode()
    row.modtime = datetime.datetime.utcnow()
    row.active = True
    db.session.commit()
def delete_by_id(self, the_id):
continue
if UploadsRoleAuth.query.filter_by(uploads_indexno=file_number, role_id=existing_role.id).first():
continue
new_auth_record = UploadsRoleAuth(uploads_indexno=file_number, role_id=existing_role.id)
db.session.add(new_auth_record)
something_added = True
if something_added:
db.session.commit()
if disallow:
for privilege in set(disallow):
existing_role = Role.query.filter_by(name=privilege).first()
if not existing_role:
logmessage("file_privilege_access: invalid privilege " + repr(privilege))
continue
UploadsRoleAuth.query.filter_by(uploads_indexno=file_number, role_id=existing_role.id).delete()
db.session.commit()
if disallow_all:
UploadsRoleAuth.query.filter_by(uploads_indexno=file_number).delete()
if not (allow or disallow or disallow_all):
result = list()
for auth in db.session.query(UploadsRoleAuth.id, Role.name).join(Role, UploadsRoleAuth.role_id == Role.id).filter(UploadsRoleAuth.uploads_indexno == file_number).all():
result.append(auth.name)
return result
content = fp.read()
if 'mimetype' in file_info and file_info['mimetype'] == 'application/json':
aref = json.loads(content)
elif 'extension' in file_info and file_info['extension'].lower() in ['yaml', 'yml']:
aref = yaml.load(content, Loader=yaml.FullLoader)
if type(aref) is dict and hasattr(self, 'group_id'):
the_group_id = re.sub(r'.*:', '', self.group_id)
if the_group_id in aref:
aref = aref[the_group_id]
if type(aref) is list:
nowtime = datetime.datetime.utcnow()
for entry in aref:
if 'independent' in entry:
new_entry = MachineLearning(group_id=self.group_id, independent=codecs.encode(pickle.dumps(entry['independent']), 'base64').decode(), dependent=codecs.encode(pickle.dumps(entry.get('dependent', None)), 'base64').decode(), modtime=nowtime, create_time=nowtime, active=True, key=entry.get('key', None), info=codecs.encode(pickle.dumps(entry['info']), 'base64').decode() if entry.get('info', None) is not None else None)
db.session.add(new_entry)
db.session.commit()
def add_to_training_set(self, independent, dependent, key=None, info=None):
def delete_training_set(self):
    """Delete every MachineLearning row belonging to this group and commit.

    Calls ``self._initialize()`` first so that ``self.group_id`` is read
    after initialization.
    """
    self._initialize()
    # Issue the DELETE through the query itself.  Calling .all() first
    # returns a plain Python list, which has no .delete() method and would
    # raise AttributeError.
    MachineLearning.query.filter_by(group_id=self.group_id).delete()
    db.session.commit()
def _train(self, indep, depend):