Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if form.giturl.data:
giturl = form.giturl.data.strip()
packagename = re.sub(r'.*/', '', giturl)
if user_can_edit_package(giturl=giturl) and user_can_edit_package(pkgname=packagename):
#commands = ['install', '--egg', '--src=' + temp_directory, '--log-file=' + pip_log.name, '--upgrade', "--install-option=--user", 'git+' + giturl + '.git#egg=' + packagename]
commands = ['install', '--quiet', '--egg', '--src=' + tempfile.mkdtemp(), '--upgrade', '--log-file=' + pip_log.name, 'git+' + giturl + '.git#egg=' + packagename]
returnval = pip.main(commands)
if returnval > 0:
with open(pip_log.name) as x: logfilecontents = x.read()
flash("pip " + " ".join(commands) + "<pre>" + str(logfilecontents) + "</pre>", 'error')
else:
if Package.query.filter_by(name=packagename).first() is None and Package.query.filter_by(giturl=giturl).first() is None:
package_auth = PackageAuth(user_id=current_user.id)
package_entry = Package(name=packagename, giturl=giturl, package_auth=package_auth, version=1, active=True, type='github')
db.session.add(package_auth)
db.session.add(package_entry)
db.session.commit()
else:
package_entry = Package.query.filter_by(name=packagename).first()
if package_entry is not None:
package_entry.version += 1
package_entry.giturl = giturl
package_entry.type = 'github'
db.session.commit()
flash(word("Install successful"), 'success')
trigger_install(except_for=hostname)
restart_wsgi()
else:
flash(word("You do not have permission to install this package."), 'error')
else:
flash(word('You need to either supply a Git URL or upload a file.'), 'error')
return render_template('pages/update_package.html', form=form), 200
@app.route('/callback/<provider>')
def oauth_callback(provider):
    """Complete an OAuth sign-in for ``provider`` and log the user in.

    The provider name is taken from the URL; the route must therefore
    declare a ``<provider>`` variable, otherwise Flask calls this view
    with no arguments and it fails with a TypeError.

    Returns a redirect to the ``index`` endpoint in every case.
    """
    # Someone who is already signed in has nothing to do here.
    if not current_user.is_anonymous:
        return redirect(url_for('index'))
    oauth = OAuthSignIn.get_provider(provider)
    social_id, username, email = oauth.callback()
    if social_id is None:
        flash(word('Authentication failed.'), 'error')
        return redirect(url_for('index'))
    user = User.query.filter_by(social_id=social_id).first()
    # Only fall back to an e-mail match when the provider actually gave
    # us an address: filter_by(email=None) is rendered as "email IS NULL"
    # and would pick up an arbitrary account that has no e-mail stored.
    if not user and email:
        user = User.query.filter_by(email=email).first()
    if not user:
        user = User(social_id=social_id, nickname=username, email=email, active=True)
        db.session.add(user)
        db.session.commit()
    login_user(user, remember=False)
    if not current_user.is_anonymous:
        # The provider may not supply an e-mail address; concatenating
        # None would raise, so fall back to the username.
        flash(word('Welcome! You are logged in as ') + (email or username or ''), 'success')
    return redirect(url_for('index'))
def get_new_file_number(user_code, file_name, yaml_file_name=None):
    """Record a new upload and return its ``indexno``.

    An ``Uploads`` row is created with ``key=user_code``,
    ``filename=file_name`` and ``yamlfile=yaml_file_name``, added to the
    session and committed; the committed row's ``indexno`` attribute is
    returned.
    """
    upload_row = Uploads(
        key=user_code,
        filename=file_name,
        yamlfile=yaml_file_name,
    )
    session = db.session
    session.add(upload_row)
    session.commit()
    return upload_row.indexno
# indexno = None
def get_unique_name(filename):
    """Create a ``UserDict`` row under a fresh random key and return the key.

    A 32-character key of ASCII letters is generated repeatedly until one
    is found that no existing ``UserDict`` row uses.  A new row is then
    stored with that key, the given ``filename``, and a base64-encoded
    pickle of a deep copy of ``initial_dict``.

    Returns the newly allocated key (a string).
    """
    # NOTE(review): the key appears to act as a session identifier, so it
    # is drawn from the operating system's CSPRNG rather than the default
    # (predictable) Mersenne Twister generator.
    rng = random.SystemRandom()
    while True:
        newname = ''.join(rng.choice(string.ascii_letters) for _ in range(32))
        if UserDict.query.filter_by(key=newname).first():
            # Key already taken; draw another one.
            continue
        encoded_dict = codecs.encode(pickle.dumps(copy.deepcopy(initial_dict)), 'base64').decode()
        new_user_dict = UserDict(key=newname, filename=filename, dictionary=encoded_dict)
        db.session.add(new_user_dict)
        db.session.commit()
        return newname
the_file.save(zippath)
saved_file.save()
saved_file.finalize()
zippath += '.zip'
#commands = ['install', zippath, '--egg', '--no-index', '--src=' + tempfile.mkdtemp(), '--log-file=' + pip_log.name, '--upgrade', "--install-option=--user"]
commands = ['install', '--quiet', '--egg', '--no-index', '--src=' + tempfile.mkdtemp(), '--upgrade', '--log-file=' + pip_log.name, zippath]
returnval = pip.main(commands)
if returnval > 0:
with open(pip_log.name) as x: logfilecontents = x.read()
flash("pip " + " ".join(commands) + "<pre>" + str(logfilecontents) + '</pre>', 'error')
else:
existing_package = Package.query.filter_by(name=pkgname).first()
if existing_package is None:
package_auth = PackageAuth(user_id=current_user.id)
package_entry = Package(name=pkgname, package_auth=package_auth, upload=file_number, active=True, type='zip', version=1)
db.session.add(package_auth)
db.session.add(package_entry)
else:
existing_package.upload = file_number
existing_package.active = True
existing_package.type = 'zip'
existing_package.version += 1
db.session.commit()
flash(word("Install successful"), 'success')
trigger_install(except_for=hostname)
restart_wsgi()
else:
flash(word("You do not have permission to install this package."), 'error')
except Exception as errMess:
flash("Error processing upload: " + str(errMess), "error")
else:
if form.giturl.data:
#from docassemble.webapp.users import views
db_adapter = SQLAlchemyAdapter(db, User, UserAuthClass=UserAuth)
user_manager = UserManager(db_adapter, app, register_form=MyRegisterForm, user_profile_view_function=user_profile_page, logout_view_function=logout)
return(app)
# Finish configuring the application and attach the login manager.
setup_app(app, db)
lm = LoginManager(app)
lm.login_view = 'login'

# When SUPERVISOR_SERVER_URL is set in the environment, replace this
# host's row in the Supervisors table with a fresh one.
supervisor_url = os.environ.get('SUPERVISOR_SERVER_URL')
USING_SUPERVISOR = bool(supervisor_url)
if USING_SUPERVISOR:
    Supervisors.query.filter_by(hostname=hostname).delete()
    db.session.commit()
    new_entry = Supervisors(hostname=hostname, url="http://" + hostname + ":9001")
    db.session.add(new_entry)
    db.session.commit()

# The 'docassemble' logger writes to a syslog server over TCP port 514
# when LOGSERVER is set, and to a file under LOG_DIRECTORY otherwise.
sys_logger = logging.getLogger('docassemble')
sys_logger.setLevel(logging.DEBUG)
if LOGSERVER is not None:
    import logging.handlers
    handler = logging.handlers.SysLogHandler(address=(LOGSERVER, 514), socktype=socket.SOCK_STREAM)
    sys_logger.addHandler(handler)
else:
    docassemble_log_handler = logging.FileHandler(filename=os.path.join(LOG_DIRECTORY, 'docassemble.log'))
    sys_logger.addHandler(docassemble_log_handler)

LOGFORMAT = 'docassemble: ip=%(clientip)s i=%(yamlfile)s uid=%(session)s user=%(user)s %(message)s'
def get_unique_name(filename):
    """Create a ``UserDict`` row under a fresh random key and return the key.

    A 32-character key of ASCII letters is generated repeatedly until one
    is found that no existing ``UserDict`` row uses.  A new row is then
    stored with that key, the given ``filename``, and a base64-encoded
    pickle of a deep copy of ``initial_dict``.

    Returns the newly allocated key (a string).
    """
    # NOTE(review): the key appears to act as a session identifier, so it
    # is drawn from the operating system's CSPRNG rather than the default
    # (predictable) Mersenne Twister generator.
    rng = random.SystemRandom()
    while True:
        newname = ''.join(rng.choice(string.ascii_letters) for _ in range(32))
        if UserDict.query.filter_by(key=newname).first():
            # Key already taken; draw another one.
            continue
        encoded_dict = codecs.encode(pickle.dumps(copy.deepcopy(initial_dict)), 'base64').decode()
        new_user_dict = UserDict(key=newname, filename=filename, dictionary=encoded_dict)
        db.session.add(new_user_dict)
        db.session.commit()
        return newname
def save_user_dict_key(user_code, filename):
    """Ensure a ``UserDictKeys`` row links the current user to a session.

    Looks for a row matching ``user_code``, ``filename`` and the current
    user's id; if none exists, one is inserted and committed.  Nothing is
    returned.
    """
    criteria = dict(key=user_code, filename=filename, user_id=current_user.id)
    if UserDictKeys.query.filter_by(**criteria).first():
        # The association is already recorded.
        return
    db.session.add(UserDictKeys(**criteria))
    db.session.commit()
def install_git_package(packagename, giturl):
    """Register or update a Git-hosted package, then run the updater.

    If an active ``Package`` already matches ``packagename`` or
    ``giturl``, the row found by name (if any) has its version bumped,
    its Git URL replaced, its upload and limitation cleared and its type
    set to ``'git'``; a previously uploaded zip file is deleted first.
    Otherwise a new ``Package`` owned by the current user is created.
    Finally ``check_for_updates()`` is called and, on success, the
    update is triggered on the other hosts.
    """
    logmessage("install_git_package: " + packagename + " " + str(giturl))
    already_active = (
        Package.query.filter_by(name=packagename, active=True).first() is not None
        or Package.query.filter_by(giturl=giturl, active=True).first() is not None
    )
    if already_active:
        # NOTE(review): this lookup does not filter on active=True, unlike
        # the check above -- confirm that is intended.
        record = Package.query.filter_by(name=packagename).first()
        if record is not None:
            if record.type == 'zip' and record.upload is not None:
                # Switching from a zip install: discard the stored archive.
                SavedFile(record.upload).delete()
            record.version += 1
            record.giturl = giturl
            record.upload = None
            record.limitation = None
            record.type = 'git'
            db.session.commit()
    else:
        owner = PackageAuth(user_id=current_user.id)
        record = Package(name=packagename, giturl=giturl, package_auth=owner, version=1, active=True, type='git')
        db.session.add(owner)
        db.session.add(record)
        db.session.commit()
    ok, _ = docassemble.webapp.update.check_for_updates()
    if ok:
        trigger_update(except_for=hostname)
def install_zip_package(packagename, file_number):
    """Register or update a zip-uploaded package, then run the updater.

    An active ``Package`` named ``packagename`` is pointed at the new
    upload (deleting any previous zip upload), re-activated, has its
    limitation cleared and its version bumped; if there is none, a new
    ``Package`` owned by the current user is created.  After committing,
    ``check_for_updates()`` is called: on success the update is triggered
    on the other hosts, the WSGI server is restarted and a success
    message is flashed; otherwise an error message is flashed.
    """
    logmessage("install_zip_package: " + packagename + " " + str(file_number))
    current = Package.query.filter_by(name=packagename, active=True).first()
    if current is not None:
        if current.type == 'zip' and current.upload is not None:
            # Remove the archive that is about to be superseded.
            SavedFile(current.upload).delete()
        current.upload = file_number
        current.active = True
        current.limitation = None
        current.type = 'zip'
        current.version += 1
    else:
        owner = PackageAuth(user_id=current_user.id)
        fresh = Package(name=packagename, package_auth=owner, upload=file_number, active=True, type='zip', version=1)
        db.session.add(owner)
        db.session.add(fresh)
    db.session.commit()
    ok, _ = docassemble.webapp.update.check_for_updates()
    if not ok:
        flash(word("Install not successful"), 'error')
    else:
        trigger_update(except_for=hostname)
        restart_wsgi()
        flash(word("Install successful"), 'success')