Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Entry point. NOTE(review): this copy has lost all indentation, and it looks
# like two different routines were spliced together -- a database-migration
# step (up to the dangling `try:`) followed by the tail of a certificate
# installer. Any nesting implied by the comments below is inferred from
# statement order and must be confirmed against the upstream file.
def main():
with app.app_context():
# Migrations are attempted unless the 'use alembic' config value is falsy
# (it defaults to True).
if daconfig.get('use alembic', True):
# Resolve the on-disk directory of the installed docassemble.webapp package;
# alembic.ini and the alembic script directory are looked up beneath it.
packagedir = pkg_resources.resource_filename(pkg_resources.Requirement.parse('docassemble.webapp'), 'docassemble/webapp')
if not os.path.isdir(packagedir):
sys.exit("path for running alembic could not be found")
from alembic.config import Config
from alembic import command
alembic_cfg = Config(os.path.join(packagedir, 'alembic.ini'))
# Set the database URL and the script location at runtime.
alembic_cfg.set_main_option("sqlalchemy.url", alchemy_connection_string())
alembic_cfg.set_main_option("script_location", os.path.join(packagedir, 'alembic'))
# When the (prefixed) alembic_version table is missing, command.stamp is
# called with "head".
if not db.engine.has_table(dbtableprefix + 'alembic_version'):
sys.stderr.write("Creating alembic stamp\n")
command.stamp(alembic_cfg, "head")
# command.upgrade to "head" is guarded by the existence of the (prefixed)
# user table.
if db.engine.has_table(dbtableprefix + 'user'):
sys.stderr.write("Running alembic upgrade\n")
command.upgrade(alembic_cfg, "head")
#db.drop_all()
# NOTE(review): this `try:` has no handler in view and the `else:` below has
# no matching `if` -- presumably the point where two snippets were joined.
try:
success = True
else:
sys.stderr.write("SSL destination directory not known")
sys.exit(1)
if success:
return
# With no certs location known, fall back to /usr/share/docassemble/certs when
# that directory exists; otherwise there is nothing to install.
if certs_location is None:
if os.path.isdir('/usr/share/docassemble/certs'):
certs_location = '/usr/share/docassemble/certs'
else:
return
# A certs location that is not an existing directory is a fatal error.
if not os.path.isdir(certs_location):
sys.stderr.write("certs directory " + str(certs_location) + " does not exist")
sys.exit(1)
import shutil
# Destination comes from the 'cert install directory' config key
# (default /etc/ssl/docassemble).
dest = daconfig.get('cert install directory', '/etc/ssl/docassemble')
if dest:
# An existing destination is removed first; shutil.copytree (without
# dirs_exist_ok) refuses to copy into a directory that already exists.
if os.path.isdir(dest):
shutil.rmtree(dest)
shutil.copytree(certs_location, dest)
# Restrict every file under dest to owner-read-only (stat.S_IRUSR).
for root, dirs, files in os.walk(dest):
for the_file in files:
os.chmod(os.path.join(root, the_file), stat.S_IRUSR)
else:
sys.stderr.write("SSL destination directory not known")
sys.exit(1)
return
# Reads interview-retention settings from the configuration and starts paging
# through stored UserDict rows for each filename that has its own setting.
# NOTE(review): indentation is lost in this copy and the body is cut off just
# after the record loop begins, so what is done with `stale`, `nowtime` and
# `interview_delete_days` is not visible here.
def clear_old_interviews():
#sys.stderr.write("clear_old_interviews: starting\n")
# Global setting: 'interview delete days', default 90. Any failure reading or
# converting it is logged and the value falls back to 0.
# NOTE(review): the bare `except:` also catches SystemExit/KeyboardInterrupt.
try:
interview_delete_days = int(docassemble.base.config.daconfig.get('interview delete days', 90))
except:
sys.stderr.write("Error in configuration for interview delete days\n")
interview_delete_days = 0
# Optional per-filename settings, collected as {filename: int(days)}.
days_by_filename = dict()
if 'interview delete days by filename' in docassemble.base.config.daconfig:
# The whole loop sits inside one try, so the first bad entry stops the loading;
# entries converted before the failure stay in days_by_filename.
# NOTE(review): `assert` is skipped under `python -O`, so the string check is
# not enforced in optimized mode.
try:
for filename, days in docassemble.base.config.daconfig['interview delete days by filename'].items():
assert isinstance(filename, string_types)
days_by_filename[filename] = int(days)
except:
sys.stderr.write("Error in configuration for interview delete days by filename\n")
# Naive datetime holding the current UTC time.
nowtime = datetime.datetime.utcnow()
#sys.stderr.write("clear_old_interviews: days is " + str(interview_delete_days) + "\n")
for filename, days in days_by_filename.items():
# Paging cursor: only rows with indexno greater than this are considered.
last_index = -1
while True:
# Subquery: highest indexno per (filename, key) beyond the cursor, for this filename.
subq = db.session.query(UserDict.key, UserDict.filename, db.func.max(UserDict.indexno).label('indexno')).filter(UserDict.indexno > last_index, UserDict.filename == filename).group_by(UserDict.filename, UserDict.key).subquery()
# Fetch those rows in indexno order, at most 1000 per pass.
results = db.session.query(UserDict.indexno, UserDict.key, UserDict.filename, UserDict.modtime).join(subq, and_(subq.c.indexno == UserDict.indexno)).order_by(UserDict.indexno).limit(1000)
# An empty page ends the paging loop for this filename.
if results.count() == 0:
break
stale = list()
for record in results:
# Advance the cursor so the next pass starts after this row.
last_index = record.indexno
import json
import iso8601
import datetime
import pytz
import traceback
from subprocess import call
from requests.utils import quote
from docassemble.webapp.files import SavedFile
from io import open
# True when the SUPERVISOR_SERVER_URL environment variable is set to a
# non-empty value, False otherwise.
USING_SUPERVISOR = bool(os.environ.get('SUPERVISOR_SERVER_URL', None))

# Path of the WSGI file; the 'webapp' configuration key overrides the default.
WEBAPP_PATH = daconfig.get('webapp', '/usr/share/docassemble/webapp/docassemble.wsgi')

# Value of CONTAINERROLE wrapped in colons ('::' when the variable is unset).
container_role = ':%s:' % os.environ.get('CONTAINERROLE', '')

ONEDRIVE_CHUNK_SIZE = 2000000


class WorkerController(object):
    """Empty placeholder class that defines no members of its own."""


# Result backend URL: the 'redis' configuration value, or a local default
# when that value is missing or None.
backend = daconfig.get('redis', None)
backend = 'redis://localhost' if backend is None else backend

# Broker URL: the 'rabbitmq' configuration value, or a guest AMQP URL built
# from this machine's hostname when that value is missing or None.
broker = daconfig.get('rabbitmq', None)
broker = ('pyamqp://guest@' + socket.gethostname() + '//') if broker is None else broker

# Name or path of the supervisorctl executable.
SUPERVISORCTL = daconfig.get('supervisorctl', 'supervisorctl')
def ls_submit_online_intake(params, task=None):
    """Submit an online intake using credentials from the configuration.

    The ``servername``, ``username`` and ``password`` subkeys of the
    ``legal server`` configuration entry are read (each is ``None`` when
    absent; a missing entry is treated as an empty mapping) and passed,
    together with ``params`` and ``task``, to ``_ls_submit_online_intake``.
    The result of that call is returned unchanged.
    """
    ls_settings = daconfig.get('legal server', {})
    servername, username, password = (
        ls_settings.get(subkey)
        for subkey in ('servername', 'username', 'password')
    )
    return _ls_submit_online_intake(servername, username, password, params, task=task)
from docassemble.base.functions import word, currency_symbol, url_action, comma_and_list, server
from docassemble.base.filter import markdown_to_html, get_audio_urls, get_video_urls, audio_control, video_control, noquote, to_text, my_escape
from docassemble.base.parse import Question, debug
from docassemble.base.logger import logmessage
from docassemble.base.config import daconfig
import urllib
import sys
import os
import re
import json
import random
import sys
import codecs
# Size and CSS unit for decorations and for button icons. Each value comes
# from the configuration and falls back to the default shown when absent.
DECORATION_SIZE = daconfig.get('decoration size', 2.0)
DECORATION_UNITS = daconfig.get('decoration units', 'em')
BUTTON_ICON_SIZE = daconfig.get('button icon size', 4.0)
BUTTON_ICON_UNITS = daconfig.get('button icon units', 'em')

# CSS classes for buttons: the large variant is used when 'button size' is
# 'large', which is also the default when the key is missing.
BUTTON_CLASS = (
    'btn-lg btn-da'
    if daconfig.get('button size', 'large') == 'large'
    else 'btn-da'
)
def tracker_tag(status):
    """Assemble hidden ``<input>`` elements for the current question.

    Adds a CSRF token input, a ``_next_action`` input when
    ``status.next_action`` is non-empty, and a ``_question_name`` input when
    the question has a name.

    NOTE(review): only the opening part of this function is visible in this
    copy; it ends at a commented-out line and no return statement is shown.
    The remainder must be restored from the upstream file.
    """
    output = ''
    output += ' <input value="' + server.generate_csrf() + '" name="csrf_token" type="hidden">\n'
    if len(status.next_action):
        # The encoder call has to sit outside the string literal; previously
        # the text "+ myb64doublequote(...) +" itself was emitted as the value.
        # No quotes are added around it here because, going by its name,
        # myb64doublequote() presumably returns an already double-quoted
        # value -- confirm against its definition.
        output += ' <input value=' + myb64doublequote(json.dumps(status.next_action)) + ' name="_next_action" type="hidden">\n'
    if status.question.name:
        output += ' <input value="' + status.question.name + '" name="_question_name" type="hidden">\n'
    # if 'orig_action' in status.current_info:
# Prepares the worker environment for a background action on an interview
# session and begins handling the action.
# Parameters seen in use below: yaml_filename (only logged), user_info (a
# mapping read by key), session_code, url, url_root, and action (a mapping
# with 'action' and 'arguments' keys). `secret` and `extra` are not used in
# the portion visible here.
# NOTE(review): indentation is lost in this copy and the body is cut off
# after the incoming_email handling.
def background_action(yaml_filename, user_info, session_code, secret, url, url_root, action, extra=None):
# Default url_root: the 'url root' config value followed by the 'root' value.
if url_root is None:
url_root = daconfig.get('url root', 'http://localhost') + daconfig.get('root', '/')
# Default url: the interview endpoint under url_root.
if url is None:
url = url_root + 'interview'
# One-second pause before doing any work; the reason is not stated here.
time.sleep(1.0)
# initialize_db() is called only while worker_controller has no 'loaded' attribute.
if not hasattr(worker_controller, 'loaded'):
initialize_db()
worker_controller.functions.reset_local_variables()
worker_controller.functions.set_uid(session_code)
# Run inside a Flask application context and a test request context built
# from url_root and url.
with worker_controller.flaskapp.app_context():
with worker_controller.flaskapp.test_request_context(base_url=url_root, path=url):
# A user is logged in unless the id string starts with 't'.
# NOTE(review): the check reads user_info['the_user_id'] but the lookup reads
# user_info['theid'] -- confirm both keys are always present.
if not str(user_info['the_user_id']).startswith('t'):
worker_controller.login_user(worker_controller.get_user_object(user_info['theid']), remember=False)
sys.stderr.write("background_action: yaml_filename is " + str(yaml_filename) + " and session code is " + str(session_code) + " and action is " + repr(action) + "\n")
worker_controller.set_request_active(False)
# For incoming_email actions carrying an 'id', the arguments are replaced by
# a dict holding the e-mail retrieved for that id.
if action['action'] == 'incoming_email':
if 'id' in action['arguments']:
action['arguments'] = dict(email=worker_controller.retrieve_email(action['arguments']['id']))
import sys
import re
from flask_user.forms import RegisterForm, LoginForm, password_validator, unique_email_validator
from flask_wtf import FlaskForm
from wtforms import DateField, StringField, SubmitField, ValidationError, BooleanField, SelectField, SelectMultipleField, HiddenField, PasswordField, validators, TextAreaField
from wtforms.validators import DataRequired, Email, Optional
from wtforms.widgets import PasswordInput
from docassemble.base.functions import word
from docassemble.base.config import daconfig
from flask_login import current_user
import email.utils
try:
    import ldap
except ImportError:
    # The ldap module cannot be imported, so LDAP login is switched off in
    # the configuration, creating the 'ldap login' entry if it is missing.
    if 'ldap login' in daconfig:
        daconfig['ldap login']['enable'] = False
    else:
        daconfig['ldap login'] = {'enable': False}
def fix_nickname(form, field):
    """Set ``field.data`` to the form's first and last name joined by a space.

    Reads ``form.first_name.data`` and ``form.last_name.data``; returns
    ``None``.
    """
    name_parts = (form.first_name.data, form.last_name.data)
    field.data = ' '.join(name_parts)
# Sign-in form built on LoginForm.
# NOTE(review): indentation is lost in this copy and validate() is cut off at
# the final `if`, so the LDAP branch and the return value are not visible.
class MySignInForm(LoginForm):
def validate(self):
from docassemble.webapp.daredis import r
from flask import request, abort
# Key under which a failed-login count is stored for the requesting client's
# address (r is presumably a Redis client -- confirm in daredis).
key = 'da:failedlogin:ip:' + str(request.remote_addr)
failed_attempts = r.get(key)
# Abort with 404 once the stored count is above the configured 'attempt limit'.
if failed_attempts is not None and int(failed_attempts) > daconfig['attempt limit']:
abort(404)
# LDAP-specific handling starts here when 'ldap login' -> 'enable' is truthy.
if daconfig['ldap login'].get('enable', False):
# Entry point that works out where certificates are stored, starting with
# cloud storage.
# NOTE(review): indentation is lost in this copy and the body is cut off at
# the start of the Azure branch.
def main():
from docassemble.base.config import daconfig, S3_ENABLED, s3_config, AZURE_ENABLED, azure_config
# Optional 'certs' config value; None when it is not configured.
certs_location = daconfig.get('certs', None)
cloud = None
prefix = None
if S3_ENABLED:
import docassemble.webapp.amazon
# Work on a deep copy so the bucket override below does not alter s3_config.
my_config = copy.deepcopy(s3_config)
# No explicit location: use the configured bucket with the 'certs/' prefix.
if certs_location is None:
cloud = docassemble.webapp.amazon.s3object(my_config)
prefix = 'certs/'
else:
# An s3://bucket/path location supplies both the bucket name and the prefix.
# When the pattern does not match, cloud and prefix keep their None values.
m = re.search(r'^s3://([^/]+)/(.*)', certs_location)
if m:
prefix = m.group(2)
my_config['bucket'] = m.group(1)
cloud = docassemble.webapp.amazon.s3object(my_config)
elif AZURE_ENABLED:
import docassemble.webapp.microsoft
# Downloads 'config.yml' from cloud storage to the path named by the
# 'config file' configuration entry, when a cloud object is available and the
# key exists there.
# NOTE(review): indentation is lost in this copy, and this is the last
# definition in view -- it may continue beyond what is shown.
def check_for_config():
from docassemble.base.config import daconfig
import docassemble.webapp.cloud
cloud = docassemble.webapp.cloud.get_cloud()
# Nothing is done when get_cloud() returns None.
if cloud is not None:
key = cloud.get_key('config.yml')
# Only an existing key is written to the local file.
if key.does_exist:
key.get_contents_to_filename(daconfig['config file'])