Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
db_co = aioredis.create_redis(
address=('localhost', 6379),
db=13,
loop=self.loop,
encoding='utf-8',
)
self.db = self.loop.run_until_complete(db_co)
async def create_pub_sub_conn():
pub = await aioredis.create_redis("redis://localhost:6379/0?encoding=utf-8")
sub = await aioredis.create_redis("redis://localhost:6379/0?encoding=utf-8")
yield pub, sub
pub.close()
sub.close()
async def main(flush):
store = await create_redis(('localhost', 6379), commands_factory=Redis)
tf2info = await tf2search.gettf2info(config.apikey,
config.backpackkey, config.tradekey,
config.blueprintsfile)
if flush:
await store.delete('items')
await store.delete_all('items:*')
await store.delete_all('item:*')
suggestions = [[], [], []]
sitemap = Sitemap()
sitemap.add(config.homepage)
all_classes = [class_.lower() for class_ in tf2api.getallclasses()]
async def make_redis():
return await aioredis.create_redis(make_broadcast_url())
# port = int(port)
# try:
# db = int(db)
# if not db:
# db = 0
# except ValueError:
# db = 0
self._redis_consumer = self._loop_delay. \
run_until_complete(aioredis.create_redis(address=(host, port),
db=db,
password=password,
loop=self._loop_delay))
self._redis_poller = self._loop_delay. \
run_until_complete(aioredis.create_redis(address=(host, port),
db=db,
password=password,
loop=self._loop_delay))
async def get_async_redis(self, loop=None):
"""Creates an asynchronous Redis connection.
Parameters
----------
loop = Optional[asyncio.AbstractEventLoop]
The loop used for the asynchronous Redis connection.
"""
if self.loop is not None and loop is None:
loop = self.loop
return await aioredis.create_redis(
'redis://{}:{}'.format(self.config['HOST'], self.config['PORT']),
db=self.config['DB'], password=self.config['PASSWORD'], loop=loop)
async def _initialize_redis_client(self):
return await aioredis.create_redis(
(config["redis"]["host"], config["redis"]["port"]),
loop=asyncio.get_event_loop()
)
async def track_event(event, state, service_name):
"""
Store state of events in memory
:param event: Event object
:param state: EventState object
:param service_name: Name of service name
"""
redis = await aioredis.create_redis(
(EVENT_TRACKING_HOST, 6379), loop=loop)
now = datetime.utcnow()
event_id = event.event_id
tracking_data = json.dumps({
"event_id": event_id,
"timestamp": str(now),
"state": state
})
await redis.rpush(service_name, tracking_data)
redis.close()
await redis.wait_closed()
async def handle_notifications(loop, term_ev, term_barrier, registry):
redis_sub = await aioredis.create_redis(registry.redis_addr,
encoding='utf8',
loop=loop)
redis = await aioredis.create_redis(registry.redis_addr,
encoding='utf8',
loop=loop)
# Enable "expired" event notification
# See more details at: http://redis.io/topics/notifications
await redis_sub.config_set('notify-keyspace-events', 'Ex')
chprefix = '__keyevent@{}__*'.format(defs.SORNA_INSTANCE_DB)
channels = await redis_sub.psubscribe(chprefix)
log.info('subscribed redis notifications.')
g = None
while not term_ev.is_set():
msg = None
try:
g = asyncio.Task(channels[0].get(encoding='utf8'))
await asyncio.wait_for(g, 0.5, loop=loop)
except asyncio.TimeoutError:
continue