Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.params = context.params
self.status = context.status
self.raw_data = context.raw_data
# connect to weather station
self.ws = WeatherStation(context=context)
# check computer clock isn't earlier than last stored data
self.last_stored_time = self.raw_data.before(datetime.max) or datetime.min
if datetime.utcnow() < self.last_stored_time:
raise ValueError('Computer time is earlier than last stored data')
# infer pointer of last stored data
saved_ptr = self.status.get('data', 'ptr')
if saved_ptr:
saved_ptr, sep, saved_date = saved_ptr.partition(',')
if saved_ptr and saved_date:
saved_ptr = int(saved_ptr, 16)
saved_date = WSDateTime.from_csv(saved_date)
saved_date = self.raw_data.nearest(saved_date)
while saved_date < self.last_stored_time:
saved_date = self.raw_data.after(saved_date + SECOND)
saved_ptr = self.ws.inc_ptr(saved_ptr)
while saved_date > self.last_stored_time:
saved_date = self.raw_data.before(saved_date - SECOND)
saved_ptr = self.ws.dec_ptr(saved_ptr)
else:
saved_ptr = None
self.last_stored_ptr = saved_ptr
self.check_fixed_block()
else:
round_time = None
elif command[0] == 'jump':
prevdata = data
idx, valid_data = jump(idx, int(command[1]))
data = data_set[idx]
elif command[0] == 'goto':
prevdata = data
time_str = command[1]
if '%' in time_str:
if local_time:
lcl = timezone.to_local(idx)
else:
lcl = timezone.to_utc(idx)
time_str = lcl.strftime(time_str)
new_idx = pywws.weatherstation.WSDateTime.from_csv(time_str)
if local_time:
new_idx = timezone.to_naive(timezone.localize(new_idx))
new_idx = data_set.after(new_idx)
if new_idx:
idx = new_idx
data = data_set[idx]
valid_data = True
else:
valid_data = False
elif command[0] == 'loop':
loop_count = int(command[1])
loop_start = tmplt.tell()
elif command[0] == 'endloop':
loop_count -= 1
if valid_data and loop_count > 0:
tmplt.seek(loop_start, 0)
def get_datetime(self, section, option, default=None):
    """Look up an option with ``self.get`` and parse it as a timestamp.

    The value obtained from ``self.get(section, option, default)`` is
    passed to ``WSDateTime.from_csv`` when it is truthy.  A falsy value
    (for example ``None`` or an empty string) is handed back unchanged.
    """
    raw_value = self.get(section, option, default)
    return WSDateTime.from_csv(raw_value) if raw_value else raw_value
'wind_gust' : float,
'wind_gust_t' : WSDateTime.from_csv,
'wind_dir' : float,
'rain' : float,
'rain_days' : int,
'illuminance_ave' : float,
'illuminance_max_lo' : float,
'illuminance_max_lo_t' : WSDateTime.from_csv,
'illuminance_max_hi' : float,
'illuminance_max_hi_t' : WSDateTime.from_csv,
'illuminance_max_ave' : float,
'uv_ave' : float,
'uv_max_lo' : int,
'uv_max_lo_t' : WSDateTime.from_csv,
'uv_max_hi' : int,
'uv_max_hi_t' : WSDateTime.from_csv,
'uv_max_ave' : float,
}
def _get_cache_path(self, target_date):
    """Return ``(path, lo, hi)`` for the yearly file holding ``target_date``.

    ``path`` is the file inside ``self._root_dir`` named after 1 January
    of the target year (one file per year).  ``lo`` and ``hi`` are
    ``target_date`` with month and day set to 1, one year apart.
    """
    root_dir = self._root_dir
    file_name = target_date.strftime("%Y-01-01.txt")
    path = os.path.join(root_dir, file_name)
    year_start = target_date.replace(month=1, day=1)
    if year_start.year >= MAXYEAR:
        # The following year cannot be represented, so the returned
        # range is shifted back to end at the start of this year.
        previous_start = year_start.replace(year=year_start.year - 1)
        return path, previous_start, year_start
    next_start = year_start.replace(year=year_start.year + 1)
    return path, year_start, next_start