Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Integration-style test for utils.make_subscriber: checks the returned
# (subscriber, channel) pair and then publishes one message on the same name.
async def test_make_subscriber():
sub, chan = await utils.make_subscriber("test")
assert sub is not None
assert chan is not None
# Both halves of the pair must be the concrete aioredis types.
assert isinstance(sub, aioredis.Redis)
assert isinstance(chan, aioredis.Channel)
await sub.subscribe("channel:test")
# Drain the channel in the background; `reader` is defined outside this view.
settings.loop.create_task(reader(chan))
# The test pins publish_message's return value to 1 -- presumably the number
# of subscribers that received the message; confirm against utils.publish_message.
assert await utils.publish_message("test", {"hello": "world"}) == 1
async def test_ping_redis(self):
"""
Test periodical Redis ping.
"""
original_ping = aioredis.Redis.ping
def dummy_ping(*args, **kwargs):
dummy_ping.n_pings += 1
if dummy_ping.n_pings < 2:
return original_ping(*args, **kwargs)
else:
loop = asyncio.get_event_loop()
return create_future(loop)
dummy_ping.n_pings = 0
shark = SocketShark(TEST_CONFIG)
await shark.prepare()
client = MockClient(shark)
session = client.session
# Have at least one subscription so we're in pubsub mode.
import aioredis
import aioredis.util
import redis
from dragonchain import logger
# Module-level logger obtained from the dragonchain logging helper.
_log = logger.get_logger()
# Redis host names are mandatory configuration: indexing os.environ directly
# means a missing variable raises KeyError when this module is imported.
REDIS_ENDPOINT = os.environ["REDIS_ENDPOINT"]
LRU_REDIS_ENDPOINT = os.environ["LRU_REDIS_ENDPOINT"]
# Port used for both Redis endpoints. Falls back to 6379 when REDIS_PORT is
# unset, empty, or "0". Previously an unset variable raised KeyError before the
# `or 6379` default could ever apply; a non-numeric value still raises ValueError.
REDIS_PORT = int(os.environ.get("REDIS_PORT") or 0) or 6379
# Module-level client slots. Each starts as None; the cast() only makes the
# annotation type-check. The _set_redis_client*_if_necessary helpers in this
# module fill the slots on demand.
redis_client: redis.Redis = cast(redis.Redis, None)
redis_client_lru: redis.Redis = cast(redis.Redis, None)
async_redis_client: aioredis.Redis = cast(aioredis.Redis, None)
def _set_redis_client_if_necessary() -> None:
    """Populate the module-level ``redis_client`` on first use.

    Does nothing when a client is already set; otherwise builds one with
    ``_initialize_redis`` against ``REDIS_ENDPOINT`` / ``REDIS_PORT``.
    """
    global redis_client
    if redis_client is not None:
        return
    redis_client = _initialize_redis(host=REDIS_ENDPOINT, port=REDIS_PORT)
def _set_redis_client_lru_if_necessary() -> None:
    """Populate the module-level ``redis_client_lru`` on first use.

    Does nothing when a client is already set; otherwise builds one with
    ``_initialize_redis`` against ``LRU_REDIS_ENDPOINT`` / ``REDIS_PORT``.
    """
    global redis_client_lru
    if redis_client_lru is not None:
        return
    redis_client_lru = _initialize_redis(host=LRU_REDIS_ENDPOINT, port=REDIS_PORT)
async def _set_redis_client_async_if_necessary() -> None:
global async_redis_client
async def initialize(self, loop):
    """Create the Redis pool and the pub/sub client once, guarded by ``init_lock``.

    Args:
        loop: event loop stored on the instance and passed to
            ``aioredis.create_pool``.

    The ``"redis"`` section of ``app_settings`` supplies ``host``, ``port``
    and the extra ``pool`` keyword arguments. An ``AssertionError`` raised
    during setup is logged and swallowed, leaving ``initialized`` unchanged.
    """
    self._loop = loop
    async with self.init_lock:
        if self.initialized is not False:
            # Setup already ran; nothing to do.
            return
        try:
            redis_settings = app_settings["redis"]
            address = (redis_settings["host"], redis_settings["port"])
            self._pool = await aioredis.create_pool(
                address, **redis_settings["pool"], loop=loop
            )
            # One connection from the pool is wrapped as the pub/sub client.
            pubsub_connection = await self._pool.acquire()
            self._pubsub_subscriptor = aioredis.Redis(pubsub_connection)
            self.initialized = True
        except AssertionError:
            logger.error("Error on initializing redis", exc_info=True)
devices: Collection[AbstractComputeDevice]
alloc_map: AbstractAllocMap
class AbstractAgent(aobject, metaclass=ABCMeta):
loop: asyncio.AbstractEventLoop
config: Mapping[str, Any]
etcd: AsyncEtcd
agent_id: str
kernel_registry: MutableMapping[KernelId, AbstractKernel]
computers: MutableMapping[str, ComputerContext]
images: Mapping[str, str]
port_pool: Set[int]
redis: aioredis.Redis
zmq_ctx: zmq.asyncio.Context
terminating_kernels: Set[KernelId]
restarting_kernels: MutableMapping[KernelId, RestartTracker]
timer_tasks: MutableSequence[asyncio.Task]
container_lifecycle_queue: 'asyncio.Queue[Union[ContainerLifecycleEvent, Sentinel]]'
stat_ctx: StatContext
stat_sync_sockpath: Path
stat_sync_task: asyncio.Task
stats_monitor: StatsPluginContext
error_monitor: ErrorPluginContext
def __init__(
self,
if not redis_pool:
# Connect lazily using the provided parameters
self.connection_parameters = self.connection_parameters.copy()
self.connection_parameters.update(connection_parameters)
if url:
self.connection_parameters["address"] = url
else:
# Use the provided connection
if isinstance(redis_pool, (ConnectionsPool,)):
# If they've passed a raw pool then wrap it up in a Redis object.
# aioredis.create_redis_pool() normally does this for us.
redis_pool = Redis(redis_pool)
if not isinstance(redis_pool, (Redis,)):
raise InvalidRedisPool(
"Invalid Redis connection provided: {}. If unsure, use aioredis.create_redis_pool() to "
"create your redis connection.".format(redis_pool)
)
if not isinstance(redis_pool._pool_or_conn, (ConnectionsPool,)):
raise InvalidRedisPool(
"The provided redis connection is backed by a single connection, rather than a "
"pool of connections. This will lead to lightbus deadlocks and is unsupported. "
"If unsure, use aioredis.create_redis_pool() to create your redis connection."
)
# Determine the connection parameters from the given pool
# (we will need these in order to create new pools for other threads)
self.connection_parameters = dict(
address=redis_pool.address,
db=redis_pool.db,
from typing import Any, Dict, Optional
import aioredis
import ujson
class RedisConnection: # pragma: no cover
redis: Optional[aioredis.Redis] = None
connection_info: Dict[str, Any]
@classmethod
async def _get_redis_connection(cls):
    """Return the class-wide Redis client, creating it when missing or closed.

    The client is cached on ``cls.redis`` and reused while it is truthy and
    not closed.
    """
    # NOTE(review): connection_info is annotated Dict[str, Any] but is passed
    # positionally to create_redis_pool -- confirm what callers actually supply.
    cached = cls.redis
    if not cached or cached.closed:
        cls.redis = await aioredis.create_redis_pool(cls.connection_info)
    return cls.redis
# Store the connection settings on the class and open the shared Redis
# connection right away.
@classmethod
async def initialize(cls, connection_info):
cls.connection_info = connection_info
# Connect eagerly; the returned client is discarded here because
# _get_redis_connection caches it on cls.redis.
await cls._get_redis_connection()
@classmethod
Consumer example: database updates upon specific events.
Subscribers use the broadcast pattern. All subscribers in many manager worker processes
receive the same event.
Subscriber example: enqueuing events to the queues for event streaming API handlers
'''
loop: asyncio.AbstractEventLoop
root_app: web.Application
consumers: MutableMapping[str, Set[EventHandler]]
subscribers: MutableMapping[str, Set[EventHandler]]
redis_producer: aioredis.Redis
redis_consumer: aioredis.Redis
redis_subscriber: aioredis.Redis
consumer_task: asyncio.Task
subscriber_task: asyncio.Task
producer_lock: asyncio.Lock
def __init__(self, app: web.Application) -> None:
    """Bind the dispatcher to ``app`` and start with empty handler registries.

    Args:
        app: the web application stored as ``root_app``.

    The loop is taken from ``current_loop()``; ``consumers`` and
    ``subscribers`` are ``defaultdict(set)`` maps.
    """
    self.loop, self.root_app = current_loop(), app
    self.consumers, self.subscribers = defaultdict(set), defaultdict(set)
# Asynchronous second-stage initializer: opens the Redis connections and
# schedules the background consumer/subscriber tasks.
async def __ainit__(self) -> None:
# Three separate connections from the same factory, awaited one after another:
# one each for producing, consuming and subscribing.
self.redis_producer = await self._create_redis()
self.redis_consumer = await self._create_redis()
self.redis_subscriber = await self._create_redis()
# Schedule the _consume/_subscribe coroutines on self.loop and keep the
# task handles on the instance.
self.consumer_task = self.loop.create_task(self._consume())
self.subscriber_task = self.loop.create_task(self._subscribe())
async def _initialize_async_redis(host: str, port: int, wait_time: int = 30) -> aioredis.Redis:
"""Initiailize an aioredis, but ensure that the redis is up and connectable, otherwise throw an error
Args:
host: host of the redis to initialize a connection
port: port of the redis to initialize a connection
wait_time: number of seconds to wait with a failed connection before throwing a RuntimeException
Returns:
aioredis (https://aioredis.readthedocs.io/en/latest/) client (with a connection pool) that is connected and available
"""
expire_time = time.time() + wait_time
_log.debug(f"Attempting to connect to redis at {host}:{port}")
sleep_time = 1 # Number of seconds to wait after a failure to connect before retrying
while time.time() < expire_time:
try:
client = await aioredis.create_redis_pool((host, port))
if await client.ping():
_log.debug(f"Successfully connected with redis at {host}:{port}")
self._closed = False
if not redis_pool:
# Connect lazily using the provided parameters
self.connection_parameters = self.connection_parameters.copy()
self.connection_parameters.update(connection_parameters)
if url:
self.connection_parameters["address"] = url
else:
# Use the provided connection
if isinstance(redis_pool, (ConnectionsPool,)):
# If they've passed a raw pool then wrap it up in a Redis object.
# aioredis.create_redis_pool() normally does this for us.
redis_pool = Redis(redis_pool)
if not isinstance(redis_pool, (Redis,)):
raise InvalidRedisPool(
"Invalid Redis connection provided: {}. If unsure, use aioredis.create_redis_pool() to "
"create your redis connection.".format(redis_pool)
)
if not isinstance(redis_pool._pool_or_conn, (ConnectionsPool,)):
raise InvalidRedisPool(
"The provided redis connection is backed by a single connection, rather than a "
"pool of connections. This will lead to lightbus deadlocks and is unsupported. "
"If unsure, use aioredis.create_redis_pool() to create your redis connection."
)
# Determine the connection parameters from the given pool
# (we will need these in order to create new pools for other threads)
self.connection_parameters = dict(
address=redis_pool.address,