Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_influxdb_cache(self):
client = InfluxDBClient(host='localhost', port=8086, username='root', password='root', database='test_cache')
try:
client.drop_database('test_cache')
client.create_database('test_cache')
client.switch_database('test_cache')
with InfluxDBCache(client=DataFrameClient(host='localhost', port=8086, username='root', password='root', database='test_cache')) as cache:
listeners = SyncListeners()
QuandlEvents(listeners)
non_cache_data = get_table([{'datatable_code': 'SHARADAR/SF1', 'ticker': 'AAPL', 'dimension': 'MRY', 'qopts': {"columns": ['dimension', 'ticker', 'datekey', 'revenue']}},
{'datatable_code': 'SHARADAR/SF1', 'ticker': 'IBM', 'dimension': 'MRY', 'qopts': {"columns": ['dimension', 'ticker', 'datekey', 'revenue']}}])
items = list()
for df in non_cache_data['SHARADAR/SF1']:
items.append(df.rename({'revenue': 'value', 'datekey': 'timestamp'}, axis=1).set_index('timestamp'))
def _setup_influxdb_server(inst):
inst.influxd_inst = InfluxDbInstance(
inst.influxdb_template_conf,
udp_enabled=getattr(inst, 'influxdb_udp_enabled', False))
inst.cli = InfluxDBClient('localhost',
inst.influxd_inst.http_port,
'root',
'',
database='db')
if not using_pypy:
inst.cliDF = DataFrameClient('localhost',
inst.influxd_inst.http_port,
'root',
'',
database='db')
def get_handle_db():
global HANDLE_DB
if HANDLE_DB == '':
HANDLE_DB = influxdb.InfluxDBClient(
host=DOCKER_IP,
port=TEST_PORT_INFLUXDB_API,
database=DATABASE_NAME,
username="juniper",
password="juniper"
)
return HANDLE_DB
def test_setup_query_fail(self, mock_client):
"""Test the setup for query failures."""
config = {
'influxdb': {
'host': 'host',
'username': 'user',
'password': 'pass',
}
}
mock_client.return_value.query.side_effect = \
influx_client.exceptions.InfluxDBClientError('fake')
assert not setup_component(self.hass, influxdb.DOMAIN, config)
def test_alter_retention_policy_invalid(self):
self.cli.create_retention_policy('somename', '1d', 1)
with self.assertRaises(InfluxDBClientError) as ctx:
self.cli.alter_retention_policy('somename', 'db')
self.assertEqual(400, ctx.exception.code)
self.assertIn('{"error":"error parsing query: ',
ctx.exception.content)
rsp = self.cli.get_list_retention_policies()
self.assertEqual(
[{'duration': '0', 'default': True,
'replicaN': 1, 'name': 'default'},
{'duration': '24h0m0s', 'default': False,
'replicaN': 1, 'name': 'somename'}],
rsp
)
def test_query(mock_flux):
db = influxdb.InfluxDBClient(database="fizz")
db.query.side_effect = influxdb.exceptions.InfluxDBClientError(None)
client = InfluxAlchemy(db)
query = client.query(Measurement.new("buzz"))
tools.assert_equal(str(query), "SELECT * FROM buzz;")
def setup_db(self):
try:
self.client.drop_database(self.db_name)
except influxdb.exceptions.InfluxDBClientError:
pass
self.client.create_database(self.db_name)
data = [{
"measurement": series,
"tags": {},
"time": _time,
"fields": {
"value": self.series_values[i],
}
}
for i, series in enumerate(self.series)
for _time in [
(self.end_time - datetime.timedelta(minutes=30)).strftime("%Y-%m-%dT%H:%M:%SZ"),
(self.end_time - datetime.timedelta(minutes=2)).strftime("%Y-%m-%dT%H:%M:%SZ"),
]]
self.assertTrue(self.client.write_points(data))
def test_repr(mock_qry):
mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
db = influxdb.InfluxDBClient(database="example")
client = InfluxAlchemy(db)
query = client.query(Measurement.new("fizz"))
tools.assert_equal(repr(query), "SELECT * FROM fizz;")
def test_event_listener_fail_write(self, mock_client):
"""Test the event listener for write failures."""
self._setup()
state = mock.MagicMock(
state=1, domain='fake', entity_id='entity-id', object_id='entity',
attributes={})
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
mock_client.return_value.write_points.side_effect = \
influx_client.exceptions.InfluxDBClientError('foo')
self.handler_method(event)
def test_filter_by(mock_qry):
mock_qry.side_effect = influxdb.exceptions.InfluxDBClientError(None)
db = influxdb.InfluxDBClient(database="example")
client = InfluxAlchemy(db)
query = client.query(Measurement.new("fizz")).filter_by(buzz="goo")
tools.assert_equal(str(query), "SELECT * FROM fizz WHERE (buzz = 'goo');")