Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def ranges(client: InfluxDBClient):
"""
:return: list of latest times for each entry grouped by symbol and interval
"""
parse_time = lambda t: parse(t).replace(tzinfo=tz.gettz('UTC'))
points = InfluxDBClient.query(client, "select FIRST(close), symbol, interval, time from bars group by symbol, interval").get_points()
firsts = {(entry['symbol'], int(entry['interval'].split('_')[0]), entry['interval'].split('_')[1]): parse_time(entry['time']) for entry in points}
points = InfluxDBClient.query(client, "select LAST(close), symbol, interval, time from bars group by symbol, interval").get_points()
lasts = {(entry['symbol'], int(entry['interval'].split('_')[0]), entry['interval'].split('_')[1]): parse_time(entry['time']) for entry in points}
result = {k: (firsts[k], lasts[k]) for k in firsts.keys() & lasts.keys()}
return result
def ranges(client: InfluxDBClient):
"""
:return: list of latest times for each entry grouped by symbol and interval
"""
parse_time = lambda t: parse(t).replace(tzinfo=tz.gettz('UTC'))
points = InfluxDBClient.query(client, "select FIRST(close), symbol, interval, time from bars group by symbol, interval").get_points()
firsts = {(entry['symbol'], int(entry['interval'].split('_')[0]), entry['interval'].split('_')[1]): parse_time(entry['time']) for entry in points}
points = InfluxDBClient.query(client, "select LAST(close), symbol, interval, time from bars group by symbol, interval").get_points()
lasts = {(entry['symbol'], int(entry['interval'].split('_')[0]), entry['interval'].split('_')[1]): parse_time(entry['time']) for entry in points}
result = {k: (firsts[k], lasts[k]) for k in firsts.keys() & lasts.keys()}
return result
@property
def ranges(self):
"""
:return: list of latest times for each entry grouped by symbol and tag
"""
parse_time = lambda t: parse(t).replace(tzinfo=tz.gettz('UTC'))
points = InfluxDBClient.query(self.client, "select FIRST(value), symbol, itag, time from intrinio_tags group by symbol, itag").get_points()
firsts = {(entry['symbol'], entry['itag']): parse_time(entry['time']) for entry in points}
points = InfluxDBClient.query(self.client, "select LAST(value), symbol, itag, time from intrinio_tags group by symbol, itag").get_points()
lasts = {(entry['symbol'], entry['itag']): parse_time(entry['time']) for entry in points}
result = {k: (firsts[k], lasts[k]) for k in firsts.keys() & lasts.keys()}
return result
@property
def ranges(self):
"""
:return: list of latest times for each entry grouped by symbol and tag
"""
parse_time = lambda t: parse(t).replace(tzinfo=tz.gettz('UTC'))
points = InfluxDBClient.query(self.client, "select FIRST(value), symbol, itag, time from intrinio_tags group by symbol, itag").get_points()
firsts = {(entry['symbol'], entry['itag']): parse_time(entry['time']) for entry in points}
points = InfluxDBClient.query(self.client, "select LAST(value), symbol, itag, time from intrinio_tags group by symbol, itag").get_points()
lasts = {(entry['symbol'], entry['itag']): parse_time(entry['time']) for entry in points}
result = {k: (firsts[k], lasts[k]) for k in firsts.keys() & lasts.keys()}
return result
def get_cache_fundamentals(client: InfluxDBClient, symbol: typing.Union[list, str] = None):
query = "SELECT * FROM iqfeed_fundamentals"
if isinstance(symbol, list) and len(symbol) > 0:
query += " WHERE symbol =~ /{}/".format("|".join(['^' + s + '$' for s in symbol]))
elif isinstance(symbol, str) and len(symbol) > 0:
query += " WHERE symbol = '{}'".format(symbol)
result = {f['symbol']: {**json.loads(f['data']), **{'last_update': f['time']}} for f in list(InfluxDBClient.query(client, query, chunked=True).get_points())}
return result[symbol] if isinstance(symbol, str) else result