Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _createdb(self, wsp, schema=[(1, 20)], data=None):
    """Create a whisper database at ``wsp`` and write datapoints into it.

    When ``data`` is None, 20 ``(timestamp, value)`` pairs are generated:
    timestamps are one second apart and end at roughly the current time,
    values are random floats in ``[0, 10)``.

    Returns the list of datapoints that was written.
    """
    whisper.create(wsp, schema)
    if data is None:
        start = time.time() - 20
        data = [(start + 1 + offset, random.random() * 10) for offset in range(20)]
    whisper.update_many(wsp, data)
    return data
num_data_points = 20
# create sample data
self.addCleanup(self._remove, wsp)
whisper.create(wsp, schema, sparse=sparse, useFallocate=useFallocate)
tn = int(time.time()) - num_data_points
data = []
for i in range(num_data_points):
data.append((tn + 1 + i, random.random() * 10))
# test single update
whisper.update(wsp, data[0][1], data[0][0])
# test multi update
whisper.update_many(wsp, data[1:])
return data
def _createdb(self, wsp, schema=[(1, 20)], data=None):
    """Build a whisper file at ``wsp`` using ``schema`` and populate it.

    If the caller passes no ``data``, a series of 20 datapoints is
    synthesised: one per second, the last one stamped at about "now",
    each with a random value between 0 and 10.

    The datapoints written to the file are returned to the caller.
    """
    whisper.create(wsp, schema)
    if data is None:
        base_time = time.time() - 20
        data = [
            (base_time + 1 + second, random.random() * 10)
            for second in range(20)
        ]
    whisper.update_many(wsp, data)
    return data
def test_single_metric(self):
    """Check that a single whisper file is readable through the accessor.

    A whisper file with two archives is created and filled, then
    ``self._call_main()`` is invoked (presumably the import entry point
    under test -- confirm against the test class). The metric fetched back
    from ``self.accessor`` must carry the expected name, aggregation,
    xfilesfactor and retention, and its highest-precision points must match
    the tail of what was written.
    """
    xfilesfactor = 0.5
    aggregation_method = "last"
    # These retentions are such that every other point is present in both
    # archives. The test validates that duplicate points get inserted only
    # once.
    retentions = [(1, 10), (2, 10)]
    high_precision_duration = retentions[0][0] * retentions[0][1]
    low_precision_duration = retentions[1][0] * retentions[1][1]
    now = int(time.time())
    time_from, time_to = now - low_precision_duration, now
    points = [(float(t), float(now - t)) for t in range(time_from, time_to)]
    # Keep the name in its own variable: the fetched metric object is bound
    # to ``metric`` below, and the name assertion needs the original string.
    metric_name = "test_metric"
    metric_path = os_path.join(self.tempdir, metric_name + ".wsp")
    whisper.create(metric_path, retentions, xfilesfactor, aggregation_method)
    whisper.update_many(metric_path, points)

    self._call_main()

    metric = self.accessor.get_metric(metric_name)
    self.assertTrue(metric)
    # Compare against the expected name rather than the object with itself,
    # which could never fail.
    self.assertEqual(metric.name, metric_name)
    self.assertEqual(metric.aggregator.carbon_name, aggregation_method)
    self.assertEqual(metric.carbon_xfilesfactor, xfilesfactor)
    self.assertEqual(metric.retention.as_string, "10*1s:10*2s")

    points_again = list(
        self.accessor.fetch_points(metric, time_from, time_to, metric.retention[0])
    )
    # Only the last ``high_precision_duration`` written points are expected
    # back when fetching at the first retention stage.
    self.assertEqual(points[-high_precision_duration:], points_again)
# A target path plus at least one datapoint argument is required.
if len(args) < 2:
    option_parser.print_help()
    sys.exit(1)

path = args[0]

# A leading 'N:' in a datapoint argument stands for the value of ``now``.
now_prefix = '%d:' % now
datapoint_strings = [raw.replace('N:', now_prefix) for raw in args[1:]]

# Each argument is split on ':' into a tuple of strings.
datapoints = [tuple(text.split(':')) for text in datapoint_strings]

try:
    if len(datapoints) != 1:
        whisper.update_many(path, datapoints)
    else:
        timestamp, value = datapoints[0]
        whisper.update(path, value, timestamp)
except whisper.WhisperException as exc:
    # Report whisper failures as a one-line error and exit.
    raise SystemExit('[ERROR] %s' % str(exc))
def _update(self, datapoints):
    """
    Store the given datapoints in the current database (``self.path``).

    :datapoints: a list of (epoch timestamp, value) tuples,
        e.g. [(1368977629, 10)]
    """
    # Anything other than exactly one datapoint goes through the batch call.
    if len(datapoints) != 1:
        whisper.update_many(self.path, datapoints)
        return

    timestamp, value = datapoints[0]
    whisper.update(self.path, value, timestamp)
def write(self, metric, datapoints, sr):
    """
    Write datapoints to the database file of the given metric.

    ``datapoints`` is a list of (timestamp, value) pairs. The file is
    located with ``self.get_path(metric)``. ``sr`` is accepted but not
    used by this method.
    """
    whisper.update_many(self.get_path(metric), datapoints)