if tf.test.is_built_with_cuda():
    # compute my index relative to other nodes on the same host (for GPU allocation)
    my_addr = cluster_spec[job_name][task_index]
    my_host = my_addr.split(':')[0]
    flattened = [v for sublist in cluster_spec.values() for v in sublist]
    local_peers = [p for p in flattened if p.startswith(my_host)]
    my_index = local_peers.index(my_addr)

    num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
    gpus_to_use = gpu_info.get_gpus(num_gpus, my_index)
    gpu_str = "GPUs" if num_gpus > 1 else "GPU"
    logger.debug("Requested {} {}, setting CUDA_VISIBLE_DEVICES={}".format(num_gpus, gpu_str, gpus_to_use))
    os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
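
To see what the peer-index computation above produces, here is a minimal, self-contained sketch (the cluster_spec values are hypothetical): processes that share a host are counted in a stable order, so each one can claim a disjoint set of that host's GPUs.

# Sketch only: hypothetical cluster_spec, not from the original file.
cluster_spec = {
    'ps': ['10.0.0.1:2222'],
    'worker': ['10.0.0.1:2223', '10.0.0.2:2222', '10.0.0.1:2224'],
}
job_name, task_index = 'worker', 2                  # pretend this node is worker:2
my_addr = cluster_spec[job_name][task_index]
my_host = my_addr.split(':')[0]
flattened = [v for sublist in cluster_spec.values() for v in sublist]
local_peers = [p for p in flattened if p.startswith(my_host)]
print(local_peers.index(my_addr))                   # 2 -> third process on 10.0.0.1, third GPU slice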
# create a context object to hold metadata for TF
ctx = TFNodeContext(executor_id, job_name, task_index, cluster_spec, cluster_meta['default_fs'], cluster_meta['working_dir'], TFSparkNode.mgr)

# release port reserved for TF as late as possible
if tmp_sock is not None:
    tmp_sock.close()
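
The tmp_sock released above is a socket bound earlier purely to reserve a port for the TF server; a minimal standalone sketch of that reserve-then-release idiom (not from the original file):

import socket

# Bind a throwaway socket to port 0 so the OS assigns a free port, and keep
# the socket open to hold the reservation; close it only just before the real
# server binds (best-effort: a small race window remains after close()).
tmp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tmp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tmp_sock.bind(('', 0))
reserved_port = tmp_sock.getsockname()[1]
# ... later, as late as possible:
tmp_sock.close()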
# Background mode relies on reuse of the python worker in Spark.
if background:
    # However, python worker reuse doesn't work on Windows, so check whether
    # the current script is running on Windows.
    if os.name == 'nt' or platform.system() == 'Windows':
        raise Exception("Background mode is not supported on Windows.")

    # Check that python worker reuse is enabled in the Spark config.
    if not os.environ.get("SPARK_REUSE_WORKER"):
        raise Exception("Background mode relies on reuse of the python worker in Spark, but the config 'spark.python.worker.reuse' is not enabled. Please enable it before using background mode.")
def wrapper_fn(args, context):
    """Wrapper function that sets the sys.argv of the executor."""
    if isinstance(args, list):
        sys.argv = args
    fn(args, context)

def wrapper_fn_background(args, context):
    """Wrapper function that signals exceptions to the foreground process."""
    errq = TFSparkNode.mgr.get_queue('error')
    try:
        wrapper_fn(args, context)
    except Exception:
        errq.put(traceback.format_exc())
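
The 'error' queue lets the foreground Spark task surface a traceback raised inside the background process; a standalone sketch of the same pattern (names here are illustrative):

import multiprocessing
import traceback

def child(errq):
    try:
        raise ValueError("boom")
    except Exception:
        # ship the traceback as a string; exception objects may not pickle cleanly
        errq.put(traceback.format_exc())

if __name__ == '__main__':
    errq = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(errq,))
    p.start()
    p.join()
    if not errq.empty():
        raise Exception("Exception in child:\n" + errq.get())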
if job_name in ('ps', 'evaluator') or background:
    # invoke the TensorFlow main function in a background process
    logger.info("Starting TensorFlow {0}:{1} as {2} on cluster node {3} on background process".format(
        job_name, task_index, job_name, executor_id))

    p = multiprocessing.Process(target=wrapper_fn_background, args=(tf_args, ctx))
    if job_name in ('ps', 'evaluator'):
        p.daemon = True
    p.start()

    # for ps and evaluator nodes, wait indefinitely in foreground thread for a "control" event (None == "stop")
    if job_name in ('ps', 'evaluator'):
        queue = TFSparkNode.mgr.get_queue('control')
        equeue = TFSparkNode.mgr.get_queue('error')
        done = False
        while not done:
            while (queue.empty() and equeue.empty()):
                time.sleep(1)

            if (not equeue.empty()):
                e_str = equeue.get()
                raise Exception("Exception in " + job_name + ":\n" + e_str)

            msg = queue.get(block=True)
            logger.info("Got msg: {0}".format(msg))
            if msg is None:
                logger.info("Terminating {}".format(job_name))
                TFSparkNode.mgr.set('state', 'stopped')
                done = True
            queue.task_done()
else:
    # otherwise, invoke the TensorFlow main function in the foreground on this executor
    logger.info("Starting TensorFlow {0}:{1} as {2} on cluster node {3} on foreground thread".format(
        job_name, task_index, job_name, executor_id))
    wrapper_fn(tf_args, ctx)
    logger.info("Finished TensorFlow {0}:{1} on cluster node {2}".format(job_name, task_index, executor_id))
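
The control loop above blocks until a message arrives and treats None as a stop sentinel; a stripped-down, single-process model of that handshake:

import queue
import threading
import time

def control_loop(q):
    # mirrors the loop above: block for a message; None means "stop"
    done = False
    while not done:
        msg = q.get(block=True)
        if msg is None:
            done = True
        q.task_done()

ctrl = queue.Queue()
t = threading.Thread(target=control_loop, args=(ctrl,))
t.start()
time.sleep(0.1)
ctrl.put(None)   # the "stop" sentinel
t.join()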
cluster_id = cluster_meta['id']
cluster_template = cluster_meta['cluster_template']
for jobtype in cluster_template:
    nodes = cluster_template[jobtype]
    if executor_id in nodes:
        job_name = jobtype
        task_index = nodes.index(executor_id)
        break

# get unique key (hostname, executor_id) for this executor
host = util.get_ip_address()
util.write_executor_id(executor_id)
port = 0
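
The cluster_template maps each TF job type to the executor ids assigned to it; a quick illustration of the lookup above with hypothetical values:

# Sketch only: hypothetical template splitting executors 0-3 across job types.
cluster_template = {'ps': [0], 'worker': [1, 2, 3]}
executor_id = 2

job_name, task_index = 'default', -1
for jobtype in cluster_template:
    nodes = cluster_template[jobtype]
    if executor_id in nodes:
        job_name = jobtype
        task_index = nodes.index(executor_id)
        break
print(job_name, task_index)   # worker 1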
# check for existing TFManagers
if TFSparkNode.mgr is not None and str(TFSparkNode.mgr.get('state')) != "'stopped'":
    if TFSparkNode.cluster_id == cluster_id:
        # raise an exception to force Spark to retry this "reservation" task on another executor
        raise Exception("TFManager already started on {0}, executor={1}, state={2}".format(host, executor_id, str(TFSparkNode.mgr.get("state"))))
    else:
        # old state, just continue with creating new manager
        logger.warn("Ignoring old TFManager with cluster_id {0}, requested cluster_id {1}".format(TFSparkNode.cluster_id, cluster_id))
# start a TFManager and get a free port
# use a random uuid as the authkey
authkey = uuid.uuid4().bytes
addr = None
if job_name in ('ps', 'evaluator'):
    # PS nodes must be remotely accessible in order to shutdown from Spark driver.
    TFSparkNode.mgr = TFManager.start(authkey, ['control', 'error'], 'remote')
    addr = (host, TFSparkNode.mgr.address[1])
else:
    # worker nodes only need to be locally accessible within the executor for data feeding
    TFSparkNode.mgr = TFManager.start(authkey, queues)
    addr = TFSparkNode.mgr.address
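
TFManager is built on Python's multiprocessing managers; a minimal model of the 'remote' flavor above, exposing an authkey-guarded queue on all interfaces at an ephemeral port (a sketch, not TFManager's actual implementation):

from multiprocessing.managers import BaseManager
import queue

_queues = {'control': queue.Queue(), 'error': queue.Queue()}

def _get_queue(name):
    return _queues[name]

class _Mgr(BaseManager):
    pass

_Mgr.register('get_queue', callable=_get_queue)

if __name__ == '__main__':
    # address ('', 0): bind all interfaces on an OS-assigned port
    mgr = _Mgr(address=('', 0), authkey=b'change-me')
    mgr.start()
    print(mgr.address)   # (host, port) a remote client connects to with the same authkey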
# initialize mgr state
TFSparkNode.mgr.set('state', 'running')
TFSparkNode.cluster_id = cluster_id

# expand Hadoop classpath wildcards for JNI (Spark 2.x)
if 'HADOOP_PREFIX' in os.environ:
    classpath = os.environ['CLASSPATH']
    hadoop_path = os.path.join(os.environ['HADOOP_PREFIX'], 'bin', 'hadoop')
    hadoop_classpath = subprocess.check_output([hadoop_path, 'classpath', '--glob']).decode()
    logger.debug("CLASSPATH: {0}".format(hadoop_classpath))
    os.environ['CLASSPATH'] = classpath + os.pathsep + hadoop_classpath
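
`hadoop classpath --glob` prints the Hadoop classpath with wildcard entries expanded to concrete jar paths; a JVM created over JNI (as libhdfs does) won't expand `*` entries itself, so the expansion is required. Roughly what it does, sketched with Python's glob on a hypothetical entry:

import glob
import os

# A wildcard entry like this is opaque to a JNI-launched JVM; expanding it
# yields the concrete jars that libhdfs needs on the CLASSPATH.
raw = '/opt/hadoop/share/hadoop/common/*'          # hypothetical path
expanded = os.pathsep.join(sorted(glob.glob(raw))) or raw
print(expanded)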
# start TensorBoard if requested
tb_pid = 0
tb_port = 0
if tensorboard and job_name == 'worker' and task_index == 0:
    tb_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tb_sock.bind(('', 0))
    tb_port = tb_sock.getsockname()[1]   # OS-assigned free port for TensorBoard
    tb_sock.close()
def _get_manager(cluster_info, host, executor_id):
    """Returns this executor's "singleton" instance of the multiprocessing.Manager, reconnecting per python-worker if needed.

    Args:
      :cluster_info: cluster node reservations
      :host: host IP address
      :executor_id: unique id per executor (created during initial call to run())

    Returns:
      TFManager instance for this executor/python-worker
    """
    for node in cluster_info:
        if node['host'] == host and node['executor_id'] == executor_id:
            addr = node['addr']
            authkey = node['authkey']
            TFSparkNode.mgr = TFManager.connect(addr, authkey)
            break

    if TFSparkNode.mgr is None:
        msg = "No TFManager found on this node, please ensure that:\n" + \
              "1. Spark num_executors matches TensorFlow cluster_size\n" + \
              "2. Spark cores/tasks per executor is 1.\n" + \
              "3. Spark dynamic allocation is disabled."
        raise Exception(msg)

    logger.info("Connected to TFSparkNode.mgr on {0}, executor={1}, state={2}".format(host, executor_id, str(TFSparkNode.mgr.get('state'))))
    return TFSparkNode.mgr
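
A typical caller is an executor-side data-feeding function, which must reconnect to the manager from whichever python worker Spark runs the task in. A hedged sketch; the function name, the 'input' queue, and util.read_executor_id are illustrative assumptions here:

# Illustrative only: how a per-partition feeding function might reconnect.
def _feed_partition(iterator, cluster_info):
    mgr = _get_manager(cluster_info, util.get_ip_address(), util.read_executor_id())
    q = mgr.get_queue('input')        # assumed queue name
    for row in iterator:
        q.put(row, block=True)
    return [True]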