Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_start_and_wait():
    """
    Check that the executor starts a process that opens its port late.

    The command sleeps for 2 seconds before listening on port 3000, and the
    executor is given a 5 second timeout to see the port open.
    """
    command = 'bash -c "sleep 2 && nc -l 3000"'
    executor = TCPExecutor(command, 'localhost', port=3000, timeout=5)
    executor.start()
    try:
        assert executor.running() is True
    finally:
        # Stop the process even when the assertion fails, so a failing run
        # does not leave port 3000 occupied for the tests that follow.
        executor.stop()

    # check proper __str__ and __repr__ rendering:
    assert 'TCPExecutor' in repr(executor)
    assert command in str(executor)
def test_fail_if_other_executor_running():
    """
    Check that AlreadyRunning is raised for a second, identical executor.

    Both executors are built with the same command, host and port; the
    second one is started while the first one is still running.
    """
    first = TCPExecutor(HTTP_SERVER, host='localhost', port=PORT)
    second = TCPExecutor(HTTP_SERVER, host='localhost', port=PORT)

    with first:
        assert first.running() is True

        # An explicit start() has to be refused...
        with pytest.raises(AlreadyRunning):
            second.start()

        # ...and so does entering the second executor as a context manager.
        with pytest.raises(AlreadyRunning), second:
            pass
extracted_version = None
return extracted_version
class RedisUnsupported(Exception):
    """Raised when a redis version older than 2.6 is detected."""
class RedisMisconfigured(Exception):
    """Raised when redis_exec points to a file that does not exist."""
# Plain named tuple with three fields: ``host``, ``port`` and ``unixsocket``.
NoopRedis = namedtuple('NoopRedis', 'host, port, unixsocket')
class RedisExecutor(TCPExecutor):
"""
Redis executor.
Extended TCPExecutor to contain all required logic for parametrising
and properly constructing command to start redis-server.
"""
MIN_SUPPORTED_VERSION = '2.6'
"""
Minimum required version of redis that is accepted by pytest-redis.
"""
def __init__(
self, executable, databases, redis_timeout, loglevel, logsdir,
host, port, timeout=60,
logs_prefix='', save='', daemonize='no', rdbcompression=True,
import re
import shutil
import subprocess
import time
from pkg_resources import parse_version
from mirakuru import TCPExecutor
from mirakuru.base import ExecutorType
class PostgreSQLUnsupported(Exception):
    """Raised when a PostgreSQL version older than 9.0 is detected."""
# pylint:disable=too-many-instance-attributes
class PostgreSQLExecutor(TCPExecutor):
    """
    PostgreSQL executor running on pg_ctl.

    Based on the `pg_ctl program
    <https://www.postgresql.org/docs/current/app-pg-ctl.html>`_.
    """

    # Template of the ``pg_ctl start`` command line. Besides the ``{...}``
    # fields it keeps one ``%s`` placeholder for the name of the socket
    # directory setting; presumably both are filled in by code outside this
    # block - verify against the rest of the class.
    BASE_PROC_START_COMMAND = ' '.join((
        "{executable} start -D {datadir}",
        "-o \"-F -p {port} -c log_destination='stderr'",
        "-c logging_collector=off -c %s='{unixsocketdir}'\"",
        "-l {logfile} {startparams}"
    ))

    # Captures a ``<digits>.<digit>`` version number that follows a space.
    # NOTE(review): the group name had been stripped from the pattern
    # (``(?P\d+...`` does not compile); ``version`` is the restored name -
    # confirm it matches what the callers read from the match object.
    VERSION_RE = re.compile(r'.* (?P<version>\d+\.\d)')

    # Oldest PostgreSQL version accepted, as a comparable version object.
    MIN_SUPPORTED_VERSION = parse_version('9.0')
def test_fail_if_other_executor_running():
    """
    Check that AlreadyRunning is raised for a second, identical executor.

    Both executors are built with the same command, host and port; the
    second one is started while the first one is still running.
    """
    first = TCPExecutor(HTTP_SERVER, host='localhost', port=PORT)
    second = TCPExecutor(HTTP_SERVER, host='localhost', port=PORT)

    with first:
        assert first.running() is True

        # An explicit start() has to be refused...
        with pytest.raises(AlreadyRunning):
            second.start()

        # ...and so does entering the second executor as a context manager.
        with pytest.raises(AlreadyRunning), second:
            pass
def test_default_port():
    """
    Test default port for the base TCP check.

    Check if HTTP executor fills in the default port for the TCP check
    from the base class if no port is provided in the URL.
    """
    executor = HTTPExecutor(HTTP_NORMAL_CMD, 'http://{0}/'.format(HOST))

    # The URL itself carries no port, yet the executor still exposes one.
    assert executor.url.port is None
    assert executor.port == PORT

    assert TCPExecutor.pre_start_check(executor) is False
    executor.start()
    try:
        assert TCPExecutor.pre_start_check(executor) is True
    finally:
        # Stop the server even when the assertion fails, so a failing run
        # does not leave the process behind for the tests that follow.
        executor.stop()
path_dynamodb_jar = os.path.join(
(dynamodb_dir or config['dir']),
'DynamoDBLocal.jar'
)
if not os.path.isfile(path_dynamodb_jar):
raise JarPathException(
'You have to provide a path to the dir with dynamodb jar file.'
)
dynamodb_port = get_port(port or config['port'])
dynamodb_delay = (
'-delayTransientStatuses' if delay or config['delay'] else ''
)
dynamodb_host = host or config['host']
dynamodb_executor = TCPExecutor(
f'''java
-Djava.library.path=./DynamoDBLocal_lib
-jar {path_dynamodb_jar}
-inMemory
{dynamodb_delay}
-port {dynamodb_port}''',
host=dynamodb_host,
port=dynamodb_port,
timeout=60,
)
dynamodb_executor.start()
yield dynamodb_executor
try:
dynamodb_executor.stop()
except ProcessExitedWithError:
pass
mongo_logsdir = logsdir or config['logsdir']
mongo_logpath = os.path.join(
mongo_logsdir,
f'mongo.{mongo_port}.log'
)
mongo_db_path = os.path.join(
tmpdir,
f'mongo.{mongo_port}'
)
os.mkdir(mongo_db_path)
request.addfinalizer(
lambda: os.path.exists(mongo_db_path) and rmtree(mongo_db_path)
)
mongo_executor = TCPExecutor(
(
f'{mongo_exec} --bind_ip {mongo_host} --port {mongo_port} '
f'--dbpath {mongo_db_path} '
f'--logpath {mongo_logpath} {mongo_params}'
),
host=mongo_host,
port=mongo_port,
timeout=60
)
mongo_executor.start()
request.addfinalizer(mongo_executor.stop)
return mongo_executor
def test_it_raises_error_on_timeout():
    """Check that start() raises TimeoutExpired and nothing is left running."""
    slow_command = 'bash -c "sleep 10 && nc -l 3000"'
    executor = TCPExecutor(
        slow_command, host='localhost', port=3000, timeout=2,
    )

    # The command sleeps for 10 seconds before it listens, while the
    # executor is only given a 2 second timeout.
    with pytest.raises(TimeoutExpired):
        executor.start()

    assert executor.running() is False
class MySQLUnsupported(Exception):
    """Raised when an unsupported MySQL has been encountered."""
class VersionNotDetected(Exception):
    """Raised when the executor could not detect the MySQL version."""

    def __init__(self, output):
        """
        Build the error message from the output that was inspected.

        :param output: value in which no version could be found; it is
            interpolated into the exception message
        """
        message = 'Could not detect version in {}'.format(output)
        super().__init__(message)
class MySQLExecutor(TCPExecutor):
"""MySQL Executor for running MySQL server."""
VERSION_RE = re.compile(r'(?:[a-z_ ]+)(Ver)? (?P[\d.]+).*', re.I)
IMPLEMENTATION_RE = re.compile(r'.*MariaDB.*')
def __init__(
self, mysqld_safe, mysqld, admin_exec, logfile_path,
params, base_directory, user, host, port, timeout=60,
install_db=None
):
"""
Specialised Executor to run and manage MySQL server process.
:param str mysqld_safe: path to mysqld_safe executable
:param str mysqld: path to mysqld executable
:param str admin_exec: path to mysqladmin executable