Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@backoff.on_exception(
    backoff.expo,
    (InfluxDBServerError, InfluxDBClientError),
    max_tries=4,
)
def query(self, query):
    """Run ``query`` through ``self.cnx`` and return whatever it returns.

    The call is wrapped in a ``Timer`` built from ``self.logger`` and a
    label containing the query text. The ``backoff`` decorator re-invokes
    this method with exponential backoff, up to 4 tries in total, whenever
    an ``InfluxDBServerError`` or ``InfluxDBClientError`` is raised.

    Args:
        query: the query passed unchanged to ``self.cnx.query``.

    Returns:
        The result of ``self.cnx.query(query)``.
    """
    # NOTE(review): Timer presumably reports the elapsed time to the logger
    # when the block exits -- confirm against its definition.
    timer_label = f"Influx query {query}"
    with Timer(self.logger, timer_label):
        result = self.cnx.query(query)
    return result
def write_to_influxdb(self, json):
    """Send a batch of preprocessed events to InfluxDB, retrying on failure.

    The write is attempted up to ``self.max_tries + 1`` times, sleeping
    ``RETRY_DELAY`` between attempts. If every attempt fails, the batch is
    dropped and its size is added to ``self.write_errors``; only the first
    dropped batch of a failure streak is logged. The next successful write
    reports how many events were lost in the meantime and resets the
    counter.

    Args:
        json: the points handed to ``self.influx.write_points``; its
            ``len()`` is used for the event counts.
    """
    for attempt in range(self.max_tries + 1):
        try:
            self.influx.write_points(json)
            if self.write_errors:
                # First success after a failure streak: report the loss once.
                _LOGGER.error("Resumed, lost %d events", self.write_errors)
                self.write_errors = 0
            _LOGGER.debug("Wrote %d events", len(json))
            return
        except (
            exceptions.InfluxDBClientError,
            exceptions.InfluxDBServerError,
            IOError,
        ) as exc:
            if attempt >= self.max_tries:
                # Out of attempts: log only when the streak begins, then
                # account for the dropped events.
                if not self.write_errors:
                    _LOGGER.error("Write error: %s", exc)
                self.write_errors += len(json)
            else:
                time.sleep(RETRY_DELAY)
Raises: Exception if unable to delete the user in 5 retries
"""
for _ in range(5):
try:
self.influx.drop_measurement(get_measurement_name(musicbrainz_id))
break
except InfluxDBClientError as e:
# influxdb-python raises client error if measurement isn't found
# so we have to handle that case.
if 'measurement not found' in e.content:
return
else:
self.log.error('Error in influx client while dropping user %s: %s', musicbrainz_id, str(e), exc_info=True)
time.sleep(3)
except InfluxDBServerError as e:
self.log.error('Error in influx server while dropping user %s: %s', musicbrainz_id, str(e), exc_info=True)
time.sleep(3)
except Exception as e:
self.log.error('Error while trying to drop user %s: %s', musicbrainz_id, str(e), exc_info=True)
time.sleep(3)
else:
raise InfluxListenStoreException("Couldn't delete user with MusicBrainz ID: %s" % musicbrainz_id)
def profile(user_name):
# Which database to use to showing user listens.
db_conn = webserver.influx_connection._influx
# Which database to use to show playing_now stream.
playing_now_conn = webserver.redis_connection._redis
user = _get_user(user_name)
# User name used to get user may not have the same case as original user name.
user_name = user.musicbrainz_id
try:
have_listen_count = True
listen_count = db_conn.get_listen_count_for_user(user_name)
except (InfluxDBServerError, InfluxDBClientError):
have_listen_count = False
listen_count = 0
# Getting data for current page
max_ts = request.args.get("max_ts")
if max_ts is not None:
try:
max_ts = int(max_ts)
except ValueError:
raise BadRequest("Incorrect timestamp argument max_ts: %s" % request.args.get("max_ts"))
min_ts = request.args.get("min_ts")
if min_ts is not None:
try:
min_ts = int(min_ts)
except ValueError:
def _write_points(points, num_points):
"""
Write the points to the InfluxDB in groups that are MAX_POINTS_PER_WRITE in
size.
"""
LOG.debug("Writing points %d", num_points)
write_index = 0
points_written = 0
while write_index < num_points:
max_write_index = write_index + MAX_POINTS_PER_WRITE
write_points = points[write_index:max_write_index]
try:
g_client.write_points(write_points)
points_written += len(write_points)
except InfluxDBServerError as svr_exc:
LOG.error(
"InfluxDBServerError: %s\nFailed to write points: %s",
str(svr_exc),
_get_point_names(write_points),
)
except InfluxDBClientError as client_exc:
LOG.error(
"InfluxDBClientError writing points: %s\n" "Error: %s",
_get_point_names(write_points),
str(client_exc),
)
except requests.exceptions.ConnectionError as req_exc:
LOG.error(
"ConnectionError exception caught writing points: %s\n" "Error: %s",
_get_point_names(write_points),
str(req_exc),
response._msgpack = msgpack.unpackb(
packed=response.content,
ext_hook=_msgpack_parse_hook,
raw=False)
else:
response._msgpack = None
def reformat_error(response):
    """Build the error payload for ``response``.

    Returns the ``_msgpack`` attribute serialized as compact JSON (no
    spaces after separators) when it is truthy; otherwise returns
    ``response.content`` untouched.
    """
    decoded = response._msgpack
    if not decoded:
        return response.content
    return json.dumps(decoded, separators=(',', ':'))
# if there's not an error, there must have been a successful response
if 500 <= response.status_code < 600:
raise InfluxDBServerError(reformat_error(response))
elif response.status_code == expected_response_code:
return response
else:
err_msg = reformat_error(response)
raise InfluxDBClientError(err_msg, response.status_code)
def _doWrite(self, jsonList):
logging.debug('Writing {} series to db: {}'.format(
len(jsonList), jsonList))
success = False
try:
self._client.write_points(jsonList)
except requests.exceptions.ConnectionError as e:
logging.error('Connection Error: {}'.format(e))
except InfluxDBClientError as e:
logging.error('InfluxDBClientError: {}'.format(e))
except InfluxDBServerError as e:
logging.error('InfluxDBServerError: {}'.format(e))
except:
e = sys.exc_info()[0]
logging.error('Exception: {}'.format(e))
else:
success = True
return(success)
Args:
data: the data to be inserted into the ListenStore
retries: the number of retries to make before deciding that we've failed
Returns: number of listens successfully sent
"""
if not data:
return 0
failure_count = 0
while True:
try:
self.ls.insert(data)
return len(data)
except (InfluxDBServerError, InfluxDBClientError, ValueError) as e:
failure_count += 1
if failure_count >= retries:
break
sleep(self.ERROR_RETRY_DELAY)
except ConnectionError as e:
current_app.logger.error("Cannot write data to listenstore: %s. Sleep." % str(e), exc_info=True)
sleep(self.ERROR_RETRY_DELAY)
# if we get here, we failed on trying to write the data
if len(data) == 1:
# try to send the bad listen one more time and if it doesn't work
# log the error
try:
self.ls.insert(data)
return 1
except (InfluxDBServerError, InfluxDBClientError, ValueError, ConnectionError) as e:
def __init__(self, content):
    """Initialize the InfluxDBServerError handler.

    Forwards ``content`` unchanged to the parent class initializer.

    Args:
        content: value passed to the parent initializer -- presumably the
            error body returned by the server; confirm against the raise
            sites.
    """
    # The explicit two-argument super() form is kept as-is.
    super(InfluxDBServerError, self).__init__(content)
def write_points(self, data):
    """Best-effort write of ``data`` to InfluxDB via ``self.influx``.

    The payload is logged at debug level before the write. An
    ``InfluxDBServerError`` or ``ConnectionError`` raised by the write is
    logged at error level and the data is dropped; any other exception
    propagates to the caller.

    Args:
        data: the points passed unchanged to ``self.influx.write_points``.
    """
    self.logger.debug('Writing Data to InfluxDB %s', data)
    try:
        self.influx.write_points(data)
    except (InfluxDBServerError, ConnectionError) as err:
        self.logger.error(
            'Error writing data to influxdb. Dropping this set of data. Check your database! Error: %s',
            err,
        )