Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def add_adjustments(client: InfluxDBClient, adjustments: list, provider: str):
    """
    Write a batch of split/dividend adjustments to the database in a single request.

    :param client: influxdb client
    :param adjustments: adjustments to store, each of the form (timestamp: datetime.date, symbol: str, typ: str, value)
    :param provider: data provider
    :return: whatever InfluxDBClient.write_points returns for the batch
    """
    batch = []
    for adjustment in adjustments:
        # each tuple is unpacked positionally into the point builder
        batch.append(_get_adjustment_json_query(*adjustment, provider=provider))

    # write_points is looked up on InfluxDBClient itself (not on the instance),
    # so the base-class implementation runs whatever the concrete type of `client`
    return InfluxDBClient.write_points(client, batch, protocol='json', time_precision='s')
def add_adjustment(client: InfluxDBClient, timestamp: datetime.date, symbol: str, typ: str, value: float, provider: str):
    """
    Write a single split/dividend adjustment to the database.

    :param client: influxdb client
    :param timestamp: date of the adjustment
    :param symbol: symbol
    :param typ: 'split' or 'dividend'
    :param value: split_factor/dividend_rate
    :param provider: data provider
    :return: whatever InfluxDBClient.write_points returns for the single point
    """
    point_args = {
        'timestamp': timestamp,
        'symbol': symbol,
        'typ': typ,
        'value': value,
        'provider': provider,
    }
    point = _get_adjustment_json_query(**point_args)

    # write_points is looked up on InfluxDBClient itself (not on the instance),
    # so the base-class implementation runs whatever the concrete type of `client`
    return InfluxDBClient.write_points(client, [point], protocol='json', time_precision='s')
# NOTE(review): this is a fragment of a larger function -- the enclosing `def`, the
# origin of `fundamentals`/`client` and the initialisation of `points` are not visible
# here, and the original indentation has been lost.
# Appends one point per fundamentals record to `points`, tagged with the record's symbol.
for f in fundamentals:
points.append(
{
"measurement": "iqfeed_fundamentals",
"tags": {
"symbol": f['symbol'],
},
# Timestamp is the current UTC date at 00:00:00 (naive datetime), so every record
# built on the same UTC day carries the same time value.
"time": datetime.datetime.combine(datetime.datetime.utcnow().date(), datetime.datetime.min.time()),
"fields": {
# The whole record is stored as a single JSON string. Values json cannot serialise
# natively are converted: datetime.datetime via isoformat(), anything else via str().
"data": json.dumps(f, default=lambda x: x.isoformat() if isinstance(x, datetime.datetime) else str(x)),
}
}
)
# Any exception raised by the write is logged at error level and not re-raised.
# NOTE(review): presumably this runs once after the loop; the nesting cannot be
# confirmed from this view -- verify against the original file.
try:
InfluxDBClient.write_points(client, points, protocol='json', time_precision='s')
except Exception as err:
logging.getLogger(__name__).error(err)