Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_client(self, mock_socket_values, **kwargs):
    """Return a traced pymemcache client backed by a mock socket.

    A dummy tracer is installed on ``pymemcache`` through ``Pin.override``,
    a base ``Client`` is built for ``(TEST_HOST, TEST_PORT)`` with any extra
    ``kwargs``, and its socket is replaced by a ``MockSocket`` fed with a
    list copy of ``mock_socket_values``. The client is also stored on
    ``self.client``.
    """
    Pin.override(pymemcache, tracer=get_dummy_tracer())

    address = (TEST_HOST, TEST_PORT)
    self.client = pymemcache.client.base.Client(address, **kwargs)

    # Swap in the canned responses so no real connection is used.
    canned_responses = list(mock_socket_values)
    self.client.sock = MockSocket(canned_responses)
    return self.client
def serialize_json(key, value):
    """Serialize ``value`` for storage, returning ``(payload, flags)``.

    Strings are stored unchanged and flagged ``_STRING_TYPE``; every other
    value is encoded with ``json.dumps`` and flagged ``_JSON_TYPE``.
    ``key`` is accepted to match the serializer callback signature but is
    not used.
    """
    # isinstance() rather than an exact type comparison, so str subclasses
    # are also stored as plain strings instead of being JSON-quoted.
    if isinstance(value, str):
        return value, _STRING_TYPE
    return json.dumps(value), _JSON_TYPE
def deserialize_json(key, value, flags):
    """Decode a stored ``value`` according to its ``flags``.

    Returns ``value`` unchanged when ``flags`` is ``_STRING_TYPE`` and the
    ``json.loads`` result when ``flags`` is ``_JSON_TYPE``. ``key`` is
    accepted to match the deserializer callback signature but is not used.

    Raises:
        Exception: if ``flags`` is neither of the two known values.
    """
    if flags == _STRING_TYPE:
        return value
    if flags == _JSON_TYPE:
        return json.loads(value)
    # The placeholder must be positional index 0 (here: auto-numbered);
    # "{1}" with a single argument raised IndexError instead of this error.
    raise Exception("Unknown flags for value: {}".format(flags))
self.client = Client(
self.endpoint,
no_delay=True,
timeout=self.timeout,
connect_timeout=self.connect_timeout,
key_prefix="data_model_cache__",
serializer=serialize_json,
deserializer=deserialize_json,
ignore_exc=True,
)
return self.client
except:
logger.exception("Got exception when creating memcached client to %s", self.endpoint)
return None
MemcacheIllegalInputError,
)
# project
from ...constants import ANALYTICS_SAMPLE_RATE_KEY
from ...compat import reraise
from ...ext import SpanTypes, net, memcached as memcachedx
from ...internal.logger import get_logger
from ...pin import Pin
from ...settings import config
# logger obtained from get_logger() using this module's name
log = get_logger(__name__)
# keep a reference to the original unpatched clients
# (WrappedClient.__init__ instantiates the real client through this alias)
_Client = Client
class WrappedClient(wrapt.ObjectProxy):
    """Proxy exposing patched methods of a pymemcache Client.

    Connection details are collected when the proxy is constructed and are
    attached to every span. Methods that operate on a key also tag that key
    in their span.
    """

    def __init__(self, *args, **kwargs):
        # Instantiate the real client from the saved, unpatched class and
        # pass it straight to the proxy base as the wrapped object.
        super(WrappedClient, self).__init__(_Client(*args, **kwargs))

        # tags to apply to each span generated by this client
def _cache_client(self):
    """Return a new ``Client`` for ``(self.host, self.port)``.

    The options held in ``self.kwargs`` are forwarded to ``Client`` as
    keyword arguments.
    """
    # NOTE(review): assumes self.kwargs is a mapping of Client keyword
    # options (or None/empty) — confirm against where it is assigned.
    # Passing it positionally would bind the whole dict to Client's second
    # positional parameter instead of applying the individual options.
    options = self.kwargs or {}
    return Client((self.host, self.port), **options)
def _create_client(self):
    """Build and return a ``Client`` for ``self.server``.

    The serde, timeouts, socket options, key prefix, no-reply default and
    unicode-key setting are copied from this object's attributes;
    ``ignore_exc`` is always forced to ``False``.
    """
    return Client(
        self.server,
        serde=self.serde,
        connect_timeout=self.connect_timeout,
        timeout=self.timeout,
        no_delay=self.no_delay,
        # Failures must *always* surface here so the client can be
        # removed/destroyed from the pool.
        ignore_exc=False,
        socket_module=self.socket_module,
        key_prefix=self.key_prefix,
        default_noreply=self.default_noreply,
        allow_unicode_keys=self.allow_unicode_keys,
    )
def connect(self, server):
    """Create the memcache client for ``server`` and store it on ``self.client``.

    ``server`` is split into hosts with ``CacheMemcacheStrategy.splitHosts``.
    A single host is served by a plain ``Client``; more than one host is
    served by pymemcache's ``HashClient``. The key prefix is the hash of
    ``self.fileStrategy.dir`` followed by ``"_"``, UTF-8 encoded.

    Raises:
        AssertionError: if splitting ``server`` yields no hosts.
    """
    hosts = CacheMemcacheStrategy.splitHosts(server)
    if not hosts:
        # Raised explicitly (same exception type as the former ``assert``)
        # so the check is not stripped under ``python -O``, and formatted
        # with the caller's input rather than the empty split result.
        raise AssertionError("{} is not a suitable server".format(server))

    if len(hosts) == 1:
        client_class = Client
        target = hosts[0]
    else:
        # Imported only when a multi-host client is actually required.
        from pymemcache.client.hash import HashClient
        client_class = HashClient
        target = hosts

    self.client = client_class(
        target,
        ignore_exc=True,
        serializer=python_memcache_serializer,
        deserializer=python_memcache_deserializer,
        timeout=5,
        connect_timeout=5,
        key_prefix=(getStringHash(self.fileStrategy.dir) + "_").encode("UTF-8"),
    )