Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# --- fragment: client-side connection setup; the enclosing method (likely the
# client's connect/init routine) is not visible in this excerpt. Indentation
# was flattened by extraction; code is preserved byte-identical. ---
# REQ socket used to submit jobs to the master. linger=0 drops unsent
# messages on close; RCVTIMEO bounds how long recv_multipart() may block,
# so a dead master surfaces as zmq.error.Again below.
self.submit_job_socket = self.ctx.socket(zmq.REQ)
self.submit_job_socket.linger = 0
self.submit_job_socket.setsockopt(
zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
self.submit_job_socket.connect("tcp://{}".format(master_address))
self.start_time = time.time()
# Background thread that answers heartbeat probes; daemonized so it dies
# with the process. (setDaemon is the legacy spelling of daemon=True.)
thread = threading.Thread(target=self._reply_heartbeat)
thread.setDaemon(True)
thread.start()
# Wait until the heartbeat thread has bound its socket before contacting
# the master, which will immediately probe that address.
self.heartbeat_socket_initialized.wait()
# check if the master is connected properly
try:
self.submit_job_socket.send_multipart([
remote_constants.CLIENT_CONNECT_TAG,
to_byte(self.heartbeat_master_address),
to_byte(socket.gethostname())
])
_ = self.submit_job_socket.recv_multipart()
except zmq.error.Again as e:
# recv timed out (RCVTIMEO above): master unreachable. Warn, mark the
# master dead, and surface the failure to the caller.
logger.warning("[Client] Can not connect to the master, please "
"check if master is started and ensure the input "
"address {} is correct.".format(master_address))
self.master_is_alive = False
raise Exception("Client can not connect to the master, please "
"check if master is started and ensure the input "
"address {} is correct.".format(master_address))
# --- fragment: job-side memory-limit enforcement; the surrounding check on
# memory usage is not visible in this excerpt. ---
# Fix: the error message said "This job will exist." — corrected to "exit",
# which is what os._exit(1) below actually does.
logger.error(
    "Memory used by this job exceeds {}. This job will exit."
    .format(self.max_memory))
time.sleep(5)  # brief pause, presumably to let the log/heartbeat flush — TODO confirm
socket.close(0)  # NOTE(review): `socket` appears to be a ZMQ socket local shadowing the stdlib module — verify
os._exit(1)  # hard process exit: skips atexit/finally cleanup by design
# --- fragment: tail of the job's client-heartbeat loop; the matching `try`
# and the enclosing loop are not visible in this excerpt. Code preserved
# byte-identical. ---
except zmq.error.Again as e:
# Heartbeat recv timed out: the client is considered gone; leave the
# loop and notify the worker below.
logger.warning(
"[Job] Cannot connect to the client. This job will exit and inform the worker."
)
break
socket.close(0)
# Ask the worker to reclaim this job. The lock guards the shared
# kill_job_socket against concurrent use.
with self.lock:
self.kill_job_socket.send_multipart(
[remote_constants.KILLJOB_TAG,
to_byte(self.job_address)])
try:
_ = self.kill_job_socket.recv_multipart()
except zmq.error.Again as e:
# Best-effort notification: the worker may already be down, so a
# timeout here is deliberately ignored.
pass
logger.warning("[Job]lost connection with the client, will exit")
os._exit(1)
message = reply_socket.recv_multipart()
tag = message[0]
obj = None
if tag == remote_constants.INIT_OBJECT_TAG:
try:
cls = cloudpickle.loads(message[1])
args, kwargs = cloudpickle.loads(message[2])
obj = cls(*args, **kwargs)
except Exception as e:
traceback_str = str(traceback.format_exc())
error_str = str(e)
logger.error("traceback:\n{}".format(traceback_str))
reply_socket.send_multipart([
remote_constants.EXCEPTION_TAG,
to_byte(error_str + "\ntraceback:\n" + traceback_str)
])
return None
reply_socket.send_multipart([remote_constants.NORMAL_TAG])
else:
logger.error("Message from job {}".format(message))
reply_socket.send_multipart([
remote_constants.EXCEPTION_TAG,
b"[job]Unkonwn tag when tried to receive the class definition"
])
raise NotImplementedError
return obj
# --- fragment: master-side message dispatch on `tag`; the receive loop that
# sets `tag`/`message` is not visible in this excerpt. Code preserved
# byte-identical. ---
# a new worker connects to the master
if tag == remote_constants.WORKER_CONNECT_TAG:
# simple ack; registration happens later under WORKER_INITIALIZED_TAG
self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
elif tag == remote_constants.MONITOR_TAG:
# the monitor asks for the current cluster status
status = self._get_status()
self.client_socket.send_multipart(
[remote_constants.NORMAL_TAG, status])
# `xparl status` command line API
elif tag == remote_constants.STATUS_TAG:
status_info = self.cluster_monitor.get_status_info()
self.client_socket.send_multipart(
[remote_constants.NORMAL_TAG,
to_byte(status_info)])
elif tag == remote_constants.WORKER_INITIALIZED_TAG:
# Register the freshly initialized worker with the job center and the
# cluster monitor. NOTE(review): cloudpickle.loads of network data —
# assumes a trusted cluster.
initialized_worker = cloudpickle.loads(message[1])
worker_address = initialized_worker.worker_address
self.job_center.add_worker(initialized_worker)
hostname = self.job_center.get_hostname(worker_address)
self.cluster_monitor.add_worker_status(worker_address, hostname)
logger.info("A new worker {} is added, ".format(worker_address) +
"the cluster has {} CPUs.\n".format(self.cpu_num))
# a thread for sending heartbeat signals to `worker.address`
thread = threading.Thread(
target=self._create_worker_monitor,
args=(initialized_worker.worker_address, ))
thread.start()
def wrapper(*args, **kwargs):
# Proxy for a remote method call. `attr` (the remote method name) and
# `self` come from the enclosing closure, which is not visible here.
# NOTE(review): this excerpt is truncated — the DESERIALIZE_EXCEPTION
# branch below is cut off mid-way, and the matching lock release /
# return of `ret` are outside the visible region.
self.internal_lock.acquire()
data = dumps_argument(*args, **kwargs)
# send [tag, method-name, pickled args] and wait for the reply
self.command_socket.send_multipart(
[remote_constants.NORMAL_TAG,
to_byte(attr), data])
message = self.command_socket.recv_multipart()
tag = message[0]
if tag == remote_constants.NORMAL_TAG:
ret = loads_return(message[1])
elif tag == remote_constants.EXCEPTION_TAG:
# remote side raised inside the method body
error_str = to_str(message[1])
raise RemoteError(attr, error_str)
elif tag == remote_constants.ATTRIBUTE_EXCEPTION_TAG:
error_str = to_str(message[1])
raise RemoteAttributeError(attr, error_str)
elif tag == remote_constants.SERIALIZE_EXCEPTION_TAG:
error_str = to_str(message[1])
raise RemoteSerializeError(attr, error_str)
elif tag == remote_constants.DESERIALIZE_EXCEPTION_TAG:
error_str = to_str(message[1])
# --- fragment: master-side client dispatch. Lines below start mid-expression:
# the first two lines are the tail of a `threading.Thread(...)` call whose
# opening is not visible in this excerpt. Code preserved byte-identical. ---
target=self._create_client_monitor,
args=(client_heartbeat_address, ))
thread.start()
self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
# a client submits a job to the master
elif tag == remote_constants.CLIENT_SUBMIT_TAG:
# check available CPU resources
if self.cpu_num:
logger.info("Submitting job...")
job = self.job_center.request_job()
# hand the client every address it needs to reach the allocated job
self.client_socket.send_multipart([
remote_constants.NORMAL_TAG,
to_byte(job.job_address),
to_byte(job.client_heartbeat_address),
to_byte(job.ping_heartbeat_address),
])
self._print_workers()
else:
# no free CPUs: reply with CPU_TAG so the client can react
self.client_socket.send_multipart([remote_constants.CPU_TAG])
# a worker updates
elif tag == remote_constants.NEW_JOB_TAG:
# a worker replaced a finished job with a new one; record the swap
initialized_job = cloudpickle.loads(message[1])
last_job_address = to_str(message[2])
self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
self.job_center.update_job(last_job_address, initialized_job,
initialized_job.worker_address)
logger.info("A worker updated. cpu_num:{}".format(self.cpu_num))
# --- fragment: worker-side job-buffer maintenance; the enclosing loop(s) are
# not visible and the control flow in this excerpt is garbled by extraction
# (the `continue`, `else:` and `break` belong to different unseen loops) —
# confirm against the full source. Code preserved byte-identical. ---
if not initialized_job.is_alive: # make sure that the job is still alive.
self.worker_status.remove_job(
initialized_job.job_address)
continue
else:
# a dead job pulled from the buffer is discarded with a warning
logger.warning(
"[Worker] a dead job found. The job buffer will not accept this one."
)
if initialized_job.is_alive:
break
# Register the replacement job with the master; the lock serializes use
# of the shared request_master_socket (acquire/release rather than
# `with`, matching the original).
self.lock.acquire()
self.request_master_socket.send_multipart([
remote_constants.NEW_JOB_TAG,
cloudpickle.dumps(initialized_job),
to_byte(job_address)
])
_ = self.request_master_socket.recv_multipart()
self.lock.release()
# --- fragment: master-side client dispatch (this excerpt duplicates an
# earlier fragment of the same dispatch chain; the enclosing receive loop
# is not visible). Code preserved byte-identical. ---
# per-client monitor thread watching the client's heartbeat address
thread = threading.Thread(
target=self._create_client_monitor,
args=(client_heartbeat_address, ))
thread.start()
self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
# a client submits a job to the master
elif tag == remote_constants.CLIENT_SUBMIT_TAG:
# check available CPU resources
if self.cpu_num:
logger.info("Submitting job...")
job = self.job_center.request_job()
# hand the client every address it needs to reach the allocated job
self.client_socket.send_multipart([
remote_constants.NORMAL_TAG,
to_byte(job.job_address),
to_byte(job.client_heartbeat_address),
to_byte(job.ping_heartbeat_address),
])
self._print_workers()
else:
# no free CPUs: reply with CPU_TAG so the client can react
self.client_socket.send_multipart([remote_constants.CPU_TAG])
# a worker updates
elif tag == remote_constants.NEW_JOB_TAG:
# a worker replaced a finished job with a new one; record the swap
initialized_job = cloudpickle.loads(message[1])
last_job_address = to_str(message[2])
self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
self.job_center.update_job(last_job_address, initialized_job,
initialized_job.worker_address)
logger.info("A worker updated. cpu_num:{}".format(self.cpu_num))