Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
# also check that expected data is right shape (e.g., in case number of species
# has changed if running a new model)
gas = ct.Solution(model, phase_name)
matches_shape = ignition_data.shape[1] == 2 + gas.n_species
if matches_number and matches_shape:
logging.info('Reusing existing autoignition samples for the starting model.')
else:
logging.info('Running autoignition simulations for starting model.')
stop_at_ignition = False
simulations = []
for idx, case in enumerate(ignition_conditions):
simulations.append([
Simulation(idx, case, model, phase_name=phase_name, path=path), stop_at_ignition
])
jobs = tuple(simulations)
if num_threads == 1:
results = []
for job in jobs:
results.append(simulation_worker(job))
else:
pool = multiprocessing.Pool(processes=num_threads)
results = pool.map(simulation_worker, jobs)
pool.close()
pool.join()
ignition_delays = np.zeros(len(ignition_conditions))
ignition_data = []
for idx, sim in enumerate(results):
sim_array : list of Simulation
List of simulations to be performed
"""
sim_array = []
i = 0
while (i < len(conditions_array)): #For all conditions
properties = {} #create properties dictionary
properties['temperature'] = float(conditions_array[i].temperature)
properties['pressure'] = float(conditions_array[i].pressure)
properties['equivalence_ratio'] = float(conditions_array[i].equi)
properties['fuel'] = conditions_array[i].fuel
properties['oxidizer'] = conditions_array[i].oxid
# create simulation object and add it to the list
sim_array.append(Simulation(i, properties, model))
i = i + 1
return sim_array
sim_tuple : tuple
Contains Simulation object and other parameters needed to setup
and run case.
Returns
-------
sim : Simulation
Object with simulation metadata
"""
sim, stop_at_ignition = sim_tuple
sim.setup_case()
sim.run_case(stop_at_ignition)
sim = Simulation(sim.idx, sim.properties, sim.model, phase_name=sim.phase_name, path=sim.path)
return sim
num_threads = multiprocessing.cpu_count()-1 or 1
if ignition_conditions:
ignition_delays = np.zeros(len(ignition_conditions))
exists_output = os.path.isfile(data_files['output_ignition'])
if reuse_saved and exists_output:
ignition_delays = np.genfromtxt(data_files['output_ignition'], delimiter=',')
if reuse_saved and len(ignition_delays) == len(ignition_conditions):
logging.info('Reusing existing autoignition samples for the starting model.')
else:
simulations = []
for idx, case in enumerate(ignition_conditions):
simulations.append([
Simulation(idx, case, model, phase_name=phase_name, path=path), idx
])
jobs = tuple(simulations)
if num_threads == 1:
results = []
for job in jobs:
results.append(ignition_worker(job))
else:
pool = multiprocessing.Pool(processes=num_threads)
results = pool.map(ignition_worker, jobs)
pool.close()
pool.join()
results = {key:val for k in results for key, val in k.items()}
ignition_delays = np.zeros(len(results))
for idx, ignition_delay in results.items():