# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Check subsecond request in start
start_micros = "{:.6f}".format(start).split('.')[1]
if start_found == True and start_micros != '000000':
use_microseconds = True
else:
# No decimal part, check subsecond request in length
start_micros = "{:.6f}".format(length).split('.')[1]
if length_found == True and start_micros != '000000':
use_microseconds = True
# Build UTC datetime start/stop from start timestamp with/without microseconds
if use_microseconds == False:
start_date = datetime.datetime.fromtimestamp(start, datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
stop_date = datetime.datetime.fromtimestamp(start + length, datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
else:
start_date = datetime.datetime.fromtimestamp(start, datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")
stop_date = datetime.datetime.fromtimestamp(start + length, datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")
# Prepare payload
_aggregate = PayloadBuilder().AGGREGATE(["all"]).chain_payload()
_and_where = PayloadBuilder(_aggregate).WHERE(["asset_code", "in", asset_code_list]).AND_WHERE([
"user_ts", ">=", str(start_date)], ["user_ts", "<=", str(stop_date)]).chain_payload()
_bucket = PayloadBuilder(_and_where).TIMEBUCKET('user_ts', bucket_size,
'YYYY-MM-DD HH24:MI:SS', 'timestamp').chain_payload()
payload = PayloadBuilder(_bucket).LIMIT(int(float(length / float(bucket_size)))).payload()
# Sort & timebucket modifiers can not be used in same payload
# payload = PayloadBuilder(limit).ORDER_BY(["user_ts", "desc"]).payload()
results = await _readings.query(payload)
response = results['rows']
def _translate_history_entry(entry, running):
    """Map a history record onto the camelCase dict shape used by the API."""
    # Start time is serialized as a UTC ISO-8601 string when present, else null.
    if entry.start_time:
        started_iso = date_utils.astimezone(entry.start_time, timezone.utc).isoformat()
    else:
        started_iso = None
    return {
        'id': entry.id,
        'startTime': started_iso,
        'user': entry.user_name,
        'script': entry.script_name,
        'status': running_flag_to_status(running),
        'exitCode': entry.exit_code,
    }
def _missing(self, s):
    """Return the hour-rounded [(start, finish)] range to recalculate, or [] when coverage is complete."""
    # clean out any gaps by unzipping the chained composites
    Composite.clean(s)
    # range we expect data for
    finish = round_hour(dt.datetime.now(tz=dt.timezone.utc), up=True)
    missing_sources = self.__missing_sources(s)
    missing_recent = any(self.__missing_recent(s, constant.short_name, finish)
                         for constant in self.response_constants)
    # Guard clause: nothing missing means nothing to do.
    if not (missing_recent or missing_sources):
        return []
    if missing_recent:
        log.info('Incomplete coverage (so will re-calculate)')
    if missing_sources:
        log.info('Additional sources (so will re-calculate)')
    # Drop stale rows, then report the full recompute window.
    self._delete_from(s)
    start = round_hour(self.__start(s), up=False)
    return [(start, finish)]
def get_calendar(self, query="Missing"):
# Fetch calendar entries from the media server's API.
# "Missing": window is [today - missing_days, today]; otherwise the window is
# [today, today + future_days].
endpoint = '/api/calendar/'
today = str(date.today())
last_days = str(date.today() - timedelta(days=self.server.missing_days))
future = str(date.today() + timedelta(days=self.server.future_days))
# Local-time ISO timestamp (aware, machine tz).
# NOTE(review): `now` is unused in the visible portion -- presumably consumed
# by the truncated loop body below; confirm against the full file.
now = datetime.now(timezone.utc).astimezone().isoformat()
if query == "Missing":
params = {'start': last_days, 'end': today}
else:
params = {'start': today, 'end': future}
influx_payload = []
air_days = []
missing = []
req = self.session.prepare_request(Request('GET', self.server.url + endpoint, params=params))
get = connection_handler(self.session, req, self.server.verify_ssl)
# Bail out silently when the request failed (handler returned falsy).
if not get:
return
tv_shows = []
# NOTE(review): the body of this loop is not visible in this chunk (truncated).
for show in get:
def now():
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
def mtime(self):
    """Return the backing file's last-modification time as an aware UTC datetime.

    Returns None when the file does not exist (per ``self.exists()``).
    """
    # If the file does not exist there is no modification time to report.
    if not self.exists():
        return None
    stamp = os.path.getmtime(self.url)
    return datetime.datetime.fromtimestamp(stamp, datetime.timezone.utc)
from datetime import datetime, timezone
from dateutil import parser
from .config import config
# The current machine's time zone: an aware UTC "now" converted to local time
# yields the local tzinfo. Used as the default for dt_to_iso_string below.
_current_tz = datetime.now(timezone.utc).astimezone().tzinfo
def dt_to_iso_string(dt: datetime, tz=_current_tz) -> str:
    """Serialize *dt* to an ISO-8601 string, normalizing it to UTC first."""
    _check_dt_instance(dt)
    # Naive datetimes go through the project's UTC-conversion helper.
    normalized = dt if _dt_has_tz(dt) else dt_convert_to_utc(dt)
    # Aware but non-UTC datetimes are shifted to UTC explicitly.
    if not _dt_is_utc(normalized):
        normalized = normalized.astimezone(timezone.utc)
    return _dt_to_iso_string(normalized)
def dt_from_iso_string(dt_str: str) -> datetime:
# Parse an ISO-8601 string into a datetime; rejects None and blank input.
# NOTE(review): only the argument validation is visible in this chunk -- the
# actual parsing body appears truncated.
if dt_str is None:
raise TypeError('"dt_str" argument must not be None')
elif isinstance(dt_str, str) and dt_str.strip() == "":
raise TypeError('"dt_str" argument must not be blank')
# NOTE(review): orphaned `except` clause -- its matching `try` is outside this
# chunk; the handler returns a generic 500 on any unexpected error.
except Exception as e:
msg = "Internal server Error: {}".format(e)
logging.getLogger().error(msg, exc_info=True)
return msg, 500
# Accumulators for the per-entity response payload.
attributes = []
entries = []
entity_value = []
entity_type = []
entity_types = []
if entities:
for e in entities:
matched_attr = lookup_string_match(e, attr_name)
# Normalize the requested time window to UTC ISO strings; fall back to ''
# when a bound is absent or unparseable (best-effort by design -- the broad
# except is deliberate here, not an oversight).
try:
f_date = dateutil.parser.isoparse(from_date).replace(tzinfo=timezone.utc).isoformat()
except Exception as ex:
f_date = ''
try:
t_date = dateutil.parser.isoparse(to_date).replace(tzinfo=timezone.utc).isoformat()
except Exception as ex:
t_date = ''
# When an aggregation method (without a period) is requested, the index is
# the [from, to] window itself rather than the entity's own index.
index = [f_date, t_date] if aggr_method and not aggr_period else e['index']
entity = {
'entityId': e['id'],
'index': index,
'values': matched_attr['values'] if matched_attr else []
}
# Reset the value accumulator at each new entity type.
if e['type'] not in entity_types:
entity_value = []
entity_value.append(entity)
# NOTE(review): fragment of a claim-completion handler; `row`, `item_stat`,
# `results`, `insert` and `Result` are bound outside this chunk.
if not row:
raise InvalidClaim()
(project_id, username, upper_sequence_num, lower_sequence_num,
ip_address, datetime_claimed) = row
item_stat['project'] = project_id
item_stat['username'] = username
# Sequence bounds are inclusive, hence the +1.
item_stat['scanned'] = upper_sequence_num - lower_sequence_num + 1
# Stored claim time is naive; interpret it as UTC before taking the epoch value.
item_stat['started'] = datetime_claimed.replace(
tzinfo=datetime.timezone.utc).timestamp()
query_args = []
# tz instead of utcnow() for Unix timestamp in UTC instead of local
time = datetime.datetime.now(datetime.timezone.utc)
item_stat['finished'] = time.timestamp()
# One insert row per scraped shortcode result.
for shortcode in results.keys():
url = results[shortcode]['url']
encoding = results[shortcode]['encoding']
query_args.append({
'project_id': project_id,
'shortcode': shortcode,
'url': url,
'encoding': encoding,
'datetime': time
})
if len(query_args) > 0:
query = insert(Result)
from datetime import timezone
from factory import (
DjangoModelFactory,
LazyAttribute,
Faker,
Trait,
)
from networkapi.landingpage.models import LandingPage, Signup
# Shared Faker declarations used by the factories below.
sentence_faker = Faker('sentence', nb_words=3, variable_nb_words=False)
# Aware past datetime within the last 30 days (UTC).
past_datetime_faker = Faker('past_datetime', start_date='-30d', tzinfo=timezone.utc)
# Factory for Signup model instances. The *_text names are excluded helper
# params used only to derive the trailing-dot-stripped fields below.
# NOTE(review): the declarations of title_text/header_text/newsletter_text are
# not visible in this chunk, and the class body may be truncated -- confirm.
class SignupFactory(DjangoModelFactory):
class Meta:
model = Signup
exclude = (
'title_text',
'header_text',
'newsletter_text',
)
title = LazyAttribute(lambda o: o.title_text.rstrip('.'))
header = LazyAttribute(lambda o: o.header_text.rstrip('.'))
newsletter = LazyAttribute(lambda o: o.newsletter_text.rstrip('.'))
description = Faker('paragraph', nb_sentences=5, variable_nb_sentences=True)