Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
info0 = whisper.info(self.filename)
# optional xFilesFactor not passed
old_ag = whisper.setAggregationMethod(self.filename, ag)
# should return old aggregationmethod
self.assertEqual(old_ag, info0['aggregationMethod'])
# original value should not change
info1 = whisper.info(self.filename)
self.assertEqual(info0['xFilesFactor'], info1['xFilesFactor'])
# the selected aggregation method should have applied
self.assertEqual(ag, info1['aggregationMethod'])
# optional xFilesFactor used
old_ag = whisper.setAggregationMethod(self.filename, ag, xff)
# should return old aggregationmethod
self.assertEqual(old_ag, info1['aggregationMethod'])
# new info should match what we just set it to
info2 = whisper.info(self.filename)
# packing and unpacking because
# AssertionError: 0.20000000298023224 != 0.2
target_xff = struct.unpack("!f", struct.pack("!f", xff))[0]
self.assertEqual(info2['xFilesFactor'], target_xff)
# same aggregationMethod assertion again, but double-checking since
# we are playing with packed values and seek()
self.assertEqual(ag, info2['aggregationMethod'])
with SimulatedCorruptWhisperFile():
with AssertRaisesException(
whisper.CorruptWhisperFile(
# new info should match what we just set it to
info2 = whisper.info(self.filename)
# packing and unpacking because
# AssertionError: 0.20000000298023224 != 0.2
target_xff = struct.unpack("!f", struct.pack("!f", xff))[0]
self.assertEqual(info2['xFilesFactor'], target_xff)
# same aggregationMethod assertion again, but double-checking since
# we are playing with packed values and seek()
self.assertEqual(ag, info2['aggregationMethod'])
with SimulatedCorruptWhisperFile():
with AssertRaisesException(
whisper.CorruptWhisperFile(
'Unable to read header', self.filename)):
whisper.setAggregationMethod(self.filename, ag)
whisper.LOCK = original_lock
whisper.AUTOFLUSH = original_autoflush
whisper.CACHE_HEADERS = original_caching
Create a db, change aggregation, xFilesFactor, then use info() to validate
"""
original_lock = whisper.LOCK
original_caching = whisper.CACHE_HEADERS
original_autoflush = whisper.AUTOFLUSH
whisper.LOCK = True
whisper.AUTOFLUSH = True
whisper.CACHE_HEADERS = True
# create a new db with a valid configuration
whisper.create(self.filename, self.retention)
with AssertRaisesException(
whisper.InvalidAggregationMethod(
'Unrecognized aggregation method: yummy beer')):
whisper.setAggregationMethod(self.filename, 'yummy beer')
# set setting every AggregationMethod available
for ag in whisper.aggregationMethods:
for xff in 0.0, 0.2, 0.4, 0.7, 0.75, 1.0:
# original xFilesFactor
info0 = whisper.info(self.filename)
# optional xFilesFactor not passed
old_ag = whisper.setAggregationMethod(self.filename, ag)
# should return old aggregationmethod
self.assertEqual(old_ag, info0['aggregationMethod'])
# original value should not change
info1 = whisper.info(self.filename)
self.assertEqual(info0['xFilesFactor'], info1['xFilesFactor'])
(options, args) = option_parser.parse_args()
if len(args) < 2:
option_parser.print_help()
sys.exit(1)
path = args[0]
aggregationMethod = args[1]
xFilesFactor = None
if len(args) == 3:
xFilesFactor = args[2]
try:
oldAggregationMethod = whisper.setAggregationMethod(path, aggregationMethod, xFilesFactor)
except IOError:
sys.stderr.write("[ERROR] File '%s' does not exist!\n\n" % path)
option_parser.print_help()
sys.exit(1)
except whisper.WhisperException as exc:
raise SystemExit('[ERROR] %s' % str(exc))
print('Updated aggregation method: %s (%s -> %s)' % (path,oldAggregationMethod,aggregationMethod))
def set_metadata(self, metric, key, value):
"""
Modify metric metadata
"""
if key != "aggregationMethod":
raise ValueError("Invalid metadata key: %s" % key)
path = self.get_path(metric)
return whisper.setAggregationMethod(path, value)
def setAggregation(path, mode):
if not mode:
return 0
if not os.path.exists(path):
return 0
try:
whisper.setAggregationMethod(path, mode)
return 1
except whisper.WhisperException as exc:
logging.warning("%s failed (%s)" % (path, str(exc)))
def setMetadata(self, metric, key, value):
if key != 'aggregationMethod':
raise ValueError("Unsupported metadata key \"%s\"" % key)
wsp_path = self.getFilesystemPath(metric)
return whisper.setAggregationMethod(wsp_path, value)