Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, config, name=None):
    """Store the configuration and build the Redis client and its buffer.

    Args:
        config: Object indexed with ``'redis'``; that entry is handed to
            ``self._parse_config`` and the result is expanded as keyword
            arguments for ``redis.Redis``.
        name: Optional name for this instance. When ``None``, a name is
            obtained from ``_random_name(11)``.
    """
    self.config = config
    self._buffer_size = 50000

    connection_kwargs = self._parse_config(self.config['redis'])
    self._redis = redis.Redis(**connection_kwargs)

    # The buffer reuses the client's connection pool and response
    # callbacks and is given the buffer size configured above.
    client = self._redis
    self._buffer = RedisBuffer(
        client.connection_pool,
        client.response_callbacks,
        transaction=True,
        buffer_size=self._buffer_size,
    )

    self._name = _random_name(11) if name is None else name
def execute_command(self, *args, **kwargs):
    """Forward a command to the parent class, calling ``execute()`` first
    when the command stack has reached the buffer size.

    Args:
        *args: Positional arguments passed unchanged to the parent
            class's ``execute_command``.
        **kwargs: Keyword arguments passed unchanged to the parent
            class's ``execute_command``.

    Returns:
        Whatever the parent class's ``execute_command`` returns. The
        value used to be discarded, so callers always received ``None``.
    """
    if len(self.command_stack) >= self._buffer_size:
        # NOTE(review): presumably this sends the queued commands and
        # empties ``command_stack`` -- confirm against the parent class.
        # The value returned by execute() is not used here.
        self.execute()
    # Propagate the parent's result instead of dropping it.
    return super(RedisBuffer, self).execute_command(*args, **kwargs)