Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_jobs(self):
    """Connect to the master, create the jobs and start the heartbeat reply thread.

    Steps performed by the visible code:

    1. Send ``WORKER_CONNECT_TAG`` to the master and wait for its reply. If
       the socket raises ``zmq.error.Again``, log an error, set
       ``self.master_is_alive`` to ``False`` and return without creating jobs.
    2. Create the jobs with ``self._init_jobs(job_num=self.cpu_num)``.
    3. Set the receive timeout of the master socket to
       ``HEARTBEAT_TIMEOUT_S * 1000``.
    4. Start a thread running ``self._reply_heartbeat`` and block until
       ``self.heartbeat_socket_initialized`` is set.
    5. Store ``self.master_heartbeat_address`` as ``worker_address`` on every
       created job.

    NOTE(review): the previous docstring said an ``InitializedWorker``
    instance is sent to the master; no such step appears in the code shown
    here -- confirm whether this excerpt was truncated.
    """
    # Handshake: tell the master that a worker wants to connect.
    master_socket = self.request_master_socket
    try:
        master_socket.send_multipart([remote_constants.WORKER_CONNECT_TAG])
        master_socket.recv_multipart()  # reply content is not used
    except zmq.error.Again:
        logger.error(
            "Can not connect to the master, please check if master is started."
        )
        self.master_is_alive = False
        return

    initialized_jobs = self._init_jobs(job_num=self.cpu_num)

    # zmq.RCVTIMEO is expressed in milliseconds.
    heartbeat_timeout_ms = remote_constants.HEARTBEAT_TIMEOUT_S * 1000
    self.request_master_socket.setsockopt(zmq.RCVTIMEO, heartbeat_timeout_ms)

    heartbeat_thread = threading.Thread(
        target=self._reply_heartbeat,
        args=("master {}".format(self.master_address), ),
    )
    self.reply_master_hearbeat_thread = heartbeat_thread
    heartbeat_thread.start()

    # Wait for the event before reading ``self.master_heartbeat_address``.
    self.heartbeat_socket_initialized.wait()

    for job in initialized_jobs:
        job.worker_address = self.master_heartbeat_address
def send_file(self, socket):
    """Send ``self.GLOBAL_CLIENT.pyfiles`` over ``socket`` and wait for a reply.

    The message is a two-part multipart frame:
    ``[SEND_FILE_TAG, self.GLOBAL_CLIENT.pyfiles]``.

    Args:
        socket: object exposing ``send_multipart`` and ``recv_multipart``.

    A ``zmq.error.Again`` raised while sending or receiving is logged and
    swallowed; nothing is returned in either case.
    """
    payload = [remote_constants.SEND_FILE_TAG, self.GLOBAL_CLIENT.pyfiles]
    try:
        socket.send_multipart(payload)
        socket.recv_multipart()  # reply content is not used
    except zmq.error.Again:
        logger.error("Send python files failed.")
A local instance of the remote class object.
"""
message = reply_socket.recv_multipart()
tag = message[0]
obj = None
if tag == remote_constants.INIT_OBJECT_TAG:
try:
cls = cloudpickle.loads(message[1])
args, kwargs = cloudpickle.loads(message[2])
obj = cls(*args, **kwargs)
except Exception as e:
traceback_str = str(traceback.format_exc())
error_str = str(e)
logger.error("traceback:\n{}".format(traceback_str))
reply_socket.send_multipart([
remote_constants.EXCEPTION_TAG,
to_byte(error_str + "\ntraceback:\n" + traceback_str)
])
return None
reply_socket.send_multipart([remote_constants.NORMAL_TAG])
else:
logger.error("Message from job {}".format(message))
reply_socket.send_multipart([
remote_constants.EXCEPTION_TAG,
b"[job]Unkonwn tag when tried to receive the class definition"
])
raise NotImplementedError
return obj
""" Sometimes the client may receive a job that is dead, thus
we have to check if this job is still alive before sending it to the actor.
"""
# job_heartbeat_socket: sends heartbeat signal to job
job_heartbeat_socket = self.ctx.socket(zmq.REQ)
job_heartbeat_socket.linger = 0
job_heartbeat_socket.setsockopt(zmq.RCVTIMEO, int(0.9 * 1000))
job_heartbeat_socket.connect("tcp://" + ping_heartbeat_address)
try:
job_heartbeat_socket.send_multipart(
[remote_constants.HEARTBEAT_TAG,
to_byte(str(max_memory))])
job_heartbeat_socket.recv_multipart()
except zmq.error.Again:
job_heartbeat_socket.close(0)
logger.error(
"[Client] connects to a finished job, will try again, ping_heartbeat_address:{}"
.format(ping_heartbeat_address))
return False
job_heartbeat_socket.disconnect("tcp://" + ping_heartbeat_address)
job_heartbeat_socket.connect("tcp://" + job_heartbeat_address)
job_heartbeat_socket.setsockopt(
zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
# a thread for sending heartbeat signals to job
thread = threading.Thread(
target=self._create_job_monitor, args=(job_heartbeat_socket, ))
thread.setDaemon(True)
thread.start()
return True
def _check_remote_status(self):
    """Forever scan ``self.remote_latest_timestamp`` and drop stale entries.

    On every pass, each remote id whose recorded timestamp is more than
    ``3 * HEARTBEAT_INTERVAL_S`` older than ``time.time()`` is reported with
    an error log and removed from ``self.remote_latest_timestamp``. The loop
    then sleeps for the same ``3 * HEARTBEAT_INTERVAL_S`` before the next
    pass. This method never returns.
    """
    while True:
        stale_after = 3 * remote_constants.HEARTBEAT_INTERVAL_S

        # Iterate over a snapshot of the keys so entries can be popped
        # from the dict during the scan.
        for remote_id in list(self.remote_latest_timestamp.keys()):
            elapsed = time.time() - self.remote_latest_timestamp[remote_id]
            if elapsed <= stale_after:
                continue
            logger.error(
                'Remote object {} is lost, please check if anything wrong happens in the remote client'
                .format(remote_id))
            self.remote_latest_timestamp.pop(remote_id)

        time.sleep(stale_after)
reply_socket.send_multipart([
remote_constants.SERIALIZE_EXCEPTION_TAG,
to_byte(error_str)
])
raise SerializeError
elif type(e) == DeserializeError:
reply_socket.send_multipart([
remote_constants.DESERIALIZE_EXCEPTION_TAG,
to_byte(error_str)
])
raise DeserializeError
else:
traceback_str = str(traceback.format_exc())
logger.error("traceback:\n{}".format(traceback_str))
reply_socket.send_multipart([
remote_constants.EXCEPTION_TAG,
to_byte(error_str + "\ntraceback:\n" +
traceback_str)
])
break
# receive KILLJOB_TAG from the actor, and stop replying to the worker heartbeat
elif tag == remote_constants.KILLJOB_TAG:
reply_socket.send_multipart([remote_constants.NORMAL_TAG])
logger.warning("An actor exits and this job {} will exit.".
format(job_address))
break
else:
logger.error(
"The job receives an unknown message: {}".format(message))
try:
cls = cloudpickle.loads(message[1])
args, kwargs = cloudpickle.loads(message[2])
obj = cls(*args, **kwargs)
except Exception as e:
traceback_str = str(traceback.format_exc())
error_str = str(e)
logger.error("traceback:\n{}".format(traceback_str))
reply_socket.send_multipart([
remote_constants.EXCEPTION_TAG,
to_byte(error_str + "\ntraceback:\n" + traceback_str)
])
return None
reply_socket.send_multipart([remote_constants.NORMAL_TAG])
else:
logger.error("Message from job {}".format(message))
reply_socket.send_multipart([
remote_constants.EXCEPTION_TAG,
b"[job]Unkonwn tag when tried to receive the class definition"
])
raise NotImplementedError
return obj