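# Imports and context assumed by this excerpt of flintrock/core.py.
# load_manifest(), start(), and add_slaves() below are methods of FlintrockCluster,
# shown here without their class; FlintrockCluster, run_against_hosts(), setup_node(),
# and start_node() are defined elsewhere in the same module. The import path for the
# SSH helpers is an assumption based on how they are used below.
import functools
import json
import logging
import posixpath
import shlex

from .ssh import get_ssh_client, ssh_check_output  # assumed location of the SSH helpers

logger = logging.getLogger(__name__)  # module-level logger used by the node functions below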
def load_manifest(self, *, user: str, identity_file: str):
"""
Load a cluster's manifest from the master. This will populate information
about installed services and configured storage.
Providers shouldn't need to override this method.
"""
if not self.master_ip:
return
master_ssh_client = get_ssh_client(
user=user,
host=self.master_ip,
identity_file=identity_file,
wait=True,
print_status=False)
with master_ssh_client:
manifest_raw = ssh_check_output(
client=master_ssh_client,
command="""
cat "$HOME/.flintrock-manifest.json"
""")
# TODO: Would it be better if storage (ephemeral and otherwise) was
# implemented as a Flintrock service and tracked in the manifest?
        ephemeral_dirs_raw = ssh_check_output(
            client=master_ssh_client,
            # NOTE: assumed command; lists the ephemeral storage directories
            # configured on the master.
            command="""
                ls -1d /media/ephemeral* 2> /dev/null || true
            """)
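# For reference, the manifest that load_manifest() reads back is the JSON written by
# provision_cluster() below. Based on how it is constructed there, it looks roughly
# like this (field names inside each service manifest are illustrative only):
#
#   {
#       "services": [
#           ["HDFS", {"version": "..."}],
#           ["Spark", {"version": "...", "hadoop_version": "..."}]
#       ],
#       "ssh_key_pair": {"public": "ssh-rsa AAAA...", "private": "-----BEGIN ..."}
#   }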
def copy_file_node(
*,
user: str,
host: str,
identity_file: str,
local_path: str,
remote_path: str):
"""
Copy a file to the specified remote path on a node.
This method is role-agnostic; it runs on both the cluster master and slaves.
This method is meant to be called asynchronously.
"""
ssh_client = get_ssh_client(
user=user,
host=host,
identity_file=identity_file)
with ssh_client:
remote_dir = posixpath.dirname(remote_path)
try:
ssh_check_output(
client=ssh_client,
command="""
test -d {path}
""".format(path=shlex.quote(remote_dir)))
        except Exception as e:
            # TODO: Catch more specific exception.
            raise Exception("Remote directory does not exist: {d}".format(d=remote_dir)) from e

        # Completion sketch (the original snippet is truncated here): upload the file
        # over SFTP now that the target directory is known to exist.
        sftp = ssh_client.open_sftp()
        try:
            logger.info("[{h}] Copying file...".format(h=host))
            sftp.put(localpath=local_path, remotepath=remote_path)
            logger.info("[{h}] Copy complete.".format(h=host))
        finally:
            sftp.close()
def provision_cluster(
        *,
        cluster: FlintrockCluster,
        services: list,
        user: str,
        identity_file: str):
"""
Connect to a freshly launched cluster and install the specified services.
"""
partial_func = functools.partial(
provision_node,
services=services,
user=user,
identity_file=identity_file,
cluster=cluster)
hosts = [cluster.master_ip] + cluster.slave_ips
run_against_hosts(partial_func=partial_func, hosts=hosts)
master_ssh_client = get_ssh_client(
user=user,
host=cluster.master_host,
identity_file=identity_file)
with master_ssh_client:
manifest = {
'services': [[type(m).__name__, m.manifest] for m in services],
'ssh_key_pair': cluster.ssh_key_pair._asdict(),
}
# The manifest tells us how the cluster is configured. We'll need this
# when we resize the cluster or restart it.
ssh_check_output(
client=master_ssh_client,
command="""
echo {m} > "$HOME/.flintrock-manifest.json"
                chmod go-rw "$HOME/.flintrock-manifest.json"
            """.format(m=shlex.quote(json.dumps(manifest))))
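# Hedged usage sketch: a provider module would typically call provision_cluster()
# right after the instances come up. The values below are hypothetical.
#
#   provision_cluster(
#       cluster=cluster,                        # a FlintrockCluster subclass instance
#       services=[...],                         # e.g. the HDFS / Spark service objects
#       user='ec2-user',                        # hypothetical SSH user
#       identity_file='~/.ssh/flintrock.pem')   # hypothetical key path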
def start(self, *, user: str, identity_file: str):
    """
    Start up the services installed on the cluster.

    This method assumes that the nodes constituting the cluster were just
    started up by the provider (e.g. EC2, GCE, etc.) they're hosted on
    and are running.
    """
self.load_manifest(user=user, identity_file=identity_file)
partial_func = functools.partial(
start_node,
services=self.services,
user=user,
identity_file=identity_file,
cluster=self)
hosts = [self.master_ip] + self.slave_ips
run_against_hosts(partial_func=partial_func, hosts=hosts)
master_ssh_client = get_ssh_client(
user=user,
host=self.master_ip,
identity_file=identity_file)
with master_ssh_client:
for service in self.services:
service.configure_master(
ssh_client=master_ssh_client,
cluster=self)
for service in self.services:
service.health_check(master_host=self.master_ip)
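# run_against_hosts() is used throughout this excerpt but is not shown in it. A minimal
# sketch of what such a helper needs to do, assuming a plain thread pool (the real
# implementation may differ, e.g. it could be asyncio-based):
import concurrent.futures

def run_against_hosts_sketch(*, partial_func, hosts):
    """Run partial_func(host=...) against every host concurrently and surface failures."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(hosts)) as executor:
        futures = [executor.submit(partial_func, host=host) for host in hosts]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raise any exception that occurred on a host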
def add_slaves(self, *, user: str, identity_file: str, new_hosts: list):
    """
    Add new slaves to the cluster.

    Providers should implement this method with the following signature:

        add_slaves(self, *, user: str, identity_file: str, num_slaves: int, **provider_specific_options)

    This method should be called after the new hosts are online and have been
    added to the cluster's internal list.
    """
hosts = [self.master_ip] + self.slave_ips
partial_func = functools.partial(
add_slaves_node,
services=self.services,
user=user,
identity_file=identity_file,
cluster=self,
new_hosts=new_hosts)
run_against_hosts(partial_func=partial_func, hosts=hosts)
master_ssh_client = get_ssh_client(
user=user,
host=self.master_ip,
identity_file=identity_file)
with master_ssh_client:
for service in self.services:
service.configure_master(
ssh_client=master_ssh_client,
cluster=self)
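# Hedged usage sketch: a provider implementation launches the new instances, adds them
# to the cluster's internal host list, and then delegates to add_slaves() above. The
# values here are hypothetical.
#
#   cluster.add_slaves(
#       user='ec2-user',
#       identity_file='~/.ssh/flintrock.pem',
#       new_hosts=['203.0.113.10', '203.0.113.11'])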
def add_slaves_node(
        *,
        user: str,
        host: str,
        identity_file: str,
        services: list,
        cluster: FlintrockCluster,
        new_hosts: list):
"""
If the node is new, set it up. If not, just reconfigure it to recognize
the newly added nodes.
This method is role-agnostic; it runs on both the cluster master and slaves.
This method is meant to be called asynchronously.
"""
is_new_host = host in new_hosts
client = get_ssh_client(
user=user,
host=host,
identity_file=identity_file,
wait=is_new_host)
with client:
if is_new_host:
setup_node(
ssh_client=client,
services=services,
cluster=cluster)
for service in services:
service.configure(
ssh_client=client,
cluster=cluster)
def run_command_node(*, user: str, host: str, identity_file: str, command: tuple):
"""
Run a shell command on a node.
This method is role-agnostic; it runs on both the cluster master and slaves.
This method is meant to be called asynchronously.
"""
ssh_client = get_ssh_client(
user=user,
host=host,
identity_file=identity_file)
logger.info("[{h}] Running command...".format(h=host))
command_str = ' '.join(command)
with ssh_client:
ssh_check_output(
client=ssh_client,
command=command_str)
logger.info("[{h}] Command complete.".format(h=host))
def provision_node(
*,
services: list,
user: str,
host: str,
identity_file: str,
cluster: FlintrockCluster):
"""
Connect to a freshly launched node, set it up for SSH access, configure ephemeral
storage, and install the specified services.
This method is role-agnostic; it runs on both the cluster master and slaves.
This method is meant to be called asynchronously.
"""
client = get_ssh_client(
user=user,
host=host,
identity_file=identity_file,
wait=True)
with client:
setup_node(
ssh_client=client,
services=services,
cluster=cluster)
for service in services:
service.configure(
ssh_client=client,
cluster=cluster)
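# The service objects passed around above are not defined in this excerpt. From the
# calls made in this module, a service appears to need at least the following
# interface; this is an inferred sketch, not Flintrock's actual service base class.
class ServiceSketch:
    @property
    def manifest(self) -> dict:
        """The arguments needed to recreate this service (stored in the cluster manifest)."""
        return {}

    def configure(self, ssh_client, cluster):
        """Configure this service on a single node (master or slave)."""

    def configure_master(self, ssh_client, cluster):
        """Master-only configuration, run after all nodes are configured."""

    def health_check(self, master_host):
        """Verify the service is up, e.g. by probing its endpoint on the master."""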