Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
endpoint = 'http+unix://{}'.format(parse.quote(path, safe=''))
self.api = _APINode(endpoint, timeout=timeout)
self.api = self.api[version]
# Verify the connection is valid.
try:
response = self.api.get()
if response.status_code != 200:
raise exceptions.ClientConnectionFailed()
self.host_info = response.json()['metadata']
except (requests.exceptions.ConnectionError,
requests.exceptions.InvalidURL) as e:
raise exceptions.ClientConnectionFailed(str(e))
self.cluster = managers.ClusterManager(self)
self.certificates = managers.CertificateManager(self)
self.containers = managers.ContainerManager(self)
self.images = managers.ImageManager(self)
self.networks = managers.NetworkManager(self)
self.operations = managers.OperationManager(self)
self.profiles = managers.ProfileManager(self)
self.storage_pools = managers.StoragePoolManager(self)
def __init__(self, *args, **kwargs):
    """Initialise the container and attach its helper managers.

    All positional and keyword arguments are forwarded unchanged to the
    parent initialiser. Afterwards two managers are bound to this
    instance:

    * ``snapshots`` -- a ``managers.SnapshotManager``
    * ``files`` -- an instance of this class's ``FilesManager``

    Both managers receive ``self.client`` and this container.
    """
    # Parent initialiser runs first; ``self.client`` is read right after,
    # so it is presumably populated there -- TODO confirm.
    super(Container, self).__init__(*args, **kwargs)

    # Per-container helpers, each handed the client and this object.
    self.snapshots = managers.SnapshotManager(self.client, self)
    self.files = self.FilesManager(self.client, self)
def __init__(self, *args, **kwargs):
    """Initialise the cluster and attach its member manager.

    All positional and keyword arguments are forwarded unchanged to the
    parent initialiser. Afterwards ``members`` is bound to a
    ``managers.ClusterMemberManager`` built from ``self.client`` and
    this cluster.
    """
    # Parent initialiser runs first; ``self.client`` is read right after,
    # so it is presumably populated there -- TODO confirm.
    super(Cluster, self).__init__(*args, **kwargs)

    # Helper handed the client and this cluster object.
    self.members = managers.ClusterMemberManager(self.client, self)
def __init__(self, *args, **kwargs):
    """Set up the container, then bind its snapshot and file managers.

    Arguments are passed through untouched to the parent initialiser.
    Once that returns, ``self.snapshots`` is set to a
    ``managers.SnapshotManager`` and ``self.files`` to an instance of
    this class's ``FilesManager``; each is constructed with
    ``self.client`` and this container.
    """
    # Run the parent initialiser before touching ``self.client``
    # (presumably assigned by the parent -- TODO confirm).
    super(Container, self).__init__(*args, **kwargs)

    # Snapshot helper first, then the file helper, as in the original.
    self.snapshots = managers.SnapshotManager(self.client, self)
    self.files = self.FilesManager(self.client, self)
self.api = _APINode(endpoint, timeout=timeout)
self.api = self.api[version]
# Verify the connection is valid.
try:
response = self.api.get()
if response.status_code != 200:
raise exceptions.ClientConnectionFailed()
self.host_info = response.json()['metadata']
except (requests.exceptions.ConnectionError,
requests.exceptions.InvalidURL) as e:
raise exceptions.ClientConnectionFailed(str(e))
self.cluster = managers.ClusterManager(self)
self.certificates = managers.CertificateManager(self)
self.containers = managers.ContainerManager(self)
self.images = managers.ImageManager(self)
self.networks = managers.NetworkManager(self)
self.operations = managers.OperationManager(self)
self.profiles = managers.ProfileManager(self)
self.storage_pools = managers.StoragePoolManager(self)
environment = {}
response = self.api['exec'].post(json={
'command': commands,
'environment': environment,
'wait-for-websocket': True,
'interactive': False,
})
fds = response.json()['metadata']['metadata']['fds']
operation_id = \
Operation.extract_operation_id(response.json()['operation'])
parsed = parse.urlparse(
self.client.api.operations[operation_id].websocket._api_endpoint)
with managers.web_socket_manager(WebSocketManager()) as manager:
stdin = _StdinWebsocket(
self.client.websocket_url, payload=stdin_payload,
encoding=stdin_encoding
)
stdin.resource = '{}?secret={}'.format(parsed.path, fds['0'])
stdin.connect()
stdout = _CommandWebsocketClient(
manager, self.client.websocket_url,
encoding=encoding, decode=decode, handler=stdout_handler)
stdout.resource = '{}?secret={}'.format(parsed.path, fds['1'])
stdout.connect()
stderr = _CommandWebsocketClient(
manager, self.client.websocket_url,
encoding=encoding, decode=decode, handler=stderr_handler)
stderr.resource = '{}?secret={}'.format(parsed.path, fds['2'])
stderr.connect()