Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def float_x1000(H, persis_info, sim_specs, _):
    """
    Return a single output record filled with the worker ID times 1000.

    The history ``H`` and the final positional argument are not used.

    Returns a tuple ``(output, persis_info)``: ``output`` is a one-element
    array with dtype ``sim_specs['out']``, and ``persis_info`` is handed back
    unchanged.
    """
    # One record laid out as requested by sim_specs['out'].
    result = np.zeros(1, dtype=sim_specs['out'])

    scaled_id = JobController.controller.workerID * 1000.0

    # Blanket-fill every field first, then overwrite the scalar field so it
    # can be told apart from the rest.
    result.fill(scaled_id)
    result['scal_val'] = scaled_id + scaled_id/1e7

    return result, persis_info
# Worker set-up: copy the simulation directory for this worker and register
# the worker ID with the job controller when one is available.
# NOTE(review): the enclosing method header is not part of this excerpt and
# the original indentation was lost; the nesting below is reconstructed from
# the data dependencies between statements -- confirm against the real file.
if not empty:
    if 'sim_dir' in Worker.sim_specs:
        # The per-worker directory is named "<sim_dir>_<workerID>".
        self.worker_dir = Worker.sim_specs['sim_dir'] + '_' + str(self.workerID)

        if 'sim_dir_prefix' in Worker.sim_specs:
            # Re-root the worker directory under the (user-expanded) prefix,
            # keeping only the final path component of the name built above.
            self.worker_dir = os.path.join(
                os.path.expanduser(Worker.sim_specs['sim_dir_prefix']),
                os.path.split(os.path.abspath(os.path.expanduser(self.worker_dir)))[1])

        # Logical `not`, not bitwise `~`: ~True is -2 and ~False is -1, both
        # truthy, so the previous form of this check could never fail.
        assert not os.path.isdir(self.worker_dir), "Worker directory already exists."
        shutil.copytree(Worker.sim_specs['sim_dir'], self.worker_dir)
        self.locations[EVAL_SIM_TAG] = self.worker_dir

    # Optional - set workerID in job_controller - so will be added to jobnames
    try:
        jobctl = JobController.controller
        jobctl.set_workerID(workerID)
    except Exception:
        # Deliberately best-effort: any failure here is reported and the
        # worker simply carries on without a job controller.
        print("Info: No job_controller set on worker", workerID)
        self.job_controller_set = False
    else:
        self.job_controller_set = True
# NOTE(review): excerpt from the body of a simulation routine. Its header is
# not shown, so `sim_specs`, `x`, `perturb` and `read_last_line` come from
# outside this excerpt, and the original indentation has been lost.
# These three 'user' settings are read with plain indexing, so a missing key
# raises KeyError.
sim_particles = sim_specs['user']['sim_particles']
sim_timesteps = sim_specs['user']['sim_timesteps']
# The key name suggests minutes, so this is presumably a limit in seconds --
# TODO confirm. `time_limit` is not used within the visible lines.
time_limit = sim_specs['user']['sim_kill_minutes'] * 60.0
# Get from dictionary if key exists, else return default (e.g. 0)
cores = sim_specs['user'].get('cores', None)
kill_rate = sim_specs['user'].get('kill_rate', 0)
particle_variance = sim_specs['user'].get('particle_variance', 0)
# Composing variable names and x values to set up simulation
# The seed is x[0][0] rounded to the nearest integer.
seed = int(np.rint(x[0][0]))
# This is to give a random variance of work-load
sim_particles = perturb(sim_particles, seed, particle_variance)
jobctl = JobController.controller
# Application arguments, space separated: particles, timesteps, seed, kill rate.
args = str(int(sim_particles)) + ' ' + str(sim_timesteps) + ' ' + str(seed) + ' ' + str(kill_rate)
# Pass num_procs only when 'cores' is set to a truthy value; otherwise the
# launch call is made without it.
if cores:
job = jobctl.launch(calc_type='sim', num_procs=cores, app_args=args, stdout='out.txt', stderr='err.txt', wait_on_run=True)
else:
job = jobctl.launch(calc_type='sim', app_args=args, stdout='out.txt', stderr='err.txt', wait_on_run=True)
# Stat file to check for bad runs
statfile = 'forces.stat'
filepath = os.path.join(job.workdir, statfile)
line = None
# `poll_interval` is not used within the visible lines.
poll_interval = 1
# Re-read the last line of the stat file for as long as the job is unfinished.
while not job.finished:
line = read_last_line(filepath)
if line == "kill":
# NOTE(review): the excerpt stops here; the body of the "kill" branch and the
# remainder of the loop are not shown.
# NOTE(review): tail of a try/except/finally around a calculation call. The
# opening `try:` and the statement that produces `out` are not shown, and the
# original indentation has been lost.
# Validate the shape of the calculation result before unpacking it.
assert isinstance(out, tuple), \
"Calculation output must be a tuple."
assert len(out) >= 2, \
"Calculation output must be at least two elements."
# The third element is optional; UNSET_TAG stands in when it is absent.
calc_status = out[2] if len(out) >= 3 else UNSET_TAG
return out[0], out[1], calc_status
except Exception:
logger.debug("Re-raising exception from calc")
# Set the status before re-raising: the `finally` section below reads it.
calc_status = CALC_EXCEPTION
raise
finally:
# This was meant to be handled by calc_stats module.
# Build the one-line stats message. The most recent job's timer is added
# only when `job_timing` is truthy and the controller's job list is non-empty.
if job_timing and JobController.controller.list_of_jobs:
# Initially supporting one per calc. One line output.
job = JobController.controller.list_of_jobs[-1]
calc_msg = "Calc {:5d}: {} {} {} Status: {}".\
format(calc_id,
calc_type_strings[calc_type],
timer,
job.timer,
calc_status_strings.get(calc_status, "Completed"))
else:
calc_msg = "Calc {:5d}: {} {} Status: {}".\
format(calc_id,
calc_type_strings[calc_type],
timer,
calc_status_strings.get(calc_status, "Completed"))
# A status with no entry in calc_status_strings is reported as "Completed".
# The message is written to the logger named by LogConfig.config.stats_name.
logging.getLogger(LogConfig.config.stats_name).info(calc_msg)
# Entry point for running libE with a locally launched team of workers.
# NOTE(review): this excerpt is truncated (the step announced by the final
# comment is missing) and the original indentation has been lost.
def libE_local(sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0):
"Main routine for thread/process launch of libE."
# The worker count is taken from libE_specs['nprocesses'].
nworkers = libE_specs['nprocesses']
check_inputs(libE_specs, alloc_specs, sim_specs, gen_specs, exit_criteria, H0)
# If a job controller exists, give it this host's name as the libE node
# and request serial set-up.
jobctl = JobController.controller
if jobctl is not None:
local_host = socket.gethostname()
jobctl.add_comm_info(libE_nodes=local_host, serial_setup=True)
hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0)
# Launch worker team and set up logger
wcomms = start_proc_team(nworkers, sim_specs, gen_specs)
manager_logging_config()
# Set up cleanup routine to shut down worker team
def cleanup():
"Handler to clean up comms team."
# The timeout is None when 'worker_timeout' is absent from libE_specs.
kill_proc_team(wcomms, timeout=libE_specs.get('worker_timeout'))
# Run generic manager
# NOTE(review): the excerpt stops here; the manager call this comment
# introduces (and the function's return) are not shown.
# NOTE(review): tail of a try/except/finally around a calculation call. The
# opening `try:` and the statement that produces `out` are not shown, and the
# original indentation has been lost.
logger.debug("Return from calc call")
# Validate the shape of the calculation result before unpacking it.
assert isinstance(out, tuple), \
"Calculation output must be a tuple."
assert len(out) >= 2, \
"Calculation output must be at least two elements."
# The third element is optional; UNSET_TAG stands in when it is absent.
calc_status = out[2] if len(out) >= 3 else UNSET_TAG
return out[0], out[1], calc_status
except Exception:
logger.debug("Re-raising exception from calc")
# Set the status before re-raising: the `finally` section below reads it.
calc_status = CALC_EXCEPTION
raise
finally:
# This was meant to be handled by calc_stats module.
# Build the one-line stats message. The most recent job's timer is added
# only when `job_timing` is truthy and the controller's job list is non-empty.
if job_timing and JobController.controller.list_of_jobs:
# Initially supporting one per calc. One line output.
job = JobController.controller.list_of_jobs[-1]
calc_msg = "Calc {:5d}: {} {} {} Status: {}".\
format(calc_id,
calc_type_strings[calc_type],
timer,
job.timer,
calc_status_strings.get(calc_status, "Completed"))
else:
calc_msg = "Calc {:5d}: {} {} Status: {}".\
format(calc_id,
calc_type_strings[calc_type],
timer,
calc_status_strings.get(calc_status, "Completed"))
# A status with no entry in calc_status_strings is reported as "Completed".
# The message is written to the logger named by LogConfig.config.stats_name.
logging.getLogger(LogConfig.config.stats_name).info(calc_msg)
def _set_job_controller(workerID, comm):
    """Pass the worker's comm and ID to the job controller, if there is one.

    Returns True when ``JobController.controller`` is a ``JobController``
    instance and has been given the worker information; otherwise logs an
    informational message and returns False.
    """
    controller = JobController.controller

    # Guard clause: nothing to configure without a real controller instance.
    if not isinstance(controller, JobController):
        logger.info("No job_controller set on worker {}".format(workerID))
        return False

    controller.set_worker_info(comm, workerID)
    return True
def libE_mpi(sim_specs, gen_specs, exit_criteria,
             persis_info, alloc_specs, libE_specs, H0):
    """MPI version of the libE main routine.

    Returns a 3-tuple in every case:

    * ``([], persis_info, 3)`` when this process is not in the communicator;
    * the result of ``libE_mpi_manager`` on rank 0;
    * ``([], persis_info, [])`` on every other rank, after the worker
      routine has returned.
    """
    libE_specs, mpi_comm_null = libE_mpi_defaults(libE_specs)
    comm = libE_specs['comm']

    if comm == mpi_comm_null:
        # Process not in comm
        return [], persis_info, 3

    on_manager_rank = comm.Get_rank() == 0

    check_inputs(libE_specs, alloc_specs, sim_specs, gen_specs, exit_criteria, H0)

    controller = JobController.controller
    if controller is not None:
        # Gather every rank's hostname; duplicates collapse in the set.
        this_host = socket.gethostname()
        node_names = set(comm.allgather(this_host))
        controller.add_comm_info(libE_nodes=node_names, serial_setup=on_manager_rank)

    # Run manager or worker code, depending
    if not on_manager_rank:
        # Worker returns a subset of MPI output
        libE_mpi_worker(comm, sim_specs, gen_specs, libE_specs)
        return [], persis_info, []

    return libE_mpi_manager(comm, sim_specs, gen_specs, exit_criteria,
                            persis_info, alloc_specs, libE_specs, H0)