Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_fromisoformat_timespecs(self):
if sys.version_info >= (3, 6):
datetime_bases = [
(2009, 12, 4, 8, 17, 45, 123456),
(2009, 12, 4, 8, 17, 45, 0)]
tzinfos = [None, pytz.utc,
pytz.FixedOffset(-5 * 60),
pytz.FixedOffset(2 * 60),
pytz.FixedOffset(6 * 60 + 27)]
timespecs = ['hours', 'minutes', 'seconds', 'milliseconds', 'microseconds']
for ip, ts in enumerate(timespecs):
for tzi in tzinfos:
for dt_tuple in datetime_bases:
if ts == 'milliseconds':
new_microseconds = 1000 * (dt_tuple[6] // 1000)
dt_tuple = dt_tuple[0:6] + (new_microseconds,)
dt = datetime(*(dt_tuple[0:(4 + ip)]), tzinfo=tzi)
dtstr = dt.isoformat(timespec=ts)
with self.subTest(dtstr=dtstr):
dt_rt = datetime.fromisoformat(dtstr)
self.assertEqual(dt, dt_rt)
assert label['time1'] == datetime.time(12)
assert isinstance(label['time_s'], datetime.time)
assert label['time_s'] == datetime.time(12, 0, 45)
assert isinstance(label['time_s_float'], datetime.time)
assert label['time_s_float'] == datetime.time(12, 0, 45, 457100)
assert isinstance(label['time_tz1'], datetime.time)
assert label['time_tz1'] == datetime.time(15, 24, 12, tzinfo=pytz.utc)
assert isinstance(label['time_tz2'], datetime.time)
assert label['time_tz2'] == datetime.time(1, 12, 22, tzinfo=pytz.FixedOffset(420)) # noqa
assert isinstance(label['time_tz3'], datetime.time)
assert label['time_tz3'] == datetime.time(1, 12, 22, tzinfo=pytz.FixedOffset(420)) # noqa
assert isinstance(label['time_tz4'], datetime.time)
assert label['time_tz4'] == datetime.time(1, 10, 39, 457500, pytz.FixedOffset(420)) # noqa
assert isinstance(label['datetime1'], datetime.datetime)
assert label['datetime1'] == datetime.datetime(1990, 7, 4, 12)
assert isinstance(label['datetime2'], datetime.datetime)
assert label['datetime2'] == datetime.datetime(1990, 6, 7, 15, 24, 12, tzinfo=pytz.utc) # noqa
assert isinstance(label['datetime3'], datetime.datetime)
assert label['datetime3'] == datetime.datetime(2001, 1, 1, 1, 10, 39, tzinfo=pytz.FixedOffset(420)) # noqa
assert isinstance(label['datetime4'], datetime.datetime)
assert label['datetime4'] == datetime.datetime(2001, 1, 1, 1, 10, 39, 457591, pytz.FixedOffset(420)) # noqa
def test_pythonvalue(self):
instance = builtins.gMonth()
assert instance.pythonvalue("--05") == (5, None)
assert instance.pythonvalue("--11Z") == (11, pytz.utc)
assert instance.pythonvalue("--11+02:00") == (11, pytz.FixedOffset(120))
assert instance.pythonvalue("--11-04:00") == (11, pytz.FixedOffset(-240))
assert instance.pythonvalue("--11") == (11, None)
assert instance.pythonvalue("--02") == (2, None)
with pytest.raises(builtins.ParseError):
assert instance.pythonvalue("99")
valid_subseconds = [
("", set(), {}),
(".{microsecond}", set(["microsecond"]), {"microsecond": microsecond}), # TODO: Generate the trimmed 0's version?
(",{microsecond}", set(["microsecond"]), {"microsecond": microsecond}),
]
valid_tz_info_formats = [
("", set(), {}),
("Z", set(), {"tzinfo": pytz.UTC}),
("z", set(), {"tzinfo": pytz.UTC}),
("-{tzhour}", set(["tzhour"]), {"tzinfo": pytz.FixedOffset(-1 * tzhour * 60)}),
("+{tzhour}", set(["tzhour"]), {"tzinfo": pytz.FixedOffset(1 * tzhour * 60)}),
("-{tzhour}{tzminute}", set(["tzhour", "tzminute"]), {"tzinfo": pytz.FixedOffset(-1 * ((tzhour * 60) + tzminute))}),
("+{tzhour}{tzminute}", set(["tzhour", "tzminute"]), {"tzinfo": pytz.FixedOffset(1 * ((tzhour * 60) + tzminute))}),
("-{tzhour}:{tzminute}", set(["tzhour", "tzminute"]), {"tzinfo": pytz.FixedOffset(-1 * ((tzhour * 60) + tzminute))}),
("+{tzhour}:{tzminute}", set(["tzhour", "tzminute"]), {"tzinfo": pytz.FixedOffset(1 * ((tzhour * 60) + tzminute))})
]
for valid_calendar_date_formats, valid_time_formats in [(valid_basic_calendar_date_formats, valid_basic_time_formats), (valid_extended_calendar_date_formats, valid_extended_time_formats)]:
for calendar_format, calendar_fields, calendar_params in valid_calendar_date_formats:
for date_and_time_separator in valid_date_and_time_separators:
if date_and_time_separator is None:
full_format = calendar_format
datetime_params = calendar_params
yield (full_format, calendar_fields, datetime_params)
else:
for time_format, time_fields, time_params in valid_time_formats:
for subsecond_format, subsecond_fields, subsecond_params in valid_subseconds:
for tz_info_format, tz_info_fields, tz_info_params in valid_tz_info_formats:
if "second" in time_fields:
# Add subsecond
full_format = calendar_format + date_and_time_separator + time_format + subsecond_format + tz_info_format
def _register_param(self):
self.datetime = datetime.datetime.now().replace(tzinfo=pytz.FixedOffset(480))
self.time = time.time()
self.loop_time = self.io_loop.time()
for fun_name, args in self.module_arg_dict.items():
if 'crontab' in args:
key = args['crontab']
self.crontab_router[key]['func'] = getattr(self, fun_name)
self.crontab_router[key]['iter'] = croniter(args['crontab'], self.datetime)
self.crontab_router[key]['handle'] = None
elif 'channel' in args:
self.channel_router[args['channel']] = getattr(self, fun_name)
df.index = pd.DatetimeIndex(df.time)
df['atr'] = ATR(df, timeperiod=his_atr_n)
df['short_trend'] = df.close
df['long_trend'] = df.close
for idx in range(1, df.shape[0]):
df.ix[idx, 'short_trend'] = (df.ix[idx-1, 'short_trend'] * (his_short_n - 1) +
df.ix[idx, 'close']) / his_short_n
df.ix[idx, 'long_trend'] = (df.ix[idx-1, 'long_trend'] * (his_long_n - 1) +
df.ix[idx, 'close']) / his_long_n
df['high_line'] = df.close.rolling(window=his_break_n).max()
df['low_line'] = df.close.rolling(window=his_break_n).min()
cur_pos = 0
last_trade = None
for cur_idx in range(his_break_n+1, df.shape[0]):
idx = cur_idx - 1
cur_date = df.index[cur_idx].to_pydatetime().replace(tzinfo=pytz.FixedOffset(480))
prev_date = df.index[idx].to_pydatetime().replace(tzinfo=pytz.FixedOffset(480))
if cur_pos == 0:
if df.short_trend[idx] > df.long_trend[idx] and df.close[idx] > df.high_line[idx-1]:
new_bar = MainBar.objects.filter(
exchange=inst.exchange, product_code=inst.product_code, time=cur_date).first()
Signal.objects.create(
code=new_bar.code, trigger_value=df.atr[idx],
strategy=strategy, instrument=inst, type=SignalType.BUY, processed=True,
trigger_time=prev_date, price=new_bar.open, volume=1, priority=PriorityType.LOW)
last_trade = Trade.objects.create(
broker=strategy.broker, strategy=strategy, instrument=inst,
code=new_bar.code, direction=DirectionType.LONG,
open_time=cur_date, shares=1, filled_shares=1, avg_entry_price=new_bar.open)
cur_pos = cur_idx
elif df.short_trend[idx] < df.long_trend[idx] and df.close[idx] < df.low_line[idx-1]:
new_bar = MainBar.objects.filter(
activity.Stationary = not activity.GPS # I think
# 0 = public, 1 = private, 2 = friends
activity.Private = act["visibility"] == 1
activity.StartTime = dateutil.parser.parse(act["departed_at"])
try:
activity.TZ = pytz.timezone(act["time_zone"])
except pytz.exceptions.UnknownTimeZoneError:
# Sometimes the time_zone returned isn't quite what we'd like it
# So, just pull the offset from the datetime
if isinstance(activity.StartTime.tzinfo, tzutc):
activity.TZ = pytz.utc # The dateutil tzutc doesn't have an _offset value.
else:
activity.TZ = pytz.FixedOffset(activity.StartTime.tzinfo.utcoffset(activity.StartTime).total_seconds() / 60)
activity.StartTime = activity.StartTime.replace(tzinfo=activity.TZ) # Overwrite dateutil's sillyness
activity.EndTime = activity.StartTime + timedelta(seconds=self._duration_to_seconds(act["duration"]))
logger.debug("Activity s/t " + str(activity.StartTime))
activity.AdjustTZ()
activity.Stats.Distance = ActivityStatistic(ActivityStatisticUnit.Meters, float(act["distance"]))
mapStatTriple(act, activity.Stats.Power, "watts", ActivityStatisticUnit.Watts)
mapStatTriple(act, activity.Stats.Speed, "speed", ActivityStatisticUnit.KilometersPerHour)
mapStatTriple(act, activity.Stats.Cadence, "cad", ActivityStatisticUnit.RevolutionsPerMinute)
mapStatTriple(act, activity.Stats.HR, "hr", ActivityStatisticUnit.BeatsPerMinute)
if "elevation_gain" in act and act["elevation_gain"]:
activity.Stats.Elevation.update(ActivityStatistic(ActivityStatisticUnit.Meters, gain=float(act["elevation_gain"])))
def _generate_tzinfo_from_tzoffset(tzoffset_minutes: int) -> pytz._FixedOffset:
"""Generates tzinfo object from tzoffset."""
return pytz.FixedOffset(tzoffset_minutes)
def get_max_timestamps(nwsli):
""" Fetch out our max values """
icursor = ISUAG.cursor()
data = {
"hourly": datetime.datetime(2012, 1, 1, tzinfo=pytz.FixedOffset(-360)),
"minute": datetime.datetime(2012, 1, 1, tzinfo=pytz.FixedOffset(-360)),
"daily": datetime.date(2012, 1, 1),
}
icursor.execute(
"""
SELECT max(valid) from sm_daily
WHERE station = %s
""",
(nwsli,),
)
row = icursor.fetchone()
if row[0] is not None:
data["daily"] = row[0]
icursor.execute(
"""
def build_dataframe(docs: Iterable[SON]) -> pd.DataFrame:
"""Builds DataFrame from passed BSON documents.
Input BSON document must match schema defined at ``make_bson_doc``.
"""
dfs = []
for doc in docs:
index = pd.Index(_deserialize_array(doc['index']))
index = index.tz_localize(pytz.FixedOffset(doc['metadata']['utc_offset'] or 0))
columns = [_deserialize_array(col) for col in doc['columns'].values()]
names = doc['columns'].keys()
dfs.append(pd.DataFrame(index=index, data=dict(zip(names, columns))))
return pd.concat(dfs, sort=False).sort_index()