def run_plan(model_data, timings, backend, build_only, backend_rerun=False):

    log_time(logger, timings, 'run_start', comment='Backend: starting model run')

    if not backend_rerun:
        backend_model = backend.generate_model(model_data)

        log_time(
            logger, timings, 'run_backend_model_generated', time_since_run_start=True,
            comment='Backend: model generated'
        )
    else:
        backend_model = backend_rerun

    run_config = backend_model.__calliope_run_config
    solver = run_config['solver']
    solver_io = run_config.get('solver_io', None)
    solver_options = run_config.get('solver_options', None)
    save_logs = run_config.get('save_logs', None)

    if build_only:
        results = xr.Dataset()
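
# Illustrative sketch (not part of the Calliope source): run_plan reads its
# solver settings from a plain mapping. The values below are assumptions chosen
# only to show which keys are consumed before solving; 'mipgap' and 'cbc' are
# hypothetical choices.
example_run_config = {
    'mode': 'plan',
    'solver': 'cbc',                      # any Pyomo-supported solver name
    'solver_io': None,                    # optional solver interface hint
    'solver_options': {'mipgap': 0.01},   # hypothetical solver option
    'save_logs': None,                    # directory for solver log files, if any
}
solver = example_run_config['solver']
solver_io = example_run_config.get('solver_io', None)
solver_options = example_run_config.get('solver_options', None)
save_logs = example_run_config.get('save_logs', None)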
def _init_from_model_run(self, model_run, debug_data):
    self._model_run = model_run
    self._debug_data = debug_data
    log_time(logger, self._timings, 'model_run_creation', comment='Model: preprocessing stage 1 (model_run)')

    self._model_data_original = build_model_data(model_run)
    log_time(logger, self._timings, 'model_data_original_creation', comment='Model: preprocessing stage 2 (model_data)')

    random_seed = self._model_run.get_key('model.random_seed', None)
    if random_seed:
        np.random.seed(seed=random_seed)

    # After setting the random seed, time clustering can take place
    time_config = model_run.model.get('time', None)
    if not time_config:
        _model_data = self._model_data_original
    else:
        _model_data = apply_time_clustering(
            self._model_data_original, model_run
        )

    self._model_data = final_timedimension_processing(_model_data)
    log_time(
        logger, self._timings, 'model_data_creation',
        comment='Model: preprocessing complete'
    )
if build_only:
    iterations = [0]
else:
    iterations = range(len(window_starts))

for i in iterations:
    start_timestep = window_starts.index[i]

    # Build full model in first instance
    if i == 0:
        warmstart = False
        end_timestep = horizon_ends.index[i]
        timesteps = slice(start_timestep, end_timestep)
        window_model_data = model_data.loc[dict(timesteps=timesteps)]

        log_time(
            logger, timings, 'model_gen_1',
            comment='Backend: generating initial model'
        )

        backend_model = backend.generate_model(window_model_data)

    # Build the full model in the last instance(s),
    # where number of timesteps is less than the horizon length
    elif i > len(horizon_ends) - 1:
        warmstart = False
        end_timestep = window_ends.index[i]
        timesteps = slice(start_timestep, end_timestep)
        window_model_data = model_data.loc[dict(timesteps=timesteps)]

        log_time(
            logger, timings, 'model_gen_{}'.format(i + 1),
            comment='Backend: iteration {}: generating new model'.format(i + 1)
        )
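
# Illustrative sketch (not part of the Calliope source): how a rolling-horizon
# window is carved out of the time dimension with xarray label slicing, as in
# window_model_data above. The toy dataset and the 24-hour window length are
# assumptions.
import numpy as np
import pandas as pd
import xarray as xr

toy_timesteps = pd.date_range('2005-01-01', periods=48, freq='H')
toy_model_data = xr.Dataset(
    {'resource': ('timesteps', np.random.rand(48))},
    coords={'timesteps': toy_timesteps},
)
# Label-based slicing is inclusive of both endpoints
window = toy_model_data.loc[dict(timesteps=slice(toy_timesteps[0], toy_timesteps[23]))]
assert window.sizes['timesteps'] == 24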
Returns
-------
new_model : calliope.Model
    New calliope model, including both inputs and results, but no backend interface.

"""
backend_model.__calliope_run_config = AttrDict.from_yaml_string(model_data.attrs['run_config'])

if backend_model.__calliope_run_config['mode'] != 'plan':
    raise exceptions.ModelError(
        'Cannot rerun the backend in {} run mode. Only `plan` mode is '
        'possible.'.format(backend_model.__calliope_run_config['mode'])
    )

timings = {}
log_time(logger, timings, 'model_creation')

results, backend_model = backend_run.run_plan(
    model_data, timings, run_pyomo,
    build_only=False, backend_rerun=backend_model
)

inputs = access_pyomo_model_inputs(backend_model)

# Add additional post-processed result variables to results
if results.attrs.get('termination_condition', None) in ['optimal', 'feasible']:
    results = postprocess_model_results(
        results, model_data.reindex(results.coords), timings
    )

for key, var in results.data_vars.items():
    var.attrs['is_result'] = 1
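
# Illustrative sketch (not part of the Calliope source): the run configuration
# travels with the model data as a YAML string in model_data.attrs['run_config'],
# which AttrDict.from_yaml_string parses above. A plain-PyYAML equivalent of the
# mode guard, with an assumed minimal config string:
import yaml

run_config_yaml = 'mode: plan\nsolver: cbc\n'
run_config = yaml.safe_load(run_config_yaml)
if run_config['mode'] != 'plan':
    raise ValueError(
        'Cannot rerun the backend in {} run mode. Only `plan` mode is '
        'possible.'.format(run_config['mode'])
    )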
self._model_data = model_data
self.inputs = self._model_data.filter_by_attrs(is_result=0)

self.model_config = UpdateObserverDict(
    initial_yaml_string=model_data.attrs.get('model_config', '{}'),
    name='model_config', observer=self._model_data
)
self.run_config = UpdateObserverDict(
    initial_yaml_string=model_data.attrs.get('run_config', '{}'),
    name='run_config', observer=self._model_data
)

results = self._model_data.filter_by_attrs(is_result=1)
if len(results.data_vars) > 0:
    self.results = results

log_time(
    logger, self._timings, 'model_data_loaded',
    comment='Model: loaded model_data'
)
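
# Illustrative sketch (not part of the Calliope source): how filter_by_attrs
# splits one dataset into inputs and results via the is_result variable
# attribute, mirroring self.inputs and results above. The toy variables are
# assumptions.
import numpy as np
import xarray as xr

toy = xr.Dataset({
    'resource': ('timesteps', np.zeros(3)),    # an input
    'energy_cap': ('techs', np.zeros(2)),      # a result
})
toy['resource'].attrs['is_result'] = 0
toy['energy_cap'].attrs['is_result'] = 1

toy_inputs = toy.filter_by_attrs(is_result=0)
toy_results = toy.filter_by_attrs(is_result=1)
assert list(toy_inputs.data_vars) == ['resource']
assert list(toy_results.data_vars) == ['energy_cap']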
if build_only:
    results = xr.Dataset()
else:
    log_time(
        logger, timings, 'run_solver_start',
        comment='Backend: sending model to solver'
    )

    results = backend.solve_model(
        backend_model, solver=solver,
        solver_io=solver_io, solver_options=solver_options, save_logs=save_logs
    )

    log_time(
        logger, timings, 'run_solver_exit', time_since_run_start=True,
        comment='Backend: solver finished running'
    )

    termination = backend.load_results(backend_model, results)

    log_time(
        logger, timings, 'run_results_loaded',
        comment='Backend: loaded results'
    )

    results = backend.get_result_array(backend_model, model_data)
    results.attrs['termination_condition'] = termination

    if results.attrs['termination_condition'] in ['optimal', 'feasible']:
        results.attrs['objective_function_value'] = backend_model.obj()
def run_operate(model_data, timings, backend, build_only):
    """
    For use when mode is 'operate', to allow the model to be built, edited, and
    iteratively run within Pyomo.
    """
    log_time(logger, timings, 'run_start',
             comment='Backend: starting model run in operational mode')

    defaults = AttrDict.from_yaml_string(model_data.attrs['defaults'])
    run_config = AttrDict.from_yaml_string(model_data.attrs['run_config'])

    operate_params = ['purchased'] + [
        i.replace('_max', '') for i in defaults if i[-4:] == '_max'
    ]

    # Capacity results (from plan mode) can be used as the input to operate mode
    if (any(model_data.filter_by_attrs(is_result=1).data_vars) and
            run_config.get('operation.use_cap_results', False)):
        # Anything with is_result = 1 will be ignored in the Pyomo model
        for varname, varvals in model_data.data_vars.items():
            if varname in operate_params:
                varvals.attrs['is_result'] = 1
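
# Illustrative sketch (not part of the Calliope source): how operate_params is
# derived from the defaults mapping above. Every '_max' default yields a
# decision-variable name that operate mode can fix from prior plan-mode
# results. The toy defaults below are assumptions.
toy_defaults = {'energy_cap_max': None, 'resource_area_max': None, 'energy_eff': 1.0}
toy_operate_params = ['purchased'] + [
    i.replace('_max', '') for i in toy_defaults if i[-4:] == '_max'
]
assert toy_operate_params == ['purchased', 'energy_cap', 'resource_area']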