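# Delete all stored data for one interview session: the interview answers, the
# dictionary keys, uploaded files, assembled attachments, and text-to-speech
# entries associated with the given session key and interview filename.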
def reset_user_dict(user_code, filename):
    UserDict.query.filter_by(key=user_code, filename=filename).delete()
    db.session.commit()
    UserDictKeys.query.filter_by(key=user_code, filename=filename).delete()
    db.session.commit()
    for upload in Uploads.query.filter_by(key=user_code, yamlfile=filename).all():
        old_file = SavedFile(upload.indexno)
        old_file.delete()
    Uploads.query.filter_by(key=user_code, yamlfile=filename).delete()
    db.session.commit()
    Attachments.query.filter_by(key=user_code, filename=filename).delete()
    db.session.commit()
    SpeakList.query.filter_by(key=user_code, filename=filename).delete()
    db.session.commit()
    #cur = conn.cursor()
    #cur.execute("DELETE FROM userdict where key=%s and filename=%s", [user_code, filename])
    #cur.execute("DELETE FROM userdictkeys where key=%s and filename=%s", [user_code, filename])
    #conn.commit()
    return
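# Look up an uploaded file by its number for the current session and return a
# dictionary with its filename, extension, MIME type, SavedFile object, and
# paths on disk.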
def get_info_from_file_number(file_number):
    result = dict()
    upload = Uploads.query.filter_by(indexno=file_number, key=session['uid']).first()
    if upload:
        result['filename'] = upload.filename
        result['extension'], result['mimetype'] = get_ext_and_mimetype(result['filename'])
        result['savedfile'] = SavedFile(file_number, extension=result['extension'], fix=True)
        result['path'] = result['savedfile'].path
        result['fullpath'] = result['path'] + '.' + result['extension']
    # cur = conn.cursor()
    # cur.execute("SELECT filename FROM uploads where indexno=%s and key=%s", [file_number, session['uid']])
    # for d in cur:
    #     result['path'] = get_path_from_file_number(file_number)
    #     result['filename'] = d[0]
    #     result['extension'], result['mimetype'] = get_ext_and_mimetype(result['filename'])
    #     result['fullpath'] = result['path'] + '.' + result['extension']
    #     break
    # conn.commit()
    if 'path' not in result:
        logmessage("path is not in result for " + str(file_number))
        return result
    filename = result['path'] + '.' + result['extension']
    if os.path.isfile(filename):
        # (snippet truncated here; the original goes on to add further details
        # about the file to result before returning it)
        pass
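# Record a Git-hosted docassemble extension package in the database (or bump the
# version of an existing entry), then run the package updater and restart the
# web server processes, reporting the result to the user.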
def install_git_package(packagename, giturl):
    logmessage("install_git_package: " + packagename + " " + str(giturl))
    if Package.query.filter_by(name=packagename, active=True).first() is None and Package.query.filter_by(giturl=giturl, active=True).first() is None:
        package_auth = PackageAuth(user_id=current_user.id)
        package_entry = Package(name=packagename, giturl=giturl, package_auth=package_auth, version=1, active=True, type='git')
        db.session.add(package_auth)
        db.session.add(package_entry)
        db.session.commit()
    else:
        package_entry = Package.query.filter_by(name=packagename).first()
        if package_entry is not None:
            if package_entry.type == 'zip' and package_entry.upload is not None:
                SavedFile(package_entry.upload).delete()
            package_entry.version += 1
            package_entry.giturl = giturl
            package_entry.upload = None
            package_entry.limitation = None
            package_entry.type = 'git'
            db.session.commit()
    ok, logmessages = docassemble.webapp.update.check_for_updates()
    if ok:
        trigger_update(except_for=hostname)
        restart_wsgi()
        flash(word("Install successful"), 'success')
    else:
        flash(word("Install not successful"), 'error')
        flash('pip log: ' + str(logmessages), 'info')
    # pip_log = tempfile.NamedTemporaryFile()
    # commands = ['install', '--quiet', '--egg', '--src=' + tempfile.mkdtemp(), '--upgrade', '--log-file=' + pip_log.name, 'git+' + giturl + '.git#egg=' + packagename]
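# Mark an installed package as inactive, remove any uploaded zip file behind it,
# then run the package updater and restart the web server processes.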
def uninstall_package(packagename):
    logmessage("uninstall_package: " + packagename)
    existing_package = Package.query.filter_by(name=packagename, active=True).first()
    if existing_package is None:
        flash(word("Package did not exist"), 'error')
        return
    the_upload_number = existing_package.upload
    the_package_type = existing_package.type
    for package in Package.query.filter_by(name=packagename, active=True).all():
        package.active = False
    db.session.commit()
    ok, logmessages = docassemble.webapp.update.check_for_updates()
    if ok:
        if the_package_type == 'zip' and the_upload_number is not None:
            SavedFile(the_upload_number).delete()
        trigger_update(except_for=hostname)
        restart_wsgi()
        flash(word("Uninstall successful"), 'success')
    else:
        flash(word("Uninstall not successful"), 'error')
        flash('pip log: ' + str(logmessages), 'info')
    logmessage(logmessages)
    logmessage("uninstall_package: done")
    return
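# The fragment below is from a later revision of reset_user_dict() that also
# clears chat logs, URL-shortener records, and e-mailed attachments; its def
# line and the files_to_delete initialization are reconstructed from the usage
# that follows, and the earlier deletions are omitted in this snippet.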
def reset_user_dict(user_code, filename):  # later revision (fragment)
    files_to_delete = []  # reconstructed: upload numbers collected for deletion below
    SpeakList.query.filter_by(key=user_code, filename=filename).delete()
    db.session.commit()
    for upload in Uploads.query.filter_by(key=user_code, yamlfile=filename, persistent=False).all():
        files_to_delete.append(upload.indexno)
    Uploads.query.filter_by(key=user_code, yamlfile=filename, persistent=False).delete()
    db.session.commit()
    ChatLog.query.filter_by(key=user_code, filename=filename).delete()
    db.session.commit()
    for short_code_item in Shortener.query.filter_by(uid=user_code, filename=filename).all():
        for email in Email.query.filter_by(short=short_code_item.short).all():
            for attachment in EmailAttachment.query.filter_by(email_id=email.id).all():
                files_to_delete.append(attachment.upload)
    Shortener.query.filter_by(uid=user_code, filename=filename).delete()
    db.session.commit()
    for file_number in files_to_delete:
        the_file = SavedFile(file_number)
        the_file.delete()
    return
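# The following fragment is from inside sync_with_onedrive(): it finishes
# paginating through a OneDrive folder listing, then, for each Playground
# section, records the local files and their modification times and initializes
# the bookkeeping dictionaries for the files stored in OneDrive.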
        if "@odata.nextLink" not in info:
            break
        r, content = try_request(http, info["@odata.nextLink"], "GET")
    for section in ['static', 'templates', 'questions', 'modules', 'sources']:
        sys.stderr.write("sync_with_onedrive: processing " + section + "\n")
        if section not in subdirs:
            return worker_controller.functions.ReturnValue(ok=False, error="error accessing " + section + " in OneDrive", restart=False)
        local_files[section] = set()
        local_modtimes[section] = dict()
        if section == 'questions':
            the_section = 'playground'
        elif section == 'templates':
            the_section = 'playgroundtemplate'
        else:
            the_section = 'playground' + section
        area = SavedFile(user_id, fix=True, section=the_section)
        for f in area.list_of_files():
            local_files[section].add(f)
            local_modtimes[section][f] = os.path.getmtime(os.path.join(area.directory, f))
        od_files[section] = set()
        od_ids[section] = dict()
        od_modtimes[section] = dict()
        od_createtimes[section] = dict()
        od_deleted[section] = set()
        od_zero[section] = set()
        od_dirlist[section] = dict()
        if subdir_count[section] == 0:
            sys.stderr.write("sync_with_onedrive: skipping " + section + " because empty on remote\n")
        else:
            r, content = try_request(http, "https://graph.microsoft.com/v1.0/me/drive/items/" + quote(subdirs[section]) + "/children?$select=id,name,deleted,fileSystemInfo,folder,size", "GET")
            sys.stderr.write("sync_with_onedrive: processing " + section + ", which is " + text_type(subdirs[section]) + "\n")
            while True:  # (snippet truncated here; the original continues by paging through the folder listing)
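# The next fragment appears to come from inside a method that assembles a
# Playground interview and then catalogs the functions, modules, classes, and
# variable names used by the interview and its user dictionary.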
        user_dict['_internal']['modtime'] = datetime.datetime.utcnow()
        try:
            interview.assemble(user_dict, interview_status)
            has_error = False
        except Exception as errmess:
            has_error = True
            error_message = str(errmess)
            error_type = type(errmess)
            logmessage("Failed assembly with error type " + str(error_type) + " and message: " + error_message)
        functions = set()
        modules = set()
        classes = set()
        fields_used = set()
        names_used = set()
        names_used.update(interview.names_used)
        area = SavedFile(self.user_id, fix=True, section='playgroundmodules')
        avail_modules = set([re.sub(r'\.py$', '', f) for f in os.listdir(area.directory) if os.path.isfile(os.path.join(area.directory, f))])
        for question in interview.questions_list:
            names_used.update(question.mako_names)
            names_used.update(question.names_used)
            names_used.update(question.fields_used)
            fields_used.update(question.fields_used)
        for val in interview.questions:
            names_used.add(val)
            fields_used.add(val)
        for val in user_dict:
            if type(user_dict[val]) is types.FunctionType:
                functions.add(val)
            elif type(user_dict[val]) is types.TypeType or type(user_dict[val]) is types.ClassType:
                classes.add(val)
            elif type(user_dict[val]) is types.ModuleType:
                modules.add(val)
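# Translate a /playground/... path for the current user into a SavedFile object,
# returning None if the path does not match or the user is not logged in.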
def absolute_filename(the_file):
    #logmessage("Running absolute filename")
    if current_user.is_authenticated and not current_user.is_anonymous:
        match = re.match(r'^/playground/(.*)', the_file)
        if match:
            filename = re.sub(r'[^A-Za-z0-9]', '', match.group(1))
            playground = SavedFile(current_user.id, section='playground', fix=True, filename=filename)
            return playground
    return None
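# Regenerate the machine learning source files (ml-*.json) in a user's
# Playground sources area, finalizing the storage area only if something changed.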
def fix_ml_files(playground_number, current_project):
    playground = SavedFile(playground_number, section='playgroundsources', fix=False)
    changed = False
    for filename in playground.list_of_files():
        if re.match(r'^ml-.*\.json', filename):
            playground.fix()
            try:
                if write_ml_source(playground, playground_number, current_project, filename, finalize=False):
                    changed = True
            except:
                logmessage("Error writing machine learning source file " + str(filename))
    if changed:
        playground.finalize()
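# The remaining lines are from a later revision of absolute_filename() that
# supports Playground subdirectories; the opening of its /playground/ branch is
# cut off in this snippet, so the lead-in lines below are reconstructed to match
# the parallel branches that follow.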
    match = re.match(r'^/playground/([0-9]+)/([A-Za-z0-9]+)/(.*)', the_file)  # reconstructed lead-in
    if match:
        filename = re.sub(r'[^A-Za-z0-9\-\_\. ]', '', match.group(3))
        playground = SavedFile(match.group(1), section='playground', fix=True, filename=filename, subdir=match.group(2))
        return playground
    match = re.match(r'^/playgroundtemplate/([0-9]+)/([A-Za-z0-9]+)/(.*)', the_file)
    if match:
        filename = re.sub(r'[^A-Za-z0-9\-\_\. ]', '', match.group(3))
        playground = SavedFile(match.group(1), section='playgroundtemplate', fix=True, filename=filename, subdir=match.group(2))
        return playground
    match = re.match(r'^/playgroundstatic/([0-9]+)/([A-Za-z0-9]+)/(.*)', the_file)
    if match:
        filename = re.sub(r'[^A-Za-z0-9\-\_\. ]', '', match.group(3))
        playground = SavedFile(match.group(1), section='playgroundstatic', fix=True, filename=filename, subdir=match.group(2))
        return playground
    match = re.match(r'^/playgroundsources/([0-9]+)/([A-Za-z0-9]+)/(.*)', the_file)
    if match:
        filename = re.sub(r'[^A-Za-z0-9\-\_\. ]', '', match.group(3))
        playground = SavedFile(match.group(1), section='playgroundsources', fix=True, filename=filename, subdir=match.group(2))
        write_ml_source(playground, match.group(1), match.group(2), filename)
        return playground
    return None