Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.x_lo = self.x_hi - self.duration
else:
self.x_hi = self.hourly_data.before(datetime.max)
if not self.x_hi:
self.x_hi = datetime.utcnow() # only if no hourly data
self.x_hi += timezone.utcoffset(self.x_hi)
if self.duration < timedelta(hours=6):
# set end of graph to start of the next minute after last item
self.x_hi += timedelta(seconds=55)
self.x_hi = self.x_hi.replace(second=0)
else:
# set end of graph to start of the next hour after last item
self.x_hi += timedelta(minutes=55)
self.x_hi = self.x_hi.replace(minute=0, second=0)
self.x_lo = self.x_hi - self.duration
self.utcoffset = timezone.utcoffset(self.x_hi)
# open gnuplot command file
self.tmp_files = []
cmd_file = os.path.join(self.work_dir, 'plot.cmd')
self.tmp_files.append(cmd_file)
of = codecs.open(cmd_file, 'w', encoding=self.encoding[0])
# write gnuplot set up
of.write('set encoding %s\n' % (self.encoding[1]))
lcl = locale.getlocale()
if lcl[0]:
of.write('set locale "%s.%s"\n' % lcl)
self.rows = self.get_default_rows()
self.cols = (self.plot_count + self.rows - 1) // self.rows
self.rows, self.cols = eval(self.graph.get_value(
'layout', '%d, %d' % (self.rows, self.cols)))
w, h = self.get_default_plot_size()
w = w * self.cols
context, self.context.work_dir)
# get daytime end hour
self.day_end_hour, self.use_dst = pywws.process.get_day_end_hour(
self.params)
# parse "cron" sections
self.cron = {}
for section in self.params._config.sections():
if section.split()[0] != 'cron':
continue
import croniter
self.cron[section] = croniter.croniter(
self.params.get(section, 'format', ''))
self.cron[section].get_prev()
last_update = self.status.get_datetime('last update', section)
if last_update:
last_update += timezone.utcoffset(last_update)
while self.cron[section].get_current(datetime) <= last_update:
self.cron[section].get_next()
# create service uploader objects
self.services = {}
for section in list(self.cron.keys()) + [
'live', 'logged', 'hourly', '12 hourly', 'daily']:
for name, options in self._parse_templates(section, 'services'):
if name in self.services:
continue
if os.path.exists(os.path.join(self.module_dir, name + '.py')):
sys.path.insert(0, self.module_dir)
mod = importlib.import_module(name)
del sys.path[0]
else:
mod = importlib.import_module('pywws.service.' + name)
self.services[name] = mod.ToService(context)
self.duration = timedelta(hours=24)
if self.x_lo:
self.x_lo = self._eval_time(self.x_lo)
if self.x_hi:
self.x_hi = self._eval_time(self.x_hi)
self.duration = self.x_hi - self.x_lo
else:
self.x_hi = self.x_lo + self.duration
elif self.x_hi:
self.x_hi = self._eval_time(self.x_hi)
self.x_lo = self.x_hi - self.duration
else:
self.x_hi = self.hourly_data.before(datetime.max)
if not self.x_hi:
self.x_hi = datetime.utcnow() # only if no hourly data
self.x_hi += timezone.utcoffset(self.x_hi)
if self.duration < timedelta(hours=6):
# set end of graph to start of the next minute after last item
self.x_hi += timedelta(seconds=55)
self.x_hi = self.x_hi.replace(second=0)
else:
# set end of graph to start of the next hour after last item
self.x_hi += timedelta(minutes=55)
self.x_hi = self.x_hi.replace(minute=0, second=0)
self.x_lo = self.x_hi - self.duration
self.utcoffset = timezone.utcoffset(self.x_hi)
# open gnuplot command file
self.tmp_files = []
cmd_file = os.path.join(self.work_dir, 'plot.cmd')
self.tmp_files.append(cmd_file)
of = codecs.open(cmd_file, 'w', encoding=self.encoding[0])
# write gnuplot set up
def _eval_time(self, time_str):
    """Evaluate a ``datetime.replace`` argument string against a base day.

    The base value is the timestamp of the last hourly data item (or the
    current UTC time when there is no hourly data), with its UTC offset
    added and then truncated to the start of that day. ``time_str`` is
    substituted into a ``result.replace(...)`` call, e.g. ``'hour=12'``,
    and the value of that expression is returned.
    """
    # NOTE(review): time_str is handed to eval(), so it must only come from
    # a trusted source. The local below has to stay named ``result``
    # because the evaluated expression refers to it by that name.
    result = self.hourly_data.before(datetime.max) or datetime.utcnow()
    # add the UTC offset, then go back to midnight of the resulting day
    result = result + timezone.utcoffset(result)
    result = result.replace(hour=0, minute=0, second=0, microsecond=0)
    return eval('result.replace(%s)' % time_str)
def _cron_due(self, now):
    """Return the names of the cron sections that are due at ``now``.

    ``now`` has its UTC offset added before being compared with each
    section's current scheduled time. Every section reported as due has
    its schedule advanced until its current time is later than that value.
    An empty list is returned when no cron sections are configured.
    """
    if not self.cron:
        return []
    # schedules are compared against now shifted by its UTC offset
    deadline = now + timezone.utcoffset(now)
    due = []
    for name, schedule in self.cron.items():
        if schedule.get_current(datetime) <= deadline:
            due.append(name)
            # step past every occurrence that is not after the deadline
            while schedule.get_current(datetime) <= deadline:
                schedule.get_next()
    return due