Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
dynamodb_executor = TCPExecutor(
f'''java
-Djava.library.path=./DynamoDBLocal_lib
-jar {path_dynamodb_jar}
-inMemory
{dynamodb_delay}
-port {dynamodb_port}''',
host=dynamodb_host,
port=dynamodb_port,
timeout=60,
)
dynamodb_executor.start()
yield dynamodb_executor
try:
dynamodb_executor.stop()
except ProcessExitedWithError:
pass
return dynamodb_proc_fixture
def _monitor(runner: NodeRunner):
    """Watch one node runner's subprocess while the root task is still running.

    Loops until ``self._runner.root_task.done`` is truthy. ``self`` is not a
    parameter here; it is resolved from the enclosing scope, which is not
    visible in this excerpt. On each pass, a runner whose state is
    ``NodeState.STARTED`` has ``runner.executor.check_subprocess()`` called;
    runners in any other state are left alone. The loop then sleeps for
    0.5 seconds via ``gevent.sleep`` before checking again.

    NOTE(review): the excerpt's indentation was lost; the sleep is placed at
    loop level (executed on every pass) -- confirm against the original file.

    Raises:
        ScenarioError: when ``check_subprocess()`` raises
            ``ProcessExitedWithError``; the original exception is chained
            as the cause.
    """
    while True:
        # Stop monitoring as soon as the root task reports completion.
        if self._runner.root_task.done:
            return

        if runner.state is NodeState.STARTED:
            try:
                runner.executor.check_subprocess()
            except ProcessExitedWithError as exit_error:
                message = f"Raiden node {runner._index} died with non-zero exit status"
                raise ScenarioError(message) from exit_error

        # Pause between polls so other greenlets get a chance to run.
        gevent.sleep(0.5)