Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
data = self._stream(fileobj)
headers["content-type"] = "application/x-tar"
if fileobj and encoding:
headers["Content-Encoding"] = encoding
if buildargs:
params.update({"buildargs": json.dumps(buildargs)})
if labels:
params.update({"labels": json.dumps(labels)})
cm = self.docker._query(
"build",
"POST",
params=clean_map(params),
headers=headers,
data=data,
)
return self._handle_response(cm, stream)
headers = None
if auth:
headers = {"X-Registry-Auth": compose_auth_header(auth, registry)}
config = {
"TaskTemplate": task_template,
"Name": name,
"Labels": labels,
"Mode": mode,
"UpdateConfig": update_config,
"RollbackConfig": rollback_config,
"Networks": clean_networks(networks),
"EndpointSpec": endpoint_spec,
}
data = json.dumps(clean_map(config))
response = await self.docker._query_json(
"services/create", method="POST", data=data, headers=headers
)
return response
True if successful.
"""
if image is None and rollback is False:
raise ValueError("You need to specify an image.")
inspect_service = await self.inspect(service_id)
spec = inspect_service["Spec"]
if image is not None:
spec["TaskTemplate"]["ContainerSpec"]["Image"] = image
params = {"version": version}
if rollback is True:
params["rollback"] = "previous"
data = json.dumps(clean_map(spec))
await self.docker._query_json(
"services/{service_id}/update".format(service_id=service_id),
method="POST",
data=data,
params=params,
)
return True
Addresses of manager nodes already participating in the swarm.
join_token
Secret token for joining this swarm.
"""
data = {
"RemoteAddrs": list(remote_addrs),
"JoinToken": join_token,
"ListenAddr": listen_addr,
"AdvertiseAddr": advertise_addr,
"DataPathAddr": data_path_addr,
}
async with self.docker._query(
"swarm/join", method="POST", data=clean_map(data)
):
return True