Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, ip=None, port=None, socket=None, del_after_read=True, flush=False, retention=None):
    """
    Initialise bookkeeping state and open the Redis connection.

    ip, port: passed positionally to the Redis client constructor
    socket: passed to the Redis client as ``unix_socket_path``
    del_after_read: stored unchanged on the instance
    flush: when truthy, ``flushall()`` is issued on the new connection
    retention: stored unchanged on the instance

    Raises ValueError when ip, port and socket are all truthy.
    """
    # Per-instance tracking containers; both start out empty.
    self.ids = defaultdict(list)
    self.last_id = {}
    self.retention = retention
    self.del_after_read = del_after_read

    # NOTE(review): this guard only trips when *all three* values are
    # truthy; e.g. ip + socket without a port is accepted. Confirm whether
    # that is intended before tightening it.
    if all((ip, port, socket)):
        raise ValueError("Cannot specify ip/port and socket for Redis")

    client_factory = StorageEngines.redis.Redis
    self.conn = client_factory(ip, port, unix_socket_path=socket, decode_responses=True)

    if not flush:
        return
    LOG.info('Flushing cache')
    self.conn.flushall()