Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_add_occupied_timewindow(self, alchemy_store, fact, alchemy_fact):
"""
Make sure that passing a fact with a timewindow that already has a fact raises error.
"""
fact.start = alchemy_fact.start - datetime.timedelta(days=4)
fact.end = alchemy_fact.start + datetime.timedelta(minutes=15)
with pytest.raises(ValueError):
alchemy_store.facts._add(fact)
frozen_time.tick(datetime.timedelta(seconds=11))
start = timeit.default_timer()
with pytest.raises(MessageReceiveTimeout) as error_context:
core.receive_message('test_expired_message')
elapsed = timeit.default_timer() - start
assert 0 < elapsed < 0.1 # This shouldn't actually take 3 seconds, it should be instant
assert 'expired' in error_context.value.args[0]
frozen_time.tick(datetime.timedelta(seconds=1))
request_id = 19
core.send_message('test_expired_message', request_id, {}, {'test': 'payload'})
frozen_time.tick(datetime.timedelta(seconds=10))
start = timeit.default_timer()
response = core.receive_message('test_expired_message')
elapsed = timeit.default_timer() - start
assert 0 < elapsed < 0.1
assert response[0] == request_id
def get_time_attr_map(t):
"""
now -> |
hour ago -> |
day ago -> |
|--------------|--------------------|---------------------|
"""
now = datetime.datetime.now()
if t + datetime.timedelta(hours=3) > now:
return get_map("main_list_white")
if t + datetime.timedelta(days=3) > now:
return get_map("main_list_lg")
else:
return get_map("main_list_dg")
:return:
Returns a :class:`datetime.date`
"""
if not 0 < week < 54:
raise ValueError('Invalid week: {}'.format(week))
if not 0 < day < 8: # Range is 1-7
raise ValueError('Invalid weekday: {}'.format(day))
# Get week 1 for the specific year:
jan_4 = date(year, 1, 4) # Week 1 always has January 4th in it
week_1 = jan_4 - timedelta(days=jan_4.isocalendar()[2] - 1)
# Now add the specific number of weeks and days to get what we want
week_offset = (week - 1) * 7 + (day - 1)
return week_1 + timedelta(days=week_offset)
def GetRandomDatetime():
"""Return a datetime in the next week."""
seconds_offset = random.randint(0, 60 * 60 * 24 * 7)
dt = datetime.today() + timedelta(seconds=seconds_offset)
return dt.replace(second=0, microsecond=0)
def do_precip12(ts, ds):
"""Compute the 24 Hour precip at 12 UTC, we do some more tricks though"""
offset = iemre.daily_offset(ts)
ets = utc(ts.year, ts.month, ts.day, 12)
sts = ets - datetime.timedelta(hours=24)
offset1 = iemre.hourly_offset(sts)
offset2 = iemre.hourly_offset(ets)
if ts.month == 1 and ts.day == 1:
if sts.year >= 1900:
LOG.warning(
"p01d_12z for %s [idx:%s] %s(%s)->%s(%s) SPECIAL",
ts,
offset,
sts.strftime("%Y%m%d%H"),
offset1,
ets.strftime("%Y%m%d%H"),
offset2,
)
ncfn = iemre.get_hourly_ncname(ets.year)
if not os.path.isfile(ncfn):
LOG.warning("Missing %s", ncfn)
def soon():
soon = datetime.date.today() + datetime.timedelta(days=30)
return {'month': soon.month, 'year': soon.year}
# feeds about all legislation that we offer the user to subscribe to
feeds = [f for f in Feed.get_simple_feeds() if f.category == "federal-bills"]
# info about bills by status
groups = [
( g[0], # title
g[1], # text 1
g[2], # text 2
"/congress/bills/browse?status=" + ",".join(str(s) for s in g[4]) + "&sort=-current_status_date", # link
load_bill_status_qs(g[4]).count(), # count in category
load_bill_status_qs(g[4]).order_by('-current_status_date')[0:6], # top 6 in this category
)
for g in bill_status_groups ]
# legislation coming up
dhg_bills = Bill.objects.filter(congress=CURRENT_CONGRESS, docs_house_gov_postdate__gt=datetime.datetime.now() - datetime.timedelta(days=10)).filter(docs_house_gov_postdate__gt=F('current_status_date'))
sfs_bills = Bill.objects.filter(congress=CURRENT_CONGRESS, senate_floor_schedule_postdate__gt=datetime.datetime.now() - datetime.timedelta(days=5)).filter(senate_floor_schedule_postdate__gt=F('current_status_date'))
coming_up = list((dhg_bills | sfs_bills).order_by('scheduled_consideration_date'))
# top tracked bills
top_bills = Feed.objects\
.filter(feedname__startswith='bill:')\
.filter(feedname__regex='^bill:[hs][jcr]?%d-' % CURRENT_CONGRESS)
top_bills = top_bills\
.annotate(count=Count('tracked_in_lists'))\
.order_by('-count')\
.values('feedname', 'count')\
[0:25]
top_bills = [(Bill.from_feed(Feed.from_name(bf["feedname"])), bf["count"]) for bf in top_bills]
# trending bills
trf = Feed.get_trending_feeds()
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, basestring):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def func():
self.high_post = self.high_post + timedelta(hours=self.threshold)