Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from flask import Flask
from flask_apscheduler import APScheduler
from flask import request
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# Flask application object for this module.
app = Flask(__name__)
# Flask-APScheduler extension instance, created here without an app attached.
scheduler = APScheduler()
import sqlite3
class Config(object):
    """Scheduler settings for the Flask app, expressed as ``SCHEDULER_*`` keys."""

    # No jobs are registered statically.
    JOBS = []

    # Jobs are persisted in a local SQLite database through SQLAlchemy.
    SCHEDULER_JOBSTORES = {
        'default': SQLAlchemyJobStore(url='sqlite:///shebei.db'),
    }

    # Only a 4-worker process pool is declared, under the alias
    # 'processpool'; no executor is declared under 'default' here.
    SCHEDULER_EXECUTORS = {
        'processpool': ProcessPoolExecutor(4),
    }

    SCHEDULER_JOB_DEFAULTS = {
        'coalesce': False,
        'max_instances': 3,
    }

    SCHEDULER_API_ENABLED = True
def job1(a, b):
    """Print ``a`` and ``b`` separated by a single space.

    Both values are converted with ``str`` before being joined, so any
    objects may be passed.
    """
    print(str(a) + ' ' + str(b))
def jobfromparm(jobargs):
    """Unpack a job description mapping into its individual fields.

    Args:
        jobargs: mapping with the keys ``'id'``, ``'func'``, ``'args'``,
            ``'trigger'`` and ``'seconds'``.  ``'args'`` must be a string
            containing a Python expression.

    Raises:
        KeyError: if any of the keys listed above is missing.

    NOTE(review): only the unpacking is visible here and nothing is
    returned; presumably the values are meant to be passed on to the
    scheduler -- confirm against the complete function.
    """
    id = jobargs['id']
    func = jobargs['func']
    # SECURITY: eval() executes arbitrary code.  If ``jobargs`` can come from
    # a request or any other untrusted source, parse the argument list with
    # ast.literal_eval (or JSON) instead.
    args = eval(jobargs['args'])
    trigger = jobargs['trigger']
    seconds = jobargs['seconds']
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# Flask app whose configuration is assembled from the three conf files below.
app = Flask(__name__)
DB = SQLAlchemy(app)
app.config.from_pyfile('../conf/redis.conf')
app.config.from_pyfile('../conf/sql.conf')
app.config.from_pyfile('../conf/task.conf')
# Default root logging; the 'apscheduler' logger is raised to DEBUG.
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
redis_host = app.config.get('REDIS_HOST')
redis_port = app.config.get('REDIS_PORT')
# NOTE(review): redis_password and task_hosts are read here but are not used
# anywhere in the visible code (the Redis client below gets no password).
redis_password = app.config.get('REDIS_PASSWORD')
task_hosts = app.config.get('TASK_HOSTS')
# One client object, bound to both names RC and Redis.
RC = Redis = redis.StrictRedis(host=redis_host, port=redis_port,decode_responses=True)
# Address that the local hostname resolves to.
HOST = socket.gethostbyname(socket.gethostname())
# Jobs are persisted through SQLAlchemy using the 'idc' entry of SQLALCHEMY_BINDS.
jobstores = {'default': SQLAlchemyJobStore(url=app.config.get('SQLALCHEMY_BINDS')['idc'])}
# 50 threads under 'default', plus an 8-worker process pool under 'processpool'.
executors = {'default': ThreadPoolExecutor(50),'processpool': ProcessPoolExecutor(8)}
job_defaults = {'coalesce': False,'max_instances': 3,'misfire_grace_time':60}
# Background scheduler running in the Asia/Shanghai timezone.
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=pytz.timezone('Asia/Shanghai'))
# Single-node background execution
def scheduler_tasks():
    """Drop every scheduled job and re-register the periodic ``Task`` jobs.

    Each job is added with a cron trigger firing at second 0, uses the task
    function's ``__name__`` as its job id and replaces any existing job that
    has the same id.
    """
    # One minute from now as ['HH', 'MM']; only the disabled
    # zabbix_counts_task entry below consumes it.
    date_time = datetime.datetime.now() + datetime.timedelta(minutes=1)
    run_date = date_time.strftime('%H:%M').split(':')
    scheduler.remove_all_jobs()
    # (task function, extra cron fields) for every job to register.
    cron_jobs = (
        # (Task.zabbix_counts_task, {'minute': run_date[1], 'hour': run_date[0]}),
        (Task.business_monitor_task, {'minute': '*'}),
        (Task.es_log_status, {'minute': '*'}),
        (Task.es_log_time, {'minute': '*'}),
        (Task.business_data, {'minute': '*'}),
        (Task.assets_infos, {'minute': '30', 'hour': '4'}),
        (Task.auto_discovery_task, {'minute': '0', 'hour': '*/4'}),
        (Task.app_service_task, {'minute': '0', 'hour': '*'}),
    )
    for task, cron_fields in cron_jobs:
        scheduler.add_job(
            task,
            'cron',
            second='0',
            id=task.__name__,
            replace_existing=True,
            **cron_fields
        )
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def job():
    """Append a ``[<now>][<boot time>]`` line to ``./log.txt``.

    ``<now>`` is the current local ``datetime`` and ``<boot time>`` is the
    value returned by ``psutil.boot_time()``.  The file is opened in append
    mode, so it is created on first use and grows by one line per call.
    """
    with open('./log.txt', 'a') as fp:
        now = datetime.datetime.now()
        boot_time = psutil.boot_time()
        result = '[{}][{}]\n'.format(now, boot_time)
        fp.write(result)
# Jobs are persisted in a local SQLite file through SQLAlchemy.
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
}
# 20 threads under 'default', plus a 5-worker process pool under 'processpool'.
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5),
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3,
}
sched = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone=utc,
)
# Disabled alternative (blocking scheduler) and sample job registration:
# sched = BlockingScheduler(jobstores = jobstores, executors = executors, job_defaults = job_defaults, timezone = utc)
# sched.add_job(job, 'cron', second = '*/2')
sched.start()
# The background scheduler runs in its own thread, so the main thread is
# parked here to keep the process alive.
while True:
    time.sleep(1)
from pytz import timezone
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from eclogue.model import db
# Job store kept in the 'schedule_jobs' collection of the 'eclogue' database,
# reusing the client exposed by eclogue.model.db.
store = MongoDBJobStore(database='eclogue', collection='schedule_jobs', client=db.client)
jobstores = {
'default': store,
}
# 20 threads under 'default', plus a 5-worker process pool under 'processpool'.
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
# max_instances is 1, i.e. a job is not run concurrently with itself.
job_defaults = {
'coalesce': False,
'max_instances': 1
}
tz = timezone('Asia/Shanghai')
# Background scheduler running in the Asia/Shanghai timezone.
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=tz)
# FLASKY_ADMIN = config["smtp"]["username"]
# MAIL_USE_SSL = config["smtp"]["ssl"]
# MAIL_DEBUG = True
SSL_REDIRECT = False
# NOTE(review): the secret key is hard-coded in source; consider loading it
# from the environment or a non-versioned config file.
SECRET_KEY = 'QWERTYUIOPASDFGHJ'
# logging level
LOGGING_LEVEL = logging.INFO
# '<current working directory>/.beats', with backslashes in the directory
# part replaced by forward slashes.
AUTO_HOME = os.getcwd().replace('\\', '/') + '/.beats'
AUTO_ROBOT = []
# Executor and job-default settings in the form APScheduler accepts.
# NOTE(review): where these two dicts are consumed is not visible here.
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 10
}
@staticmethod
def init_app(app):
    """Hook for per-configuration application setup; does nothing here.

    Args:
        app: the application object being configured (unused).
    """
    pass
class DevelopmentConfig(Config):
    """Development settings: everything from ``Config`` plus ``DEBUG`` on."""

    DEBUG = True
return [x.as_dict for x in self.metrics.values()]
@property
def stats_influx(self):
    """Return the ``as_influx`` items of every metric as one flat list.

    Metrics are visited in the order of ``self.metrics.values()``; an empty
    ``metrics`` mapping yields an empty list.
    """
    return [
        point
        for metric in self.metrics.values()
        for point in metric.as_influx
    ]
class HttpServer(flask.Flask):
    """Our HTTP/API server."""

    # 20 threads under 'default', plus a 5-worker process pool under
    # 'processpool'.
    EXECUTORS = {
        'default': ThreadPoolExecutor(20),
        'processpool': ProcessPoolExecutor(5),
    }

    def __init__(self, name, ip, port, *args, **kwargs):
        """Constructor.

        Args:
            name: (str) name of Flask service
            ip: (str) IP address to bind HTTP server
            port: (int) TCP port for HTTP server to listen
            *args: extra positional arguments forwarded to ``flask.Flask``
            **kwargs: extra keyword arguments forwarded to ``flask.Flask``

        NOTE(review): ``ip`` and ``port`` are not used in the visible part
        of this constructor -- confirm where they are consumed.
        """
        super(HttpServer, self).__init__(name, *args, **kwargs)
        # Fixup the root path for Flask so it can find templates/*
        root_path = os.path.abspath(os.path.dirname(__file__))
        logging.debug('Setting root_path for Flask: %s', root_path)
        self.root_path = root_path
        self.targets = config.CollectorConfig()
# Pick the job store backend from settings: Redis when explicitly requested,
# otherwise SQLAlchemy with the configured URL.
if settings['scheduler.store'] == 'redis':
    jobstores = {
        'default': RedisJobStore(db=settings['scheduler.db']),
    }
else:
    jobstores = {
        'default': SQLAlchemyJobStore(url=settings['scheduler.url']),
    }

# The default executor is described declaratively (type + worker count); the
# process pool is instantiated directly.
executors = {
    'default': {
        'type': settings['scheduler.executors.type'],
        'max_workers': settings['scheduler.executors.max_workers'],
    },
    'processpool': ProcessPoolExecutor(
        max_workers=settings['scheduler.executors.processpool.max_workers']
    ),
}

job_defaults = {
    'coalesce': False,
    'max_instances': settings['scheduler.job_defaults.max_instances'],
}

scheduler.configure(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone=timezone('UTC')
)

# The autostart setting is compared as text: only the exact string 'true'
# starts the scheduler here.
if settings['scheduler.autostart'] == 'true':
    scheduler.start()
# Storage tuning values taken from the storage settings mapping.
max_records_between_fsync = dict_storage_settings["max_records_between_fsync"]
max_file_count = dict_storage_settings["max_file_count"]
# Divided by 1000 -- presumably milliseconds to seconds; confirm the unit.
read_interval = dict_storage_settings["read_interval"] / 1000
max_read_record_count = dict_storage_settings["max_read_record_count"]
# NOTE(review): the third argument of get_parameter looks like a fallback
# value (20 for "processes_to_use", 5 for "additional_threads_to_use") --
# confirm against TBUtility.
if dict_performance_settings:
number_of_processes = TBUtility.get_parameter(dict_performance_settings, "processes_to_use", 20)
number_of_workers = TBUtility.get_parameter(dict_performance_settings, "additional_threads_to_use", 5)
self.dict_ext_by_device_name = {}
self.dict_rpc_handlers_by_device = {}
self.lock = Lock()
# initialize scheduler
# A thread pool is always configured; the process pool is added only when
# more than one process is requested.
executors = {'default': ThreadPoolExecutor(number_of_workers)}
if number_of_processes > 1:
executors.update({'processpool': ProcessPoolExecutor(number_of_processes)})
self.scheduler = BackgroundScheduler(executors=executors)
# TBGateway.listener is registered for job-error events only.
self.scheduler.add_listener(TBGateway.listener, EVENT_JOB_ERROR)
self.scheduler.start()
# initialize client
self.mqtt_gateway = TBGatewayMqttClient(host, token, self)
# todo add tls
# Retry until connected. The flag read here is the name-mangled private
# attribute __is_connected of TBDeviceMqttClient.
while not self.mqtt_gateway._TBDeviceMqttClient__is_connected:
try:
self.mqtt_gateway.connect(port=port, keepalive=keep_alive)
except Exception as e:
log.error(e)
log.debug("connecting to ThingsBoard...")
time.sleep(1)
# Define logger
logger = logging.getLogger(PROJECT_NAME)
# Weekday name -> APScheduler day-of-week number, as strings.
# APScheduler counts from Monday ('0') through Sunday ('6').
days_to_ints = {'Sunday': '6', 'Monday': '0', 'Tuesday': '1',
'Wednesday': '2', 'Thursday': '3', 'Friday': '4', 'Saturday': '5'}
# Disabled job-store configuration, kept as a bare string literal; it is
# evaluated and discarded, so it has no effect.
'''
jobstores = {
'mongo': {'type': 'mongodb'},
# 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}'''
# 20 threads under 'default' (declarative form), plus a 5-worker process pool
# under 'processpool'.
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler()
# No jobstores are passed here; the variant that does is commented out below.
scheduler.configure(executors=executors, job_defaults=job_defaults, timezone=utc)
# scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
scheduler.start()
# define the function that is to be executed
def execute_scheduled_job(topic, payload):
# ################### Add jobs with the decorator #################
# Run job3 once every 5 seconds
@scheduler.scheduled_job('interval', seconds=5, id='job3')
def job3():
    """Print a status line containing the value of ``current_time()``."""
    # Parenthesised single-argument form prints the same text on Python 2
    # and Python 3; the bare print statement is a syntax error on Python 3.
    print('job3 is running, Now is %s' % current_time())
# Run job4 once every 5 seconds (cron trigger, second='*/5')
@scheduler.scheduled_job('cron', second='*/5', id='job4')
def job4():
    """Print a status line containing the value of ``current_time()``."""
    # Parenthesised single-argument form prints the same text on Python 2
    # and Python 3; the bare print statement is a syntax error on Python 3.
    print('job4 is running, Now is %s' % current_time())
# 20 threads under 'default', plus a 5-worker process pool under 'processpool'.
executors = {
'processpool': ProcessPoolExecutor(5),
'default': ThreadPoolExecutor(20)
}
# Up to 5 concurrent instances per job; missed runs are not coalesced.
job_defaults = {
'coalesce': False,
'max_instances': 5
}
# Executors and job defaults are applied before the scheduler is started.
scheduler.configure(executors=executors, job_defaults=job_defaults)
scheduler.start()