Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@rollback_on_failure
def add_carto_app(self):
    """Register this app with ``CartoviewApp`` and flag it as pending.

    When no entry named ``self.name`` exists, a new one is constructed as
    active and pending, with the order returned by ``get_app_order()``.
    Otherwise the existing entry is fetched, marked pending and committed.
    In both cases ``CartoviewApp.save()`` is called last.
    """
    already_registered = CartoviewApp.objects.app_exists(self.name)
    if already_registered:
        existing_entry = CartoviewApp.objects.get(self.name)
        existing_entry.pending = True
        existing_entry.commit()
    else:
        entry_data = {
            'name': self.name,
            'active': True,
            'order': self.get_app_order(),
            'pending': True
        }
        # The new instance is not kept; presumably the constructor
        # registers it with CartoviewApp.objects -- TODO confirm.
        CartoviewApp(entry_data)
    CartoviewApp.save()
@rollback_on_failure
def add_carto_app(self):
    """Register this app with ``CartoviewApp`` and flag it as pending.

    When no entry named ``self.name`` exists, a new one is constructed as
    active and pending, with the order returned by ``get_app_order()``.
    Otherwise the existing entry is fetched, marked pending and committed.
    In both cases ``CartoviewApp.save()`` is called last.
    """
    already_registered = CartoviewApp.objects.app_exists(self.name)
    if already_registered:
        existing_entry = CartoviewApp.objects.get(self.name)
        existing_entry.pending = True
        existing_entry.commit()
    else:
        entry_data = {
            'name': self.name,
            'active': True,
            'order': self.get_app_order(),
            'pending': True
        }
        # The new instance is not kept; presumably the constructor
        # registers it with CartoviewApp.objects -- TODO confirm.
        CartoviewApp(entry_data)
    CartoviewApp.save()
@rollback_on_failure
def get_app_order(self):
    """Return the next free ``order`` value for an ``App``.

    Returns:
        One more than the largest ``order`` stored on any ``App``, or 1
        when no such value exists.
    """
    # Max() aggregates to None both when there are no rows and when every
    # row has a NULL order; fall back to 0 in either case instead of
    # failing on ``None + 1``. This also avoids a separate exists() query.
    highest_order = App.objects.all().aggregate(
        Max('order'))['order__max'] or 0
    return highest_order + 1
@rollback_on_failure
def _install_requirements(self):
    """Install the app's requirements into its ``libs`` directory.

    Runs ``ReqInstaller.install_all()`` for ``self.app_dir`` with
    ``<app_dir>/libs`` as the install target.

    Raises:
        Any exception raised by the installer other than
        ``ReqFileException`` and ``ReqFilePermissionException``, which are
        ignored.
    """
    libs_dir = os.path.join(self.app_dir, 'libs')
    try:
        req_installer = ReqInstaller(self.app_dir, target=libs_dir)
        req_installer.install_all()
    except (ReqFileException, ReqFilePermissionException):
        # Deliberate best effort: these two installer errors do not abort
        # the installation (presumably a missing or unreadable
        # requirements file -- TODO confirm). Everything else, including
        # KeyboardInterrupt/SystemExit, propagates untouched.
        pass
@rollback_on_failure
def extract_move_app(self, zipped_app):
    """Extract ``zipped_app`` and move the app into ``settings.APPS_DIR``.

    On an upgrade where ``self.app_dir`` already exists, the old version is
    first moved into a fresh temporary directory. Its ``libs`` directory,
    if present, is then copied into the newly installed app, unless the
    new app already has a ``libs`` directory of its own.

    Args:
        zipped_app: archive object providing ``extractall(path)``. The
            archive is expected to contain a top-level directory named
            ``self.name``, since that is the directory moved into place.

    Side effects:
        Sets ``self.new_app_dir`` to the extracted app's location (inside
        a temporary directory) before that directory is moved.
    """
    extract_to = tempfile.mkdtemp()
    old_libs_dir = None
    zipped_app.extractall(extract_to)
    if self.upgrade and os.path.exists(self.app_dir):
        # move old version to temporary dir so that we can restore in
        # case of failure
        old_version_temp_dir = tempfile.mkdtemp()
        shutil.move(self.app_dir, old_version_temp_dir)
        # shutil.move() puts the source *inside* an existing destination
        # directory, so the old libs live at <tmp>/<app dir name>/libs,
        # not directly at <tmp>/libs.
        old_app_name = os.path.basename(os.path.normpath(self.app_dir))
        candidate = os.path.join(old_version_temp_dir, old_app_name, 'libs')
        if os.path.isdir(candidate):
            old_libs_dir = candidate
    self.new_app_dir = os.path.join(extract_to, self.name)
    shutil.move(self.new_app_dir, settings.APPS_DIR)
    if old_libs_dir:
        new_libs_dir = os.path.join(self.app_dir, 'libs')
        # shutil.copy() only copies files; a directory needs copytree(),
        # which in turn requires that the destination does not exist yet.
        if not os.path.exists(new_libs_dir):
            shutil.copytree(old_libs_dir, new_libs_dir)
@rollback_on_failure
def add_app(self):
    """Create or update the ``App`` record for this installation.

    A newly created record without a usable ``order`` gets the value from
    ``get_app_order()``. The app is then registered through
    ``add_carto_app()``, passed through ``self.app_serializer``, stamped
    with version, installing user and the default store, and saved.

    Returns:
        The saved app object returned by
        ``self.app_serializer.get_app_object``.
    """
    # save app configuration
    app_record, is_new = App.objects.get_or_create(name=self.name)
    if is_new and (app_record.order is None or app_record.order == 0):
        app_record.order = self.get_app_order()
    # Called for new and existing apps alike: add_carto_app() has its own
    # branch for an entry that is already registered.
    self.add_carto_app()
    app_record = self.app_serializer.get_app_object(app_record)
    app_record.version = self.version.version
    app_record.installed_by = self.user
    default_stores = AppStore.objects.filter(is_default=True)
    app_record.store = default_stores.first()
    app_record.save()
    return app_record
@rollback_on_failure
def get_app_order(self):
    """Return the next free ``order`` value for an ``App``.

    Returns:
        One more than the largest ``order`` stored on any ``App``, or 1
        when no such value exists.
    """
    # Max() aggregates to None both when there are no rows and when every
    # row has a NULL order; fall back to 0 in either case instead of
    # failing on ``None + 1``. This also avoids a separate exists() query.
    highest_order = App.objects.all().aggregate(
        Max('order'))['order__max'] or 0
    return highest_order + 1
@rollback_on_failure
def _install_requirements(self):
    """Install the app's requirements into its ``libs`` directory.

    Runs ``ReqInstaller.install_all()`` for ``self.app_dir`` with
    ``<app_dir>/libs`` as the install target.

    Raises:
        Any exception raised by the installer other than
        ``ReqFileException`` and ``ReqFilePermissionException``, which are
        ignored.
    """
    libs_dir = os.path.join(self.app_dir, 'libs')
    try:
        req_installer = ReqInstaller(self.app_dir, target=libs_dir)
        req_installer.install_all()
    except (ReqFileException, ReqFilePermissionException):
        # Deliberate best effort: these two installer errors do not abort
        # the installation (presumably a missing or unreadable
        # requirements file -- TODO confirm). Everything else, including
        # KeyboardInterrupt/SystemExit, propagates untouched.
        pass
@rollback_on_failure
def extract_move_app(self, zipped_app):
    """Extract ``zipped_app`` and move the app into ``settings.APPS_DIR``.

    On an upgrade where ``self.app_dir`` already exists, the old version is
    first moved into a fresh temporary directory. Its ``libs`` directory,
    if present, is then copied into the newly installed app, unless the
    new app already has a ``libs`` directory of its own.

    Args:
        zipped_app: archive object providing ``extractall(path)``. The
            archive is expected to contain a top-level directory named
            ``self.name``, since that is the directory moved into place.

    Side effects:
        Sets ``self.new_app_dir`` to the extracted app's location (inside
        a temporary directory) before that directory is moved.
    """
    extract_to = tempfile.mkdtemp()
    old_libs_dir = None
    zipped_app.extractall(extract_to)
    if self.upgrade and os.path.exists(self.app_dir):
        # move old version to temporary dir so that we can restore in
        # case of failure
        old_version_temp_dir = tempfile.mkdtemp()
        shutil.move(self.app_dir, old_version_temp_dir)
        # shutil.move() puts the source *inside* an existing destination
        # directory, so the old libs live at <tmp>/<app dir name>/libs,
        # not directly at <tmp>/libs.
        old_app_name = os.path.basename(os.path.normpath(self.app_dir))
        candidate = os.path.join(old_version_temp_dir, old_app_name, 'libs')
        if os.path.isdir(candidate):
            old_libs_dir = candidate
    self.new_app_dir = os.path.join(extract_to, self.name)
    shutil.move(self.new_app_dir, settings.APPS_DIR)
    if old_libs_dir:
        new_libs_dir = os.path.join(self.app_dir, 'libs')
        # shutil.copy() only copies files; a directory needs copytree(),
        # which in turn requires that the destination does not exist yet.
        if not os.path.exists(new_libs_dir):
            shutil.copytree(old_libs_dir, new_libs_dir)
@rollback_on_failure
def add_app(self):
    """Create or update the ``App`` record for this installation.

    A newly created record without a usable ``order`` gets the value from
    ``get_app_order()``. The app is then registered through
    ``add_carto_app()``, passed through ``self.app_serializer``, stamped
    with version, installing user and the default store, and saved.

    Returns:
        The saved app object returned by
        ``self.app_serializer.get_app_object``.
    """
    # save app configuration
    app_record, is_new = App.objects.get_or_create(name=self.name)
    if is_new and (app_record.order is None or app_record.order == 0):
        app_record.order = self.get_app_order()
    # Called for new and existing apps alike: add_carto_app() has its own
    # branch for an entry that is already registered.
    self.add_carto_app()
    app_record = self.app_serializer.get_app_object(app_record)
    app_record.version = self.version.version
    app_record.installed_by = self.user
    default_stores = AppStore.objects.filter(is_default=True)
    app_record.store = default_stores.first()
    app_record.save()
    return app_record