Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from crontab import CronTab
from bs4 import BeautifulSoup
import urllib2
from xml.dom.minidom import parse, parseString
import sys
from math import ceil
from sys import argv
import Foundation
import objc
import AppKit
import subprocess
import pickle
import time
NSUserNotification = objc.lookUpClass('NSUserNotification')
NSUserNotificationCenter = objc.lookUpClass('NSUserNotificationCenter')
tab = CronTab(user=getpass.getuser())
def notify(clear, title, subtitle, info_text, url, delay=0, sound=False, userInfo={}):
#Notification Triggering Function
notification = NSUserNotification.alloc().init()
notification.setTitle_(title)
notification.setSubtitle_(subtitle)
notification.setInformativeText_(info_text)
notification.setHasActionButton_(True)
notification.setActionButtonTitle_("View")
notification.setUserInfo_({"action":"open_url", "value":url})
if sound:
notification.setSoundName_("NSUserNotificationDefaultSoundName")
notification.setDeliveryDate_(Foundation.NSDate.dateWithTimeInterval_sinceDate_(delay, Foundation.NSDate.date()))
center = NSUserNotificationCenter.defaultUserNotificationCenter()
center.setDelegate_(notification)
if clear == True:
NSUserNotificationCenter.defaultUserNotificationCenter().removeAllDeliveredNotifications()
session.get('task_query'), session.get('form_type'), session.get('task_description'),
session.get('task_frequency'), session.get('task_limit'), "Stopped",
Current_Timestamp, Current_Timestamp,))
Connection.commit()
time.sleep(1)
if session.get('task_frequency'):
Cursor.execute("SELECT * FROM tasks WHERE query = %s AND plugin = %s AND description = %s AND frequency = %s AND task_limit = %s AND status = %s AND created_at = %s AND updated_at = %s;", (
session.get('task_query'), session.get('form_type'), session.get('task_description'),
session.get('task_frequency'), str(session.get('task_limit')),
"Stopped", str(Current_Timestamp), str(Current_Timestamp),))
result = Cursor.fetchone()
current_task_id = result[0]
try:
my_cron = CronTab(user=getpass.getuser())
job = my_cron.new(command=f'/usr/bin/python3 {File_Path}/plugin_caller.py -t {str(current_task_id)}')
job.setall(session.get('task_frequency'))
my_cron.write()
Message = f"Task ID {(current_task_id)} created by {session.get('user')}."
app.logger.warning(Message)
Create_Event(Message)
except:
Frequency_Error = f"Task created but no cronjob was created due to the supplied frequency being invalid, please double check the frequency for task ID {str(session.get('task_id'))} and use the \"Edit\" button to update it in order for the cronjob to be created."
session['form_step'] = 0
Cursor.execute("SELECT * FROM tasks")
results = Cursor.fetchall()
if Frequency_Error:
return render_template('tasks.html', username=session.get('user'),
app_log.error('%s (thread): %s', name, exception)
def run_function(*args, **kwargs):
future = threadpool.submit(fn, *args, **kwargs)
future.add_done_callback(on_done)
return future
self.function = run_function
# Run on schedule if any of the schedule periods are specified
periods = 'minutes hours dates months weekdays years'.split()
if any(schedule.get(key) for key in periods):
# Convert all valid values into strings (e.g. 30 => '30'), and ignore any spaces
cron = (str(schedule.get(key, '*')).replace(' ', '') for key in periods)
self.cron_str = ' '.join(cron)
self.cron = CronTab(self.cron_str)
self.call_later()
elif not schedule.get('startup'):
app_log.warning('schedule:%s has no schedule nor startup', name)
# Run now if the task is to be run on startup. Don't re-run if the config was reloaded
startup = schedule.get('startup')
if startup == '*' or (startup is True and not ioloop_running(self.ioloop)):
self.function()
def add_cron():
"""Add job to crontab to set alarm modus or trigger recording."""
if os.name == 'nt':
log('Cronjob'.ljust(17) + ' | ' + 'ERROR'.ljust(8) + ' | Not supported on windows OS', 3, 1)
elif args.modus is None and args.record is None:
log('Cronjob'.ljust(17) + ' | ' + 'ERROR'.ljust(8) + ' | Requires --modus or --record', 3, 1)
if args.modus:
action = ' --modus ' + args.modus + ' '
else:
action = ' --record ' + args.record + ' '
if istimeformat(args.cronjob):
cron = CronTab(user=True)
now = datetime.datetime.now()
timer = now.replace(hour=time.strptime(args.cronjob, '%H:%M')[3], minute=time.strptime(args.cronjob, '%H:%M')[4], second=0, microsecond=0)
job = cron.new('gigasetelements-cli -u ' + args.username + ' -p ' + args.password +
action, comment='added by gigasetelements-cli on ' + str(now)[:16])
job.month.on(datetime.datetime.now().strftime('%-m'))
if now < timer:
job.day.on(datetime.datetime.now().strftime('%-d'))
else:
job.day.on(str((int(datetime.datetime.now().strftime('%-d')) + 1)))
timer = now.replace(day=(int(datetime.datetime.now().strftime('%-d')) + 1), hour=time.strptime(args.cronjob, '%H:%M')
[3], minute=time.strptime(args.cronjob, '%H:%M')[4], second=0, microsecond=0)
job.hour.on(time.strptime(args.cronjob, '%H:%M')[3])
job.minute.on(time.strptime(args.cronjob, '%H:%M')[4])
cron.write()
log('Cronjob'.ljust(17) + ' | ' + color('scheduled'.ljust(8)) + ' |' + action + '| ' + timer.strftime('%A %d %B %Y %H:%M'), 0, 1)
else:
Cursor.execute(PSQL_Select_Query)
results = Cursor.fetchall()
return redirect('tasks.html', username=session.get('user'), is_admin=session.get('is_admin'), results=results)
elif 'delete_id' in request.form:
try:
del_id = int(request.form['delete_id'])
PSQL_Select_Query = "SELECT frequency FROM tasks WHERE task_id = %s"
Cursor.execute(PSQL_Select_Query, (del_id,))
result = Cursor.fetchone()
if result:
try:
my_cron = CronTab(user=getpass.getuser())
for job in my_cron:
if job.command == '/usr/bin/python3 ' + File_Path + '/plugin_caller.py -t ' + str(del_id):
my_cron.remove(job)
my_cron.write()
except:
PSQL_Select_Query = "SELECT * FROM tasks"
Cursor.execute(PSQL_Select_Query)
results = Cursor.fetchall()
return render_template('tasks.html', username=session.get('user'), form_step=session.get('form_step'),
is_admin=session.get('is_admin'), results=results,
error="Failed to remove task ID " + str(
del_id) + " from crontab.")
if not os.access(LOGFILE, os.W_OK):
import getpass
curr_user = getpass.getuser()
logging.warning("'{0}' does not have write permissions to {1}. Crontab jobs will not run.".format(curr_user, LOGFILE))
else:
manage_path = locate_file('manage.py') # get the file path of 'manage.py'
if manage_path is None:
raise Exception('"manage.py" not found')
output = subprocess.check_output(['which', 'python']) # get the python path used by the current process
python_path = re.sub(r"(?<=[a-z])\r?\n", "", output) # remove newlines from output...
cron = CronTab(user=user) # get cron object for managing crontab jobs
# stop jobs to prevent job duplication
if len(cron):
stop_jobs(cron)
"""
Create crontab job for scheduled hydroshare file uploads
Example of what the crontab job would look like from this command:
0 */5 * * * python manage.py update_hydroshare_resource_files --settings=WebSDL.linux_sandbox >> \
/crontab.log 2>> /crontab.log # __odm2websdl__upload_hydroshare_files
"""
command_name = 'update_hydroshare_resource_files'
job = cron.new(command="""{python} {manage} {command} --settings={settings} >> {logfile} 2>> {logfile}""".format(
python=python_path,
manage=manage_path,
command=command_name,
def make_cron_from(income):
cron_request = CronTab(tab=income)
if len(cron_request) == 1:
print("requested cron is valid")
cron_request = cron_request[0]
cronslice = cron_request.slices
croncommand = cron_request.command
croncomment = cron_request.comment
cronenabled = cron_request.enabled
new_job = cron.new(command=croncommand, comment=croncomment)
if cronslice == "@reboot":
#print("Reboot script")
new_job.every_reboot()
else:
#print("not a reboot script...")
new_job.setall(cronslice)
new_job.enabled = cronenabled
return new_job
def __init__(self, loc, tabs=None):
for username in self.listdir(loc):
tab = self.generate(loc, username)
if tab:
self.append(tab)
if not self:
tab = CronTab(user=True)
if tab:
self.append(tab)