Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
use_advanced_pool=self._conf.get('memcache_use_advanced_pool'),
dead_retry=self._conf.get('memcache_pool_dead_retry'),
maxsize=self._conf.get('memcache_pool_maxsize'),
unused_timeout=self._conf.get('memcache_pool_unused_timeout'),
conn_get_timeout=self._conf.get('memcache_pool_conn_get_timeout'),
socket_timeout=self._conf.get('memcache_pool_socket_timeout'),
)
if security_strategy.lower() != 'none':
secret_key = self._conf.get('memcache_secret_key')
return _cache.SecureTokenCache(self.log,
security_strategy,
secret_key,
**cache_kwargs)
else:
return _cache.TokenCache(self.log, **cache_kwargs)
use_advanced_pool=self._conf_get('memcache_use_advanced_pool'),
dead_retry=self._conf_get('memcache_pool_dead_retry'),
maxsize=self._conf_get('memcache_pool_maxsize'),
unused_timeout=self._conf_get('memcache_pool_unused_timeout'),
conn_get_timeout=self._conf_get('memcache_pool_conn_get_timeout'),
socket_timeout=self._conf_get('memcache_pool_socket_timeout'),
)
if security_strategy:
secret_key = self._conf_get('memcache_secret_key')
return _cache.SecureTokenCache(self.log,
security_strategy,
secret_key,
**cache_kwargs)
else:
return _cache.TokenCache(self.log, **cache_kwargs)
return jsonutils.loads(data)
def set(self, token_id, data):
    """Store value into memcache.

    ``data`` is JSON-encoded with ``jsonutils.dumps``; if the result is
    text it is encoded to UTF-8 bytes. The cache key and its context are
    derived from ``token_id`` via ``self._get_cache_key``, the payload is
    passed through ``self._serialize`` with that context, and the result
    is written with ``time=self._cache_time``.
    """
    payload = jsonutils.dumps(data)
    if isinstance(payload, six.text_type):
        payload = payload.encode('utf-8')

    key_and_context = self._get_cache_key(token_id)
    cache_key, context = key_and_context

    # Serialize before reserving a pooled connection so the connection is
    # only held for the duration of the actual write.
    serialized = self._serialize(payload, context)

    with self._cache_pool.reserve() as client:
        client.set(
            cache_key,
            serialized,
            time=self._cache_time,
        )
class SecureTokenCache(TokenCache):
"""A token cache that stores tokens encrypted.
A more secure version of TokenCache that will encrypt tokens before
caching them.
"""
def __init__(self, log, security_strategy, secret_key, **kwargs):
super(SecureTokenCache, self).__init__(log, **kwargs)
if not secret_key:
msg = _('memcache_secret_key must be defined when a '
'memcache_security_strategy is defined')
raise exc.ConfigurationError(msg)
if isinstance(security_strategy, six.string_types):
security_strategy = security_strategy.encode('utf-8')