Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_configure_logging_json(capsys):
    """Check that the ``json`` log format emits the logged message on stdout.

    A random UUID is logged through the root logger and the captured stdout
    is parsed as JSON; its ``msg`` field must equal the logged value.
    """
    expected = str(uuid.uuid4())

    aiomisc.log.basic_config(
        buffered=False,
        log_format='json',
        level=logging.DEBUG,
    )
    logging.info(expected)

    # Brief pause before reading the capture (presumably gives the handler
    # time to write the record -- TODO confirm it is still required with
    # buffered=False).
    time.sleep(0.3)

    captured_out, _ = capsys.readouterr()
    record = json.loads(captured_out.strip())
    assert record['msg'] == expected

    # NOTE(review): looks like a teardown step; logging.basicConfig() does
    # nothing when the root logger already has handlers -- confirm this
    # resets what it is meant to reset.
    logging.basicConfig(handlers=[], level=logging.INFO)
def test_configure_logging_stderr(capsys):
    """Check that the ``stream`` log format writes the logged message to stderr.

    A random UUID is logged through the root logger and must appear in the
    stderr text captured after logging was configured.
    """
    expected = str(uuid.uuid4())

    # Read (and thereby reset) whatever was captured so far, so that only
    # output produced below is inspected.
    capsys.readouterr()

    aiomisc.log.basic_config(
        level=logging.DEBUG,
        log_format='stream',
        buffered=False,
    )
    logging.info(expected)

    # Brief pause before reading the capture (presumably gives the handler
    # time to write the record -- TODO confirm).
    time.sleep(0.3)

    _, captured_err = capsys.readouterr()
    assert expected in captured_err

    # NOTE(review): looks like a teardown step; logging.basicConfig() does
    # nothing when the root logger already has handlers -- confirm intent.
    logging.basicConfig(handlers=[])
async def start():
    """Optionally configure buffered logging, then start all services.

    Reads ``loop``, ``services``, ``log_config`` and the ``log_*`` settings
    from the enclosing scope (they are only read here, so no ``nonlocal``
    declarations are needed).

    When ``log_config`` is truthy, ``basic_config`` is called with
    ``buffered=True`` and bound to ``loop``. Each service then gets the loop
    via ``set_loop`` and its ``start()`` coroutine is scheduled as a task on
    that loop. The coroutine returns once every start task has finished; an
    exception raised by any ``svc.start()`` propagates to the caller.
    """
    if log_config:
        basic_config(
            level=log_level,
            log_format=log_format,
            buffered=True,
            loop=loop,
            buffer_size=log_buffer_size,
            flush_interval=log_flush_interval,
        )

    starting = []
    for svc in services:
        svc.set_loop(loop)
        starting.append(loop.create_task(svc.start()))

    # The tasks are already bound to ``loop``. The ``loop=`` keyword of
    # ``asyncio.gather`` was deprecated in Python 3.8 and removed in 3.10,
    # where passing it raises TypeError, so it is not forwarded.
    await asyncio.gather(*starting)
"""
:param loop: event loop to use; a new one is created when not provided
:param services: Service instances which will be started.
:param pool_size: thread pool size
:param log_level: Logging level which will be configured
:param log_format: Logging format which will be configured
:param log_buffer_size: Buffer size for logging
:param log_flush_interval: interval in seconds for flushing logs
:param log_config: if False do not configure logging
"""
loop = loop or new_event_loop(pool_size)
if log_config:
basic_config(
level=log_level,
log_format=log_format,
buffered=False,
)
async def start():
nonlocal loop
nonlocal services
nonlocal log_format
nonlocal log_level
nonlocal log_buffer_size
nonlocal log_flush_interval
if log_config:
basic_config(
level=log_level,