Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def populate_tables():
user_manager = UserManager(SQLAlchemyAdapter(db, UserModel, UserAuthClass=UserAuthModel), app)
admin_defaults = daconfig.get('default admin account', dict())
if 'email' not in admin_defaults:
admin_defaults['email'] = os.getenv('DA_ADMIN_EMAIL', 'admin@admin.com')
if 'nickname' not in admin_defaults:
admin_defaults['nickname'] = 'admin'
if 'first_name' not in admin_defaults:
admin_defaults['first_name'] = word('System')
if 'last_name' not in admin_defaults:
admin_defaults['last_name'] = word('Administrator')
if 'password' not in admin_defaults:
admin_defaults['password'] = os.getenv('DA_ADMIN_PASSWORD', 'password')
cron_defaults = daconfig.get('default cron account', {'nickname': 'cron', 'email': 'cron@admin.com', 'first_name': 'Cron', 'last_name': 'User'})
cron_defaults['active'] = False
user_role = get_role(db, 'user')
admin_role = get_role(db, 'admin')
cron_role = get_role(db, 'cron')
customer_role = get_role(db, 'customer')
developer_role = get_role(db, 'developer')
advocate_role = get_role(db, 'advocate')
trainer_role = get_role(db, 'trainer')
for user in UserModel.query.all():
if len(user.roles) == 0:
user.roles.append(user_role)
db.session.commit()
admin = get_user(db, admin_role, admin_defaults)
cron = get_user(db, cron_role, cron_defaults)
if admin.confirmed_at is None:
admin.confirmed_at = datetime.datetime.now()
if cron.confirmed_at is None:
cron.confirmed_at = datetime.datetime.now()
if USING_SUPERVISOR:
SUPERVISORCTL = daconfig.get('supervisorctl', 'supervisorctl')
container_role = ':' + os.environ.get('CONTAINERROLE', '') + ':'
if re.search(r':(web|celery|all):', container_role):
sys.stderr.write("Sending reset signal\n")
args = [SUPERVISORCTL, '-s', 'http://localhost:9001', 'start', 'reset']
result = subprocess.call(args)
else:
sys.stderr.write("Not sending reset signal because not web or celery\n")
else:
sys.stderr.write("update: touched wsgi file" + "\n")
wsgi_file = daconfig.get('webapp', '/usr/share/docassemble/webapp/docassemble.wsgi')
if os.path.isfile(wsgi_file):
with open(wsgi_file, 'a'):
os.utime(wsgi_file, None)
db.engine.dispose()
sys.exit(0)
db.session.commit()
admin = get_user(db, admin_role, admin_defaults)
cron = get_user(db, cron_role, cron_defaults)
if admin.confirmed_at is None:
admin.confirmed_at = datetime.datetime.now()
if cron.confirmed_at is None:
cron.confirmed_at = datetime.datetime.now()
db.session.commit()
add_dependencies(admin.id)
git_packages = Package.query.filter_by(type='git')
for package in git_packages:
if package.name in ['docassemble', 'docassemble.base', 'docassemble.webapp', 'docassemble.demo']:
package.giturl = None
package.gitsubdir = None
package.type = 'pip'
db.session.commit()
return
something_added = True
if something_added:
db.session.commit()
if do_delete:
UserDict.query.filter_by(key=user_code, filename=filename).delete()
db.session.commit()
files_to_delete = list()
for speaklist in SpeakList.query.filter_by(key=user_code, filename=filename).all():
if speaklist.upload is not None:
files_to_delete.append(speaklist.upload)
SpeakList.query.filter_by(key=user_code, filename=filename).delete()
db.session.commit()
for upload in Uploads.query.filter_by(key=user_code, yamlfile=filename, persistent=False).all():
files_to_delete.append(upload.indexno)
Uploads.query.filter_by(key=user_code, yamlfile=filename, persistent=False).delete()
db.session.commit()
ChatLog.query.filter_by(key=user_code, filename=filename).delete()
db.session.commit()
for short_code_item in Shortener.query.filter_by(uid=user_code, filename=filename).all():
for email in Email.query.filter_by(short=short_code_item.short).all():
for attachment in EmailAttachment.query.filter_by(email_id=email.id).all():
files_to_delete.append(attachment.upload)
Shortener.query.filter_by(uid=user_code, filename=filename).delete()
db.session.commit()
for file_number in files_to_delete:
the_file = SavedFile(file_number)
the_file.delete()
return
description = db.Column(db.String(255))
class UserRoles(db.Model):
__tablename__ = dbtableprefix + 'user_roles'
id = db.Column(db.Integer(), primary_key=True)
user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'user.id', ondelete='CASCADE'), index=True)
role_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'role.id', ondelete='CASCADE'), index=True)
class UserDict(db.Model):
__tablename__ = dbtableprefix + "userdict"
indexno = db.Column(db.Integer(), primary_key=True)
filename = db.Column(db.Text(), index=True)
key = db.Column(db.String(250), index=True)
dictionary = db.Column(db.Text())
user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'user.id', ondelete='CASCADE'))
encrypted = db.Column(db.Boolean(), nullable=False, server_default='1')
modtime = db.Column(db.DateTime())
db.Index(dbtableprefix + 'ix_userdict_key_filename', UserDict.key, UserDict.filename)
class UserDictKeys(db.Model):
__tablename__ = dbtableprefix + "userdictkeys"
indexno = db.Column(db.Integer(), primary_key=True)
filename = db.Column(db.Text(), index=True)
key = db.Column(db.String(250), index=True)
user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'user.id', ondelete='CASCADE'), index=True)
temp_user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'tempuser.id', ondelete='CASCADE'), index=True)
db.Index(dbtableprefix + 'ix_userdictkeys_key_filename', UserDict.key, UserDict.filename)
class TempUser(db.Model):
__tablename__ = dbtableprefix + 'tempuser'
hostname = db.Column(db.Text())
url = db.Column(db.Text())
start_time = db.Column(db.DateTime(), server_default=db.func.now())
role = db.Column(db.Text())
class MachineLearning(db.Model):
__tablename__ = dbtableprefix + "machinelearning"
id = db.Column(db.Integer(), primary_key=True)
group_id = db.Column(db.Text())
key = db.Column(db.Text(), index=True)
independent = db.Column(db.Text())
dependent = db.Column(db.Text())
info = db.Column(db.Text())
create_time = db.Column(db.DateTime())
modtime = db.Column(db.DateTime())
active = db.Column(db.Boolean(), nullable=False, server_default=false())
class Shortener(db.Model):
__tablename__ = dbtableprefix + "shortener"
id = db.Column(db.Integer(), primary_key=True)
short = db.Column(db.String(250), nullable=False, unique=True)
filename = db.Column(db.Text(), index=True)
uid = db.Column(db.String(250))
user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'user.id', ondelete='CASCADE'))
temp_user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'tempuser.id', ondelete='CASCADE'))
key = db.Column(db.Text(), index=True)
index = db.Column(db.Integer())
modtime = db.Column(db.DateTime(), server_default=db.func.now())
class Email(db.Model):
__tablename__ = dbtableprefix + "email"
id = db.Column(db.Integer(), primary_key=True)
class UploadsUserAuth(db.Model):
__tablename__ = dbtableprefix + "uploadsuserauth"
id = db.Column(db.Integer(), primary_key=True)
uploads_indexno = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'uploads.indexno', ondelete='CASCADE'), nullable=False, index=True)
user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'user.id', ondelete='CASCADE'), index=True)
temp_user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'tempuser.id', ondelete='CASCADE'), index=True)
class UploadsRoleAuth(db.Model):
__tablename__ = dbtableprefix + "uploadsroleauth"
id = db.Column(db.Integer(), primary_key=True)
uploads_indexno = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'uploads.indexno', ondelete='CASCADE'), nullable=False, index=True)
role_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'role.id', ondelete='CASCADE'), nullable=False, index=True)
class ObjectStorage(db.Model):
__tablename__ = dbtableprefix + "objectstorage"
id = db.Column(db.Integer(), primary_key=True)
key = db.Column(db.Text(), index=True)
value = db.Column(db.Text())
class SpeakList(db.Model):
__tablename__ = dbtableprefix + "speaklist"
id = db.Column(db.Integer(), primary_key=True)
filename = db.Column(db.Text(), index=True)
key = db.Column(db.String(250), index=True)
phrase = db.Column(db.Text())
question = db.Column(db.Integer())
type = db.Column(db.String(20))
language = db.Column(db.String(10))
dialect = db.Column(db.String(10))
upload = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'uploads.indexno', ondelete='CASCADE'))
encrypted = db.Column(db.Boolean(), nullable=False, server_default=true())
digest = db.Column(db.Text())
giturl = db.Column(db.String(255), nullable=True)
gitsubdir = db.Column(db.Text(), nullable=True)
upload = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'uploads.indexno', ondelete='CASCADE'))
package_auth = db.relationship('PackageAuth', uselist=False, primaryjoin="PackageAuth.package_id==Package.id")
version = db.Column(db.Integer(), server_default='1')
packageversion = db.Column(db.Text())
limitation = db.Column(db.Text())
dependency = db.Column(db.Boolean(), nullable=False, server_default='0')
core = db.Column(db.Boolean(), nullable=False, server_default='0')
active = db.Column(db.Boolean(), nullable=False, server_default='1')
gitbranch = db.Column(db.String(255), nullable=True)
class PackageAuth(db.Model):
__tablename__ = dbtableprefix + 'package_auth'
id = db.Column(db.Integer, primary_key=True, unique=True)
package_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'package.id', ondelete='CASCADE'))
user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'user.id', ondelete='CASCADE'))
authtype = db.Column(db.String(255), server_default='owner')
class Install(db.Model):
__tablename__ = dbtableprefix + "install"
id = db.Column(db.Integer(), primary_key=True, unique=True)
hostname = db.Column(db.Text())
version = db.Column(db.Integer())
packageversion = db.Column(db.Text())
package_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'package.id', ondelete='CASCADE'))
def populate_tables():
user_manager = UserManager(SQLAlchemyAdapter(db, UserModel, UserAuthClass=UserAuthModel), app)
admin_defaults = daconfig.get('default admin account', dict())
if 'email' not in admin_defaults:
admin_defaults['email'] = os.getenv('DA_ADMIN_EMAIL', 'admin@admin.com')
if 'nickname' not in admin_defaults:
admin_defaults['nickname'] = 'admin'
if 'first_name' not in admin_defaults:
admin_defaults['first_name'] = word('System')
if 'last_name' not in admin_defaults:
admin_defaults['last_name'] = word('Administrator')
if 'password' not in admin_defaults:
admin_defaults['password'] = os.getenv('DA_ADMIN_PASSWORD', 'password')
cron_defaults = daconfig.get('default cron account', {'nickname': 'cron', 'email': 'cron@admin.com', 'first_name': 'Cron', 'last_name': 'User'})
cron_defaults['active'] = False
user_role = get_role(db, 'user')
admin_role = get_role(db, 'admin')
cron_role = get_role(db, 'cron')
from docassemble.webapp.db_object import db
from docassemble.base.config import daconfig, dbtableprefix
from sqlalchemy import true, false
import docassemble.webapp.users.models
class Uploads(db.Model):
__tablename__ = dbtableprefix + "uploads"
indexno = db.Column(db.Integer(), primary_key=True)
key = db.Column(db.String(250), index=True)
filename = db.Column(db.Text(), index=True)
yamlfile = db.Column(db.Text())
private = db.Column(db.Boolean(), nullable=False, server_default=true())
persistent = db.Column(db.Boolean(), nullable=False, server_default=false())
class UploadsUserAuth(db.Model):
__tablename__ = dbtableprefix + "uploadsuserauth"
id = db.Column(db.Integer(), primary_key=True)
uploads_indexno = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'uploads.indexno', ondelete='CASCADE'), nullable=False, index=True)
user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'user.id', ondelete='CASCADE'), index=True)
temp_user_id = db.Column(db.Integer(), db.ForeignKey(dbtableprefix + 'tempuser.id', ondelete='CASCADE'), index=True)
class UploadsRoleAuth(db.Model):