Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import asyncio
from unittest.mock import MagicMock, call
import aioredis
import pytest
from asynctest import CoroutineMock, patch
from aioredlock.errors import LockError
from aioredlock.redis import Instance, Redis
# Shared fixture values for the tests in this module.
# NOTE(review): presumably used as mock return values / side effects for
# the Redis calls under test -- confirm against the test bodies.

# Bytes reply value b'OK'.
EVAL_OK = b'OK'
# Pre-built aioredis ReplyError instance.
EVAL_ERROR = aioredis.errors.ReplyError('ERROR')
# Pre-built asyncio.CancelledError instance.
CANCELLED = asyncio.CancelledError('CANCELLED')
# Pre-built OSError instance.
CONNECT_ERROR = OSError('ERROR')
# Pre-built plain Exception instance (not one of the specific types above).
RANDOM_ERROR = Exception('FAULT')
class FakePool:
    """Test double exposing mocked ``eval`` and ``get`` coroutines."""

    SET_IF_NOT_EXIST = 'SET_IF_NOT_EXIST'

    def __init__(self):
        # Coroutine mocks: awaiting ``get(...)`` yields False and awaiting
        # ``eval(...)`` yields True.
        self.get = CoroutineMock(return_value=False)
        self.eval = CoroutineMock(return_value=True)
        # Starts empty; nothing in this class populates it.
        self.script_cache = {}
async def test_increment_typerror(self, redis, redis_connection):
    """``_increment`` must raise ``TypeError`` when ``incrby`` fails with a reply error."""
    # Make the mocked connection fail the way the server-side INCRBY would.
    reply_error = aioredis.errors.ReplyError("msg")
    redis_connection.incrby.side_effect = reply_error

    with pytest.raises(TypeError):
        await redis._increment(pytest.KEY, 2)
async def maybe_create_group(self) -> Any:
    """
    Create the consumer group (and the streams) if they don't exist.

    Returns the result of ``self.create_group()`` when the group was
    created, or ``None`` when the server reports that it already exists.

    Raises ``aioredis.errors.ReplyError`` for any reply error whose message
    does not start with ``BUSYGROUP``.
    """
    try:
        result = await self.create_group()
    except aioredis.errors.ReplyError as e:
        # Only the "group already exists" reply is tolerated; every other
        # reply error is propagated unchanged.
        if not str(e).startswith("BUSYGROUP"):
            raise
        logger.debug("group-exists", group=self.keys.group)
        return None
    # Log before returning: previously this call sat after the ``return``
    # statement and could never execute.
    logger.debug("group-created", group=self.keys.group)
    return result
async def _increment(self, key, delta, _conn=None):
    """
    Increment the value stored at ``key`` by ``delta`` through ``_conn.incrby``.

    Returns the awaited result of ``incrby``. An ``aioredis`` reply error
    is translated into ``TypeError("Value is not an integer")`` with the
    original exception context suppressed.
    """
    try:
        updated = await _conn.incrby(key, delta)
    except aioredis.errors.ReplyError:
        raise TypeError("Value is not an integer") from None
    else:
        return updated
request: web.Request,
handler: WebRequestHandler) -> web.StreamResponse:
# This is a global middleware: request.app is the root app.
now = Decimal(time.time()).quantize(_time_prec)
rr = app['redis_rlim']
if request['is_authorized']:
rate_limit = request['keypair']['rate_limit']
access_key = request['keypair']['access_key']
while True:
try:
ret = await redis.execute_with_retries(lambda: rr.evalsha(
app['redis_rlim_script'],
keys=[access_key],
args=[str(now), str(_rlim_window)]))
break
except aioredis.errors.ReplyError:
# Redis may have been restarted.
app['redis_rlim_script'] = await rr.script_load(_rlim_script)
continue
rolling_count = int(ret)
if rolling_count > rate_limit:
raise RateLimitExceeded
remaining = rate_limit - rolling_count
response = await handler(request)
response.headers['X-RateLimit-Limit'] = str(rate_limit)
response.headers['X-RateLimit-Remaining'] = str(remaining)
response.headers['X-RateLimit-Window'] = str(_rlim_window)
return response
else:
# No checks for rate limiting for non-authorized queries.
response = await handler(request)
response.headers['X-RateLimit-Limit'] = '1000'
:param resource: redis key to set
:param lock_identifier: unique id of lock
:param lock_timeout: timeout for lock in seconds
:raises: LockError if lock is not acquired
"""
lock_timeout_ms = int(lock_timeout * 1000)
try:
with await self.connect() as redis:
await redis.eval(
self.set_lock_script,
keys=[resource],
args=[lock_identifier, lock_timeout_ms]
)
except aioredis.errors.ReplyError as exc: # script fault
self.log.debug('Can not set lock "%s" on %s',
resource, repr(self))
raise LockError('Can not set lock') from exc
except (aioredis.errors.RedisError, OSError) as exc:
self.log.error('Can not set lock "%s" on %s: %s',
resource, repr(self), repr(exc))
raise LockError('Can not set lock') from exc
except asyncio.CancelledError:
self.log.debug('Lock "%s" is cancelled on %s',
resource, repr(self))
raise
except Exception as exc:
self.log.exception('Can not set lock "%s" on %s',
resource, repr(self))
raise
else:
async def get(self):
    """
    Pop one entry from the Redis list at ``self._key`` and return it decoded.

    ``self._lock`` is acquired before a connection is taken from
    ``self.pool`` and ``blpop`` is awaited; the last element of the reply
    is passed through ``self.decode``.
    """
    await self._lock.acquire()
    try:
        async with self.pool as conn:
            result = await conn.blpop(self._key)
        # The lock is released only on this success path; an exception
        # other than PoolClosedError propagates with the lock still held.
        # NOTE(review): indentation was lost in the extracted source; this
        # release is assumed to sit inside the ``try`` -- confirm.
        self._lock.release()
    except aioredis.errors.PoolClosedError:
        # NOTE(review): the lock is already held here, so if ``self._lock``
        # is a non-reentrant lock (e.g. ``asyncio.Lock``) this second
        # acquire waits until something else releases it. That may be a
        # deliberate "park until cancelled" or a typo for ``release()`` --
        # confirm the intent.
        await self._lock.acquire()
    # NOTE(review): if the handler above ever falls through, ``result`` was
    # never bound and this line fails with an unbound-name error.
    value = self.decode(result[-1])
    return value
async with client.lock:
for stream, ts, result_dict in message:
if stream in client.streams:
client.streams[stream] = ts
ws_mesg = {'type': 'spublish', 'message': message}
await client.websocket.send(json.dumps(ws_mesg))
logging.debug('Client %d stream %s sent websocket message\n%s',
client.id, streams, ws_mesg)
# If we've been canceled, go quietly into that good night.
except asyncio.CancelledError as e:
raise e
# If we've lost the connection, sit tight and let its task expire
except aioredis.errors.ConnectionClosedError:
logging.info('Client %d lost connection', client.id)
# Catch any other thing that goes wrong.
except Exception as e:
logging.warning('Stream reader exception: %s', type(e))
# Should only get here if we have an exception, I think.
logging.info('Client %d stream %s task completed', client.id, stream)
def listen_redis(self):
    """
    Listen to the messages from the subscribed Redis channel and launch
    publish handler.

    This is a generator: each ``yield`` hands an object to whatever drives
    it. NOTE(review): presumably wrapped by a coroutine decorator that is
    not visible in this excerpt -- confirm.
    """
    while True:
        # Wait until the channel reports that a message is available.
        yield self._channel.wait_message()
        try:
            msg = yield self._channel.get(encoding='utf-8')
        except aioredis.errors.ChannelClosedError:
            # A closed channel ends the listener loop.
            print("Redis channel was closed. Stopped listening.")
            return
        if msg:
            # Encode once, then yield the list of ``con.update(...)``
            # results for every registered connection.
            body_utf8 = msg.encode('utf-8')
            yield [con.update(body_utf8) for con in self._connections]
        # NOTE(review): indentation was lost in the extracted source; this
        # print is assumed to be at loop level rather than inside the
        # ``if msg`` branch -- confirm.
        print("Message in {}: {}".format(self._channel.name, msg))