Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run_query(self, query, user):
    """Run ``query`` against the InfluxDB endpoint configured under ``url``.

    Args:
        query: Query text, passed unchanged to ``client.query``.
        user: Not used by this method; kept so the signature is unchanged.

    Returns:
        A ``(json_data, error)`` tuple. On success ``json_data`` is whatever
        ``_transform_result`` returns and ``error`` is ``None``. If querying
        or transforming raises, ``json_data`` is ``None`` and ``error`` is
        the exception rendered as a string.
    """
    client = InfluxDBClusterClient.from_DSN(self.configuration['url'])
    logger.debug("influxdb url: %s", self.configuration['url'])
    logger.debug("influxdb got query: %s", query)
    try:
        results = client.query(query)
        # _transform_result is always handed a list, so wrap a lone result.
        if not isinstance(results, list):
            results = [results]
        json_data = _transform_result(results)
        error = None
    except Exception as ex:
        # "except X, name" is Python 2-only syntax and exceptions have no
        # ".message" attribute on Python 3; str(ex) is the portable spelling.
        json_data = None
        error = str(ex)
    return json_data, error
def run_query(self, query, user):
    """Execute ``query`` on the InfluxDB endpoint named by the ``url`` setting.

    ``user`` is accepted but not used here.

    Returns a ``(json_data, error)`` pair: the output of
    ``_transform_result`` and ``None`` on success, or ``None`` and the
    stringified exception when querying or transforming fails.
    """
    dsn = self.configuration["url"]
    client = InfluxDBClusterClient.from_DSN(dsn)
    logger.debug("influxdb url: %s", dsn)
    logger.debug("influxdb got query: %s", query)
    try:
        response = client.query(query)
        # Normalise a lone result into a one-element list before transforming.
        result_sets = response if isinstance(response, list) else [response]
        return _transform_result(result_sets), None
    except Exception as exc:
        return None, str(exc)