Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_reservation_server(self):
    """Test reservation server, expecting 1 reservation.

    Starts a Server sized for one reservation, registers a single node
    through a Client, checks that get_reservations() and
    await_reservations() each report exactly one entry, then requests a
    stop and verifies the server's ``done`` flag.
    """
    s = Server(1)
    addr = s.start()

    c = Client(addr)
    try:
        # add first reservation
        resp = c.register({'node': 1})
        self.assertEqual(resp, 'OK')

        # get list of reservations
        reservations = c.get_reservations()
        self.assertEqual(len(reservations), 1)

        # should return immediately with list of reservations
        reservations = c.await_reservations()
        self.assertEqual(len(reservations), 1)

        # request server stop
        c.request_stop()
    finally:
        # release the client connection even when an assertion above fails
        c.close()

    # pause one second after the stop request before checking the flag
    time.sleep(1)
    self.assertEqual(s.done, True)
def reserve(num):
    """Register node ``num`` through a fresh client, then wait on the reservation list."""
    client = Client(addr)
    # time.sleep(random.randint(0,5)) # simulate varying start times
    self.assertEqual(client.register({'node': num}), 'OK')
    client.await_reservations()
    client.close()
# NOTE(review): the next two lines repeat the last two statements of reserve()
# above -- presumably an extraction artifact; confirm against the original file.
c.await_reservations()
c.close()
# start/register clients: one thread per client, each running reserve(i)
# (num_clients, addr and s are defined outside the visible lines)
threads = [None] * num_clients
for i in range(num_clients):
threads[i] = threading.Thread(target=reserve, args=(i,))
threads[i].start()
# wait for clients to complete
for i in range(num_clients):
threads[i].join()
print("all done")
# get list of reservations and check there is one entry per client
c = Client(addr)
reservations = c.get_reservations()
self.assertEqual(len(reservations), num_clients)
# request server stop, pause one second, then check the server's done flag
c.request_stop()
time.sleep(1)
self.assertEqual(s.done, True)
# Copyright 2017 Yahoo Inc.
# Licensed under the terms of the Apache 2.0 license.
# Please see LICENSE file in the project root for terms.
"""
Simple utility to shutdown a Spark StreamingContext by signaling the reservation Server.
Note: use the reservation server address (host, port) reported in the driver logs.
"""
from tensorflowonspark import reservation
import sys
if __name__ == "__main__":
    # The reservation server's host and port are required positional arguments;
    # fail with a usage message instead of an IndexError traceback.
    if len(sys.argv) < 3:
        sys.exit("Usage: {0} <host> <port>".format(sys.argv[0]))

    host = sys.argv[1]
    port = int(sys.argv[2])
    addr = (host, port)

    client = reservation.Client(addr)
    try:
        client.request_stop()
    finally:
        # always release the connection, even if the stop request fails
        client.close()
# NOTE(review): fragment -- the enclosing function and the loop/condition that
# guard the lines below are outside this view, and indentation has been lost.
# Surface an error string (e_str, obtained outside this view) as an exception;
# presumably it was reported by a worker -- confirm against the caller.
raise Exception("Exception in worker:\n" + e_str)
# Sleep one second and decrement the timeout counter; raise once it reaches
# zero or below.
time.sleep(1)
timeout -= 1
if timeout <= 0:
raise Exception("Timeout while feeding partition")
logger.info("Processed {0} items in partition".format(count))
# check if TF is terminating feed after this partition
if not terminating:
# The state value is compared after str(), and the expected text includes the
# surrounding single quotes.
state = str(mgr.get('state'))
terminating = state == "'terminating'"
if terminating:
try:
# Ask the reservation server at cluster_meta['server_addr'] to stop, then
# close the client connection.
logger.info("TFSparkNode: requesting stop")
client = reservation.Client(cluster_meta['server_addr'])
client.request_stop()
client.close()
except Exception as e:
# ignore any errors while requesting stop
logger.debug("Error while requesting stop: {0}".format(e))
# Return the terminating flag wrapped in a single-element list.
return [terminating]
# NOTE(review): fragment -- the enclosing function is outside this view and
# indentation has been lost; pydir, sys_path, pypath, logdir, tb_port, host,
# executor_id, cluster_meta and the initial value of port come from elsewhere.
# Build the search path from pydir, sys_path, PATH and PYTHONPATH.
# NOTE(review): os.environ['PYTHONPATH'] raises KeyError when that variable is
# unset -- confirm it is always set where this runs.
search_path = os.pathsep.join([pydir, sys_path, os.environ['PATH'], os.environ['PYTHONPATH']])
# Try three candidate locations in turn; raise if none of them is found.
tb_path = util.find_in_path(search_path, 'tensorboard') # executable in PATH
if not tb_path:
tb_path = util.find_in_path(search_path, 'tensorboard/main.py') # TF 1.3+
if not tb_path:
tb_path = util.find_in_path(search_path, 'tensorflow/tensorboard/__main__.py') # TF 1.2-
if not tb_path:
raise Exception("Unable to find 'tensorboard' in: {}".format(search_path))
# launch tensorboard as a child process and remember its pid
tb_proc = subprocess.Popen([pypath, tb_path, "--logdir=%s" % logdir, "--port=%d" % tb_port], env=os.environ)
tb_pid = tb_proc.pid
# check server to see if this task is being retried (i.e. already reserved)
client = reservation.Client(cluster_meta['server_addr'])
cluster_info = client.get_reservations()
tmp_sock = None
node_meta = None
# Look for an existing reservation whose host and executor_id match this node;
# when found, keep its metadata and reuse its port.
for node in cluster_info:
(nhost, nexec) = (node['host'], node['executor_id'])
if nhost == host and nexec == executor_id:
node_meta = node
port = node['port']
# if not already done, register everything we need to set up the cluster
if node_meta is None:
# first, find a free port for TF
tmp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tmp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind on all interfaces using the current value of port, then read back the
# port number the socket was actually bound to.
tmp_sock.bind(('', port))
port = tmp_sock.getsockname()[1]