Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
process_queue = [[] for _ in range(processes)]
for index, ops in enumerate(operations):
process_queue[index % processes].append(ops)
for num in range(processes):
with open('input-%s.pkl' % num, 'wb') as writer:
pickle.dump(process_queue[num], writer, protocol=2)
for process in subprocs:
process.start()
for process in subprocs:
process.join()
with FanoutCache('tmp') as cache:
warnings.simplefilter('error')
warnings.simplefilter('ignore', category=UnknownFileWarning)
warnings.simplefilter('ignore', category=EmptyDirWarning)
cache.check()
timings = {'get': [], 'set': [], 'delete': [], 'self': 0.0}
for num in range(processes):
with open('output-%s.pkl' % num, 'rb') as reader:
data = pickle.load(reader)
for key in data:
timings[key] += data[key]
if delete:
for num in range(processes):
os.remove('input-%s.pkl' % num)
def test_copy():
cache_dir1 = tempfile.mkdtemp()
with dc.FanoutCache(cache_dir1) as cache1:
for count in range(10):
cache1[count] = str(count)
for count in range(10, 20):
cache1[count] = str(count) * int(1e5)
cache_dir2 = tempfile.mkdtemp()
shutil.rmtree(cache_dir2)
shutil.copytree(cache_dir1, cache_dir2)
with dc.FanoutCache(cache_dir2) as cache2:
for count in range(10):
assert cache2[count] == str(count)
for count in range(10, 20):
assert cache2[count] == str(count) * int(1e5)
shutil.rmtree(cache_dir1, ignore_errors=True)
shutil.rmtree(cache_dir2, ignore_errors=True)
def worker(queue, eviction_policy, processes, threads):
timings = {'get': [], 'set': [], 'delete': []}
cache = FanoutCache('tmp', eviction_policy=eviction_policy)
for index, (action, key, value) in enumerate(iter(queue.get, None)):
start = time.time()
if action == 'set':
cache.set(key, value, expire=EXPIRE)
elif action == 'get':
result = cache.get(key)
else:
assert action == 'delete'
cache.delete(key)
stop = time.time()
if action == 'get' and processes == 1 and threads == 1 and EXPIRE is None:
assert result == value
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--cache-dir', default='/tmp/test')
parser.add_argument('--iterations', type=int, default=100)
parser.add_argument('--sleep', type=float, default=0.1)
parser.add_argument('--size', type=int, default=25)
args = parser.parse_args()
data = dc.FanoutCache(args.cache_dir)
delays = []
values = {str(num): num for num in range(args.size)}
iterations = args.iterations
for i in range(args.iterations):
print(f'Iteration {i + 1}/{iterations}', end='\r')
time.sleep(args.sleep)
for key, value in values.items():
start = time.monotonic()
data[key] = value
stop = time.monotonic()
diff = stop - start
delays.append(diff)
# Discard warmup delays, first two iterations.
del delays[:(len(values) * 2)]
def test_rsync():
try:
run(['rsync', '--version'])
except OSError:
return # No rsync installed. Skip test.
rsync_args = ['rsync', '-a', '--checksum', '--delete', '--stats']
cache_dir1 = tempfile.mkdtemp() + os.sep
cache_dir2 = tempfile.mkdtemp() + os.sep
# Store some items in cache_dir1.
with dc.FanoutCache(cache_dir1) as cache1:
for count in range(100):
cache1[count] = str(count)
for count in range(100, 200):
cache1[count] = str(count) * int(1e5)
# Rsync cache_dir1 to cache_dir2.
run(rsync_args + [cache_dir1, cache_dir2])
# Validate items in cache_dir2.
with dc.FanoutCache(cache_dir2) as cache2:
for count in range(100):
assert cache2[count] == str(count)
def test_incr_concurrent():
with dc.FanoutCache(shards=1, timeout=0.001) as cache:
count = 16
limit = 50
threads = [
threading.Thread(target=stress_incr, args=(cache, limit))
for _ in range(count)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert cache.get(b'key') == count * limit
cache.check()
def getCache(scope_str):
return FanoutCache('data-unversioned/cache/' + scope_str,
disk=GzipDisk,
shards=64,
timeout=1,
size_limit=2e11,
# disk_min_file_size=2**20,
def __init__(self, *, config: Config, name: str, module_configuration: ConfigDict) -> None:
read_cache_directory = Config.get_from_dict(module_configuration, 'readCache.directory', None, types=str)
read_cache_maximum_size = Config.get_from_dict(module_configuration, 'readCache.maximumSize', None, types=int)
read_cache_shards = Config.get_from_dict(module_configuration, 'readCache.shards', None, types=int)
if read_cache_directory and read_cache_maximum_size:
os.makedirs(read_cache_directory, exist_ok=True)
try:
self._read_cache = FanoutCache(
read_cache_directory,
size_limit=read_cache_maximum_size,
shards=read_cache_shards,
eviction_policy='least-frequently-used',
statistics=1,
)
except Exception:
logger.warning('Unable to enable disk based read caching. Continuing without it.')
self._read_cache = None
else:
logger.debug('Disk based read caching instantiated (cache size {}, shards {}).'.format(
read_cache_maximum_size, read_cache_shards))
else:
self._read_cache = None
self._use_read_cache = True