file = os.path.join(envdir, file)
with open(file, 'wb') as code_file:
    code_file.write(code)

# save other files to current directory
for file, content in pyfiles['other_files'].items():
    # create directory (i.e. ./rom_files/)
    if '/' in file:
        try:
            os.makedirs(os.path.join(*file.rsplit('/')[:-1]))
        except OSError:
            # the directory may already exist
            pass
    with open(file, 'wb') as f:
        f.write(content)
logger.info('[job] reply')
reply_socket.send_multipart([remote_constants.NORMAL_TAG])
return envdir
else:
    logger.error("NotImplementedError:{}, received tag:{}".format(
        job_address, tag))
    raise NotImplementedError
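# Illustrative sketch (not part of PARL): on Python 3 the try/except around
# os.makedirs above can be replaced with exist_ok=True, which suppresses only
# the "directory already exists" case rather than every OSError. The path
# below is a made-up example.
import os

path = os.path.join("rom_files", "nested")
os.makedirs(path, exist_ok=True)  # no error if the directory already exists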
tag = message[0]
if tag == remote_constants.NORMAL_TAG:
    ret = loads_return(message[1])
elif tag == remote_constants.EXCEPTION_TAG:
    error_str = to_str(message[1])
    self.job_shutdown = True
    raise RemoteError(attr, error_str)
elif tag == remote_constants.ATTRIBUTE_EXCEPTION_TAG:
    error_str = to_str(message[1])
    self.job_shutdown = True
    raise RemoteAttributeError(attr, error_str)
elif tag == remote_constants.SERIALIZE_EXCEPTION_TAG:
    error_str = to_str(message[1])
    self.job_shutdown = True
    raise RemoteSerializeError(attr, error_str)
elif tag == remote_constants.DESERIALIZE_EXCEPTION_TAG:
    error_str = to_str(message[1])
    self.job_shutdown = True
    raise RemoteDeserializeError(attr, error_str)
else:
    self.job_shutdown = True
    raise NotImplementedError()
self.internal_lock.release()
return ret
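# Illustrative sketch (not part of PARL): the elif chain above maps reply tags
# to exception classes one by one, and releases the lock only on the normal
# path. The same dispatch can be a lookup table, with try/finally making the
# lock release unconditional. All tags and exception classes below are made up
# for the demo.
import threading

class RemoteCallError(Exception): pass
class RemoteAttrError(Exception): pass

ERROR_BY_TAG = {b"EXC": RemoteCallError, b"ATTR_EXC": RemoteAttrError}
lock = threading.Lock()

def handle_reply(tag, payload):
    lock.acquire()
    try:
        if tag == b"OK":
            return payload
        exc_cls = ERROR_BY_TAG.get(tag)
        if exc_cls is None:
            raise NotImplementedError()
        raise exc_cls(payload)
    finally:
        lock.release()  # released on every path, unlike the chain above

assert handle_reply(b"OK", 42) == 42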
        args, kwargs = cloudpickle.loads(message[2])
        obj = cls(*args, **kwargs)
    except Exception as e:
        traceback_str = str(traceback.format_exc())
        error_str = str(e)
        logger.error("traceback:\n{}".format(traceback_str))
        reply_socket.send_multipart([
            remote_constants.EXCEPTION_TAG,
            to_byte(error_str + "\ntraceback:\n" + traceback_str)
        ])
        return None
    reply_socket.send_multipart([remote_constants.NORMAL_TAG])
else:
    logger.error("Message from job {}".format(message))
    reply_socket.send_multipart([
        remote_constants.EXCEPTION_TAG,
        b"[job] Unknown tag while trying to receive the class definition"
    ])
    raise NotImplementedError
return obj
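# Illustrative sketch (not part of PARL): how the class definition travels
# over the wire. The sender pickles the class and its constructor arguments
# with cloudpickle (mirroring message[1] and message[2] above); the receiver
# unpickles both and instantiates the object. The Counter class is made up.
import cloudpickle

class Counter:
    def __init__(self, start):
        self.value = start

# sender side: two payload frames
frames = [cloudpickle.dumps(Counter), cloudpickle.dumps([(10, ), {}])]

# receiver side: rebuild the class, then the instance
cls = cloudpickle.loads(frames[0])
args, kwargs = cloudpickle.loads(frames[1])
obj = cls(*args, **kwargs)
assert obj.value == 10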
# a client submits a job to the master
elif tag == remote_constants.CLIENT_SUBMIT_TAG:
    # check available CPU resources
    if self.cpu_num:
        logger.info("Submitting job...")
        job = self.job_center.request_job()
        self.client_socket.send_multipart([
            remote_constants.NORMAL_TAG,
            to_byte(job.job_address),
            to_byte(job.client_heartbeat_address),
            to_byte(job.ping_heartbeat_address),
        ])
        self._print_workers()
    else:
        self.client_socket.send_multipart([remote_constants.CPU_TAG])

# a worker updates the master with a newly initialized job
elif tag == remote_constants.NEW_JOB_TAG:
    initialized_job = cloudpickle.loads(message[1])
    last_job_address = to_str(message[2])
    self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
    self.job_center.update_job(last_job_address, initialized_job,
                               initialized_job.worker_address)
    logger.info("A worker updated. cpu_num:{}".format(self.cpu_num))
    self._print_workers()

# check before starting a worker
elif tag == remote_constants.NORMAL_TAG:
    self.client_socket.send_multipart([remote_constants.NORMAL_TAG])
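# Illustrative sketch (not part of PARL): the master's request handling is a
# plain ZMQ REP loop in which the first frame of each multipart message is a
# tag selecting the branch. The tags, handlers, and address below are made up.
import zmq

ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
rep.bind("tcp://127.0.0.1:7000")

handlers = {
    b"SUBMIT": lambda msg: [b"OK", b"job_address"],
    b"STATUS": lambda msg: [b"OK", b"2 workers, 8 cpus"],
}

while True:
    message = rep.recv_multipart()
    handler = handlers.get(message[0])
    reply = handler(message) if handler else [b"ERROR", b"unknown tag"]
    rep.send_multipart(reply)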
def _create_client_monitor(self, client_heartbeat_address):
    """When a new client connects to the master, a socket is created to
    send heartbeat signals to the client.
    """
    client_heartbeat_socket = self.ctx.socket(zmq.REQ)
    client_heartbeat_socket.linger = 0
    client_heartbeat_socket.setsockopt(
        zmq.RCVTIMEO, remote_constants.HEARTBEAT_TIMEOUT_S * 1000)
    client_heartbeat_socket.connect("tcp://" + client_heartbeat_address)

    client_is_alive = True
    while client_is_alive and self.master_is_alive:
        try:
            client_heartbeat_socket.send_multipart(
                [remote_constants.HEARTBEAT_TAG])
            client_status = client_heartbeat_socket.recv_multipart()
            self.cluster_monitor.update_client_status(
                client_status, client_heartbeat_address,
                self.client_hostname[client_heartbeat_address])
        except zmq.error.Again:
            # heartbeat timed out: treat the client as dead
            client_is_alive = False
            self.cluster_monitor.drop_cluster_status(
                client_heartbeat_address)
            logger.warning("[Master] cannot connect to the client " +
                           "{}. ".format(client_heartbeat_address) +
                           "Please check if it is still alive.")
        time.sleep(remote_constants.HEARTBEAT_INTERVAL_S)
    logger.warning("Master exits client monitor for {}.\n".format(
        client_heartbeat_address))
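# Illustrative sketch (not part of PARL): the client side of this heartbeat is
# a REP socket that answers each ping from the master; any timely reply proves
# the client is alive. The address and status payload below are made up.
import zmq

ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
rep.bind("tcp://127.0.0.1:6000")

while True:
    rep.recv_multipart()                  # the master's heartbeat ping
    rep.send_multipart([b"elapsed:42s"])  # made-up client status payload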
"Please add more CPU resources to the "
"master or try again later.")
self.internal_lock = threading.Lock()

# Send actor commands like `init` and `call` to the job.
self.job_socket = self.ctx.socket(zmq.REQ)
self.job_socket.linger = 0
self.job_socket.connect("tcp://{}".format(job_address))
self.job_address = job_address
self.job_shutdown = False

self.send_file(self.job_socket)
self.job_socket.send_multipart([
    remote_constants.INIT_OBJECT_TAG,
    cloudpickle.dumps(cls),
    cloudpickle.dumps([args, kwargs]),
])
message = self.job_socket.recv_multipart()
tag = message[0]
if tag == remote_constants.EXCEPTION_TAG:
    traceback_str = to_str(message[1])
    self.job_shutdown = True
    raise RemoteError('__init__', traceback_str)
def _receive_message(self):
    """Master node receives various types of messages: (1) worker
    connection; (2) worker update; (3) client connection; (4) job
    submission; (5) reset job.
    """
    message = self.client_socket.recv_multipart()
    tag = message[0]

    # a new worker connects to the master
    if tag == remote_constants.WORKER_CONNECT_TAG:
        self.client_socket.send_multipart([remote_constants.NORMAL_TAG])

    elif tag == remote_constants.MONITOR_TAG:
        status = self._get_status()
        self.client_socket.send_multipart(
            [remote_constants.NORMAL_TAG, status])

    # `xparl status` command line API
    elif tag == remote_constants.STATUS_TAG:
        status_info = self.cluster_monitor.get_status_info()
        self.client_socket.send_multipart(
            [remote_constants.NORMAL_TAG, to_byte(status_info)])

    elif tag == remote_constants.WORKER_INITIALIZED_TAG:
        initialized_worker = cloudpickle.loads(message[1])
def wrapper(*args, **kwargs):
    self.internal_lock.acquire()
    data = dumps_argument(*args, **kwargs)

    self.command_socket.send_multipart(
        [remote_constants.NORMAL_TAG, to_byte(attr), data])
    message = self.command_socket.recv_multipart()

    tag = message[0]
    if tag == remote_constants.NORMAL_TAG:
        ret = loads_return(message[1])
    elif tag == remote_constants.EXCEPTION_TAG:
        error_str = to_str(message[1])
        raise RemoteError(attr, error_str)
    elif tag == remote_constants.ATTRIBUTE_EXCEPTION_TAG:
        error_str = to_str(message[1])
        raise RemoteAttributeError(attr, error_str)
    elif tag == remote_constants.SERIALIZE_EXCEPTION_TAG:
        error_str = to_str(message[1])
        raise RemoteSerializeError(attr, error_str)
    elif tag == remote_constants.DESERIALIZE_EXCEPTION_TAG:
        error_str = to_str(message[1])
        raise RemoteDeserializeError(attr, error_str)
    else:
        raise NotImplementedError()
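# Illustrative sketch (not part of PARL): `wrapper` above is the classic
# remote-proxy pattern, where __getattr__ intercepts a method name and returns
# a closure that ships the call over the socket. A minimal local analogue,
# with a plain callable standing in for the ZMQ round trip:
class Proxy:
    def __init__(self, transport):
        self._transport = transport  # callable(attr, args, kwargs) -> result

    def __getattr__(self, attr):
        def wrapper(*args, **kwargs):
            return self._transport(attr, args, kwargs)
        return wrapper

# usage: proxy.step(1, 2) is routed through the transport unchanged
proxy = Proxy(lambda attr, args, kwargs: (attr, args, kwargs))
assert proxy.step(1, 2) == ('step', (1, 2), {})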
def _reply_kill_job(self):
    """Worker starts a thread that waits for commands to kill a job."""
    self.kill_job_socket.linger = 0
    self.kill_job_socket.setsockopt(
        zmq.RCVTIMEO, remote_constants.HEARTBEAT_RCVTIMEO_S * 1000)
    while self.worker_is_alive and self.master_is_alive:
        try:
            message = self.kill_job_socket.recv_multipart()
            tag = message[0]
            assert tag == remote_constants.KILLJOB_TAG
            to_kill_job_address = to_str(message[1])
            self._kill_job(to_kill_job_address)
            self.kill_job_socket.send_multipart(
                [remote_constants.NORMAL_TAG])
        except zmq.error.Again:
            # recv timed out: loop again so `self.worker_is_alive` is
            # checked periodically
            pass
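# Illustrative sketch (not part of PARL): setting RCVTIMEO makes recv raise
# zmq.error.Again after the timeout instead of blocking forever, which is what
# lets the loop above re-check its exit flags. The address, timeout, and exit
# condition below are made up.
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REP)
sock.setsockopt(zmq.RCVTIMEO, 1000)  # milliseconds
sock.bind("tcp://127.0.0.1:5555")

running = True
while running:
    try:
        msg = sock.recv_multipart()
        sock.send_multipart([b"ok"])
    except zmq.error.Again:
        # timed out with no message: in this demo, stop after one idle second
        running = False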
                to_byte(error_str)
            ])
            raise DeserializeError
        else:
            traceback_str = str(traceback.format_exc())
            logger.error("traceback:\n{}".format(traceback_str))
            reply_socket.send_multipart([
                remote_constants.EXCEPTION_TAG,
                to_byte(error_str + "\ntraceback:\n" + traceback_str)
            ])
            break

# receive KILLJOB_TAG from the actor, and stop replying to the worker heartbeat
elif tag == remote_constants.KILLJOB_TAG:
    reply_socket.send_multipart([remote_constants.NORMAL_TAG])
    logger.warning("An actor exits and this job {} will exit.".format(
        job_address))
    break
else:
    logger.error(
        "The job receives an unknown message: {}".format(message))
    raise NotImplementedError