Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, corrupt_archive=False):
    """Store the corrupt_archive flag and snapshot three whisper settings.

    The current values of whisper.metadataFormat, whisper.archiveInfoFormat
    and whisper.CACHE_HEADERS are copied onto the instance under the same
    attribute names, in that order.
    """
    self.corrupt_archive = corrupt_archive
    # Presumably these copies exist so the module can be put back into its
    # prior state later (see __exit__) -- confirm against the full class.
    for setting in ("metadataFormat", "archiveInfoFormat", "CACHE_HEADERS"):
        setattr(self, setting, getattr(whisper, setting))
target_xff = struct.unpack("!f", struct.pack("!f", xff))[0]
self.assertEqual(info2['xFilesFactor'], target_xff)
# same aggregationMethod assertion again, but double-checking since
# we are playing with packed values and seek()
self.assertEqual(ag, info2['aggregationMethod'])
with SimulatedCorruptWhisperFile():
with AssertRaisesException(
whisper.CorruptWhisperFile(
'Unable to read header', self.filename)):
whisper.setAggregationMethod(self.filename, ag)
whisper.LOCK = original_lock
whisper.AUTOFLUSH = original_autoflush
whisper.CACHE_HEADERS = original_caching
def test_normal(self):
    """Call whisper.info() twice on a fresh database with header caching on.

    A database is created at self.filename with the archive list
    [(1, 60), (60, 60)], whisper.CACHE_HEADERS is switched to True, and
    whisper.info() is invoked twice on the same file. The test passes when
    neither call raises.

    whisper.CACHE_HEADERS is a module-level global, so its previous value is
    saved and restored in a finally clause; a failure inside info() must not
    leave caching enabled for tests that run afterwards.
    """
    whisper.create(self.filename, [(1, 60), (60, 60)])

    original_caching = whisper.CACHE_HEADERS
    whisper.CACHE_HEADERS = True
    try:
        # Two back-to-back calls on the same path; presumably the second one
        # is meant to exercise the cached-header path -- confirm in whisper.
        whisper.info(self.filename)
        whisper.info(self.filename)
    finally:
        # Restore whatever was configured before, not a hard-coded False.
        whisper.CACHE_HEADERS = original_caching
def test_setAggregation(self):
"""
Create a db, change aggregation, xFilesFactor, then use info() to validate
"""
original_lock = whisper.LOCK
original_caching = whisper.CACHE_HEADERS
original_autoflush = whisper.AUTOFLUSH
whisper.LOCK = True
whisper.AUTOFLUSH = True
whisper.CACHE_HEADERS = True
# create a new db with a valid configuration
whisper.create(self.filename, self.retention)
with AssertRaisesException(
whisper.InvalidAggregationMethod(
'Unrecognized aggregation method: yummy beer')):
whisper.setAggregationMethod(self.filename, 'yummy beer')
# try setting every available aggregation method
for ag in whisper.aggregationMethods:
for xff in 0.0, 0.2, 0.4, 0.7, 0.75, 1.0:
# original xFilesFactor
info0 = whisper.info(self.filename)
# optional xFilesFactor not passed
old_ag = whisper.setAggregationMethod(self.filename, ag)
def test_setAggregation(self):
"""
Create a db, change aggregation, xFilesFactor, then use info() to validate
"""
original_lock = whisper.LOCK
original_caching = whisper.CACHE_HEADERS
original_autoflush = whisper.AUTOFLUSH
whisper.LOCK = True
whisper.AUTOFLUSH = True
whisper.CACHE_HEADERS = True
# create a new db with a valid configuration
whisper.create(self.filename, self.retention)
with AssertRaisesException(
whisper.InvalidAggregationMethod(
'Unrecognized aggregation method: yummy beer')):
whisper.setAggregationMethod(self.filename, 'yummy beer')
# try setting every available aggregation method
for ag in whisper.aggregationMethods:
for xff in 0.0, 0.2, 0.4, 0.7, 0.75, 1.0:
def __exit__(self, *args, **kwargs):
    """Write the instance's saved settings back onto the whisper module.

    Copies self.metadataFormat, self.archiveInfoFormat and
    self.CACHE_HEADERS to the whisper attributes of the same names, in that
    order. All arguments are ignored and None is returned implicitly, so an
    exception raised inside the with-block is not suppressed.
    """
    for setting in ("metadataFormat", "archiveInfoFormat", "CACHE_HEADERS"):
        setattr(whisper, setting, getattr(self, setting))