def get_java_major_version(client: paramiko.client.SSHClient):
    possible_cmds = [
        "$JAVA_HOME/bin/java -version",
        "java -version"
    ]

    for command in possible_cmds:
        try:
            output = ssh_check_output(
                client=client,
                command=command)
            tokens = output.split()
            # First line of the output is like: 'java version "1.8.0_20"'
            # Get the version string and strip out the first two parts of the
            # version as a tuple: (1, 8)
            if len(tokens) >= 3:
                version_parts = tokens[2].strip('"').split(".")
                if len(version_parts) >= 2:
                    return tuple(int(part) for part in version_parts[:2])
        except SSHError:
            pass

    return None
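
# Worked example (illustrative, not part of the original source): for typical
# `java -version` output, the parsing above proceeds as follows.
#
#     output = 'java version "1.8.0_20"'
#     tokens = output.split()               # ['java', 'version', '"1.8.0_20"']
#     tokens[2].strip('"').split(".")       # ['1', '8', '0_20']
#     tuple(int(p) for p in ['1', '8'])     # -> (1, 8)
#
# So get_java_major_version() returns (1, 8) on a Java 8 host, and None when
# no working `java` is found over SSH.
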
run_against_hosts(partial_func=partial_func, hosts=hosts)

master_ssh_client = get_ssh_client(
    user=user,
    host=cluster.master_host,
    identity_file=identity_file)

with master_ssh_client:
    manifest = {
        'services': [[type(m).__name__, m.manifest] for m in services],
        'ssh_key_pair': cluster.ssh_key_pair._asdict(),
    }
    # The manifest tells us how the cluster is configured. We'll need this
    # when we resize the cluster or restart it.
    ssh_check_output(
        client=master_ssh_client,
        command="""
            echo {m} > "$HOME/.flintrock-manifest.json"
            chmod go-rw "$HOME/.flintrock-manifest.json"
        """.format(
            m=shlex.quote(json.dumps(manifest, indent=4, sort_keys=True))
        ))

    for service in services:
        service.configure_master(
            ssh_client=master_ssh_client,
            cluster=cluster)

for service in services:
    service.health_check(master_host=cluster.master_host)
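
# For illustration only (values made up): since the manifest is dumped as JSON,
# the file written above to "$HOME/.flintrock-manifest.json" looks roughly like:
#
#     {
#         "services": [
#             ["Spark", {"version": "..."}],
#             ["HDFS", {"version": "..."}]
#         ],
#         "ssh_key_pair": {"public": "ssh-rsa ...", "private": "-----BEGIN ..."}
#     }
#
# A later excerpt on this page reads it back with
# `cat "$HOME/.flintrock-manifest.json"` to reconstruct the cluster's state.
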
        command="""
            python /tmp/download-package.py "{download_source}" "spark"
        """.format(
            version=self.version,
            download_source=self.download_source.format(v=self.version),
        ))
else:
    ssh_check_output(
        client=ssh_client,
        command="""
            set -e
            sudo yum install -y git
            sudo yum install -y java-devel
        """)
    ssh_check_output(
        client=ssh_client,
        command="""
            set -e
            git clone {repo} spark
            cd spark
            git reset --hard {commit}
            if [ -e "make-distribution.sh" ]; then
                ./make-distribution.sh -Phadoop-{hadoop_short_version}
            else
                ./dev/make-distribution.sh -Phadoop-{hadoop_short_version}
            fi
        """.format(
            repo=shlex.quote(self.git_repository),
            commit=shlex.quote(self.git_commit),
            # Hardcoding this here until we figure out a better way to handle
            # the supported build profiles.
about installed services and configured storage.

Providers shouldn't need to override this method.
"""
if not self.master_ip:
    return

master_ssh_client = get_ssh_client(
    user=user,
    host=self.master_ip,
    identity_file=identity_file,
    wait=True,
    print_status=False)

with master_ssh_client:
    manifest_raw = ssh_check_output(
        client=master_ssh_client,
        command="""
            cat "$HOME/.flintrock-manifest.json"
        """)
    # TODO: Would it be better if storage (ephemeral and otherwise) was
    # implemented as a Flintrock service and tracked in the manifest?
    ephemeral_dirs_raw = ssh_check_output(
        client=master_ssh_client,
        # It's generally safer to avoid using ls:
        # http://mywiki.wooledge.org/ParsingLs
        command="""
            shopt -s nullglob
            for f in /media/ephemeral*; do
                echo "$f"
            done
        """)
def setup_node(
        *,
        # Change this to take host, user, and identity_file?
        # Add some kind of caching for SSH connections so that they
        # can be looked up by host and reused?
        ssh_client: paramiko.client.SSHClient,
        services: list,
        cluster: FlintrockCluster):
    """
    Set up a new node.

    Cluster methods like provision_node() and add_slaves_node() should
    delegate the main work of setting up new nodes to this function.
    """
    host = ssh_client.get_transport().getpeername()[0]

    ssh_check_output(
        client=ssh_client,
        command="""
            set -e
            echo {private_key} > "$HOME/.ssh/id_rsa"
            echo {public_key} >> "$HOME/.ssh/authorized_keys"
            chmod 400 "$HOME/.ssh/id_rsa"
        """.format(
            private_key=shlex.quote(cluster.ssh_key_pair.private),
            public_key=shlex.quote(cluster.ssh_key_pair.public)))

    with ssh_client.open_sftp() as sftp:
        sftp.put(
            localpath=os.path.join(SCRIPTS_DIR, 'setup-ephemeral-storage.py'),
            remotepath='/tmp/setup-ephemeral-storage.py')
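
# The signature comment above floats the idea of caching SSH connections so
# they can be looked up by host and reused. A minimal sketch of that idea
# (hypothetical helper, not part of the original code), reusing the
# get_ssh_client() call seen elsewhere on this page:

_ssh_client_cache = {}


def get_cached_ssh_client(*, user, host, identity_file):
    """Reuse an existing, still-active SSHClient for (user, host) if we have one."""
    key = (user, host)
    client = _ssh_client_cache.get(key)
    transport = client.get_transport() if client else None
    if transport is None or not transport.is_active():
        client = get_ssh_client(user=user, host=host, identity_file=identity_file)
        _ssh_client_cache[key] = client
    return client
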
Run a shell command on a node.

This method is role-agnostic; it runs on both the cluster master and slaves.
This method is meant to be called asynchronously.
"""
ssh_client = get_ssh_client(
    user=user,
    host=host,
    identity_file=identity_file)

logger.info("[{h}] Running command...".format(h=host))

command_str = ' '.join(command)

with ssh_client:
    ssh_check_output(
        client=ssh_client,
        command=command_str)

logger.info("[{h}] Command complete.".format(h=host))
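
# Hypothetical driver for the excerpt above (function and variable names here
# are assumed, not shown in this excerpt): because the command runner is
# role-agnostic and meant to be called asynchronously, a caller can bind
# everything except the host and fan it out with run_against_hosts(), the same
# pattern used by the provisioning code earlier on this page.

import functools

partial_func = functools.partial(
    run_command_node,  # assumed name of the per-node function excerpted above
    user=user,
    identity_file=identity_file,
    command=command)
run_against_hosts(partial_func=partial_func, hosts=hosts)
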
def ensure_java8(client: paramiko.client.SSHClient):
    host = client.get_transport().getpeername()[0]
    java_major_version = get_java_major_version(client)

    if not java_major_version or java_major_version < (1, 8):
        logger.info("[{h}] Installing Java 1.8...".format(h=host))
        ssh_check_output(
            client=client,
            command="""
                set -e
                # Install OpenJDK 1.8 via yum and remove any older default
                # so that `java -version` reports 1.8 afterwards.
                sudo yum install -y java-1.8.0-openjdk
                sudo yum remove -y java-1.7.0-openjdk
            """)
with ssh_client.open_sftp() as sftp:
    sftp.put(
        localpath=os.path.join(SCRIPTS_DIR, 'setup-ephemeral-storage.py'),
        remotepath='/tmp/setup-ephemeral-storage.py')

logger.info("[{h}] Configuring ephemeral storage...".format(h=host))
# TODO: Print some kind of warning if storage is large, since formatting
# will take several minutes (~4 minutes for 2TB).
storage_dirs_raw = ssh_check_output(
    client=ssh_client,
    command="""
        set -e
        python /tmp/setup-ephemeral-storage.py
        rm -f /tmp/setup-ephemeral-storage.py
    """)
storage_dirs = json.loads(storage_dirs_raw)

cluster.storage_dirs.root = storage_dirs['root']
cluster.storage_dirs.ephemeral = storage_dirs['ephemeral']

ensure_java8(ssh_client)

for service in services:
    try:
        service.install(