Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def init():
"""Create a connection to the Redis server."""
global redis_conn
conn = await aioredis.create_connection(
"redis://{}:{}".format(
SETTINGS.get("FLOW_EXECUTOR", {})
.get("REDIS_CONNECTION", {})
.get("host", "localhost"),
SETTINGS.get("FLOW_EXECUTOR", {})
.get("REDIS_CONNECTION", {})
.get("port", 56379),
),
db=int(
SETTINGS.get("FLOW_EXECUTOR", {}).get("REDIS_CONNECTION", {}).get("db", 1)
),
)
redis_conn = aioredis.Redis(conn)
async def main():
conn = await aioredis.create_connection(
'redis://localhost', encoding='utf-8')
ok = await conn.execute('set', 'my-key', 'some value')
assert ok == 'OK', ok
str_value = await conn.execute('get', 'my-key')
raw_value = await conn.execute('get', 'my-key', encoding=None)
assert str_value == 'some value'
assert raw_value == b'some value'
print('str value:', str_value)
print('raw value:', raw_value)
# optionally close connection
conn.close()
await conn.wait_closed()
async def redis(self) -> aioredis.RedisConnection:
"""
Get Redis connection
"""
# Use thread-safe asyncio Lock because this method without that is not safe
async with self._connection_lock:
if self._redis is None:
self._redis = await aioredis.create_connection(
(self._host, self._port),
db=self._db,
password=self._password,
ssl=self._ssl,
loop=self._loop,
**self._kwargs,
)
return self._redis
async def redis(self) -> aioredis.RedisConnection:
"""
Get Redis connection
This property is awaitable.
"""
# Use thread-safe asyncio Lock because this method without that is not safe
async with self._connection_lock:
if self._redis is None:
self._redis = await aioredis.create_connection(
(self._host, self._port),
db=self._db,
password=self._password,
ssl=self._ssl,
loop=self._loop,
**self._kwargs,
)
return self._redis
async def serve_requests(websocket, path):
redis_conn = await aioredis.create_connection((redis_ip_address, redis_port),
loop=loop)
while True:
command = json.loads(await websocket.recv())
print("received command {}".format(command))
if command["command"] == "get-statistics":
await handle_get_statistics(websocket, redis_conn)
elif command["command"] == "get-drivers":
await handle_get_drivers(websocket, redis_conn)
elif command["command"] == "get-recent-tasks":
await handle_get_recent_tasks(websocket, redis_conn, command["num"])
elif command["command"] == "get-errors":
await handle_get_errors(websocket)
elif command["command"] == "get-heartbeats":
await send_heartbeats(websocket, redis_conn)
elif command["command"] == "get-log-files":
async def listen_for_errors(redis_ip_address, redis_port):
pubsub_conn = await aioredis.create_connection(
(redis_ip_address, redis_port), loop=loop)
data_conn = await aioredis.create_connection((redis_ip_address, redis_port),
loop=loop)
error_pattern = "__keyspace@0__:ErrorKeys"
await pubsub_conn.execute_pubsub("psubscribe", error_pattern)
channel = pubsub_conn.pubsub_patterns[error_pattern]
print("Listening for error messages...")
index = 0
while (await channel.wait_message()):
await channel.get()
info = await data_conn.execute("lrange", "ErrorKeys", index, -1)
for error_key in info:
worker, task = key_to_hex_identifiers(error_key)
# TODO(richard): Filter out workers so that only relevant task errors are