Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_connect_envvar(monkeypatch):
monkeypatch.setenv("DOCKER_HOST", "unix:///var/run/does-not-exist-docker.sock")
docker = Docker()
assert isinstance(docker.connector, aiohttp.connector.UnixConnector)
assert docker.docker_host == "unix://localhost"
with pytest.raises(aiohttp.ClientOSError):
await docker.containers.list()
await docker.close()
monkeypatch.setenv("DOCKER_HOST", "http://localhost:9999")
docker = Docker()
assert isinstance(docker.connector, aiohttp.TCPConnector)
assert docker.docker_host == "http://localhost:9999"
with pytest.raises(aiohttp.ClientOSError):
await docker.containers.list()
await docker.close()
def test_wait_timeout():
docker = Docker()
yield from docker.pull("debian:jessie")
config = {
"Cmd":["sh"],
"Image":"debian:jessie",
"AttachStdin":True,
"AttachStdout":True,
"AttachStderr":True,
"Tty":False,
"OpenStdin":True,
"StdinOnce":True,
}
container = yield from docker.containers.create_or_replace(config=config, name='testing')
yield from container.start()
def test_stdio_stdin():
docker = Docker()
yield from docker.pull("debian:jessie")
config = {
"Cmd":["sh"],
"Image":"debian:jessie",
"AttachStdin":True,
"AttachStdout":True,
"AttachStderr":True,
"Tty":False,
"OpenStdin":True,
"StdinOnce":True,
}
container = yield from docker.containers.create_or_replace(config=config, name='testing')
def test_image_push():
docker = Docker()
yield from docker.pull("debian")
yield from docker.images.tag("debian", repo=os.environ['DOCKER_REGISTRY']+"/debian")
#push_results = yield from docker.images.push(os.environ['DOCKER_REGISTRY']+"/debian")
def test_events():
docker = Docker()
queue = yield from docker.events.query()
yield from docker.pull("debian:jessie")
config = {
"Cmd":["sh"],
"Image":"debian:jessie",
}
container = yield from docker.containers.create_or_replace(config=config, name='testing')
#print("put archive response:", result)
yield from container.start()
i = yield from queue.__aiter__()
while True:
try:
async def prepare_krunner_env_impl(distro: str) -> Tuple[str, Optional[str]]:
if (m := re.search(r'^([a-z]+)\d+\.\d+$', distro)) is None:
raise ValueError('Unrecognized "distro[version]" format string.')
distro_name = m.group(1)
docker = Docker()
arch = platform.machine()
current_version = int(Path(
pkg_resources.resource_filename(
f'ai.backend.krunner.{distro_name}',
f'./krunner-version.{distro}.txt'))
.read_text().strip())
volume_name = f'backendai-krunner.v{current_version}.{distro}'
extractor_image = 'backendai-krunner-extractor:latest'
try:
for item in (await docker.images.list()):
if item['RepoTags'] is None:
continue
if item['RepoTags'][0] == extractor_image:
break
else:
async def __ainit__(self) -> None:
self.docker = Docker()
if not self._skip_initial_scan:
docker_version = await self.docker.version()
log.info('running with Docker {0} with API {1}',
docker_version['Version'], docker_version['ApiVersion'])
await super().__ainit__()
self.agent_sockpath = ipc_base_path / f'agent.{self.agent_id}.sock'
self.agent_sock_task = asyncio.create_task(self.handle_agent_socket())
self.monitor_docker_task = asyncio.create_task(self.monitor_docker_events())
def __setstate__(self, props):
super().__setstate__(props)
self._docker = Docker()
asyncio.ensure_future(_send())
resp = await ws.receive()
print("received: {}".format(resp))
await ws.close()
output = await container.log(stdout=True)
print("log output: {}".format(output))
finally:
print("removing container")
await container.delete(force=True)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
docker = Docker()
try:
loop.run_until_complete(demo(docker))
finally:
loop.run_until_complete(docker.close())
loop.close()
print(key, ':', value)
print('--------------------------------')
print('- Check Docker Container List')
containers = await docker.containers.list()
for container in containers:
container_show = await container.show()
for key, value in container_show.items():
if key == 'Id':
print('Id', ':', value[:12])
print('--------------------------------')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
docker = Docker()
try:
loop.run_until_complete(demo(docker))
finally:
loop.run_until_complete(docker.close())
loop.close()