Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for i in sets.techlists
if 'carrier_prod_equals' in model_run.model.get_key('group_share.{}'.format(i), {}).keys()
for carrier in sets.carriers
if carrier in model_run.model.get_key('group_share.{}.carrier_prod_equals'.format(i), {}).keys()
]
# group.py
# Only group constraints that are not switched off with ``exists: false``
# take part in set generation.
group_constraints = {
    name: data for name, data in model_run['group_constraints'].items()
    if data.get("exists", True)
}
constraint_sets['constraint_groups'] = list(group_constraints.keys())

for group_constraint_name, group_constraint in group_constraints.items():
    # One inner list per constraint key defined in this group (everything
    # except 'techs', 'locs' and 'exists'): the built-in tech groups whose
    # 'allowed_group_constraints' contain that key.
    tech_groups = [
        [k for k, v in checks.DEFAULTS.tech_groups.items()
         if i in v['allowed_group_constraints']]
        for i in group_constraint.keys()
        if i not in ['techs', 'locs', 'exists']
    ]
    # Keep only the tech groups that are allowed by *every* constraint key.
    # NOTE(review): tech_groups[0] raises IndexError for a group that defines
    # no constraint keys at all; presumably rejected earlier -- confirm.
    allowed_tech_groups = set(tech_groups[0]).intersection(*tech_groups)
    allowed_techs = sum([sets['techs_{}'.format(i)] for i in allowed_tech_groups], [])

    # Explicit 'techs' / 'locs' in the group win over the derived defaults.
    techs = group_constraint.get('techs', allowed_techs)
    locs = group_constraint.get('locs', sets['locs'])

    # If there are transmission techs, keep only those that link to allowed locations.
    # The ':' test must be made on each tech name ``i``. Testing it against
    # the ``techs`` list (as before) asks whether the literal string ':' is
    # an element of the list, which is practically always false, so the
    # filter never removed anything.
    techs = [i for i in techs if ':' not in i or i.split(':')[-1] in locs]

    # For transmission techs given by bare name, add one '<tech>:<loc>' entry
    # per allowed location.
    trans_techs = set(techs).intersection(sets['techs_transmission_names'])
    for i in trans_techs:
        techs += [i + ':' + j for j in locs]
errors = []
debug_comments = AttrDict()
for tech_id, tech_config in config_model.techs.items():
# If a tech specifies ``exists: false``, we skip it entirely
if not tech_config.get('exists', True):
continue
tech_result = AttrDict()
# Add inheritance chain
tech_result.inheritance = get_parents(tech_id, config_model)
# CHECK: A tech's parent must lead to one of the built-in tech_groups
builtin_tech_groups = checks.DEFAULTS.tech_groups.keys()
if tech_result.inheritance[-1] not in builtin_tech_groups:
errors.append(
'tech {} must inherit from a built-in tech group'.format(tech_id)
)
# Process inheritance
tech_result.essentials = AttrDict()
tech_result.constraints = AttrDict()
for parent in reversed(tech_result.inheritance):
# Does the parent group have model-wide settings?
parent_essentials = config_model.tech_groups[parent].essentials
parent_systemwide_constraints = util.get_systemwide_constraints(
config_model.tech_groups[parent]
)
for k in parent_essentials.as_dict_flat():
debug_comments.set_key(
def final_timedimension_processing(model_data):
    """
    Run the final data check and the closing post-processing steps.

    ``checks.check_model_data`` is called first; the warnings and errors it
    returns are handed to ``exceptions.print_warnings_and_raise_errors``.
    The checked data is then passed, in order, through
    ``add_max_demand_timesteps``, ``add_zero_carrier_ratio_sets`` and
    ``reorganise_xarray_dimensions``.

    Parameters
    ----------
    model_data
        The model data object to check and post-process.

    Returns
    -------
    The object returned by the last post-processing step.

    """
    # The comments returned by the check are not used in this function.
    checked_data, _check_comments, check_warnings, check_errors = (
        checks.check_model_data(model_data)
    )
    exceptions.print_warnings_and_raise_errors(
        warnings=check_warnings, errors=check_errors
    )

    # Each step takes the data and returns the updated data.
    post_processing_steps = (
        add_max_demand_timesteps,
        add_zero_carrier_ratio_sets,
        reorganise_xarray_dimensions,
    )
    for step in post_processing_steps:
        checked_data = step(checked_data)

    return checked_data
# NOTE(review): this fragment has lost its indentation; the block nesting
# (what belongs to ``if override_dict:``) must be restored before it can run.
# FutureWarning: check if config includes an explicit objective cost class.
# Added in 0.6.4-dev, to be removed in v0.7.0-dev.
# The flag is True only when the configured cost class value is a dict.
has_explicit_cost_class = isinstance(config.get_key('run.objective_options.cost_class', None), dict)
# The input files are allowed to override other model defaults
config_model.union(config, allow_override=True)
# First pass of applying the override dict, before applying scenarios,
# so that scenario definitions can themselves be overridden by override_dict
if override_dict:
# Normalise override_dict to an AttrDict: a string is parsed as YAML,
# any other non-AttrDict value is wrapped in AttrDict.
if isinstance(override_dict, str):
override_dict = AttrDict.from_yaml_string(override_dict)
elif not isinstance(override_dict, AttrDict):
override_dict = AttrDict(override_dict)
# Warnings returned by the override check are passed on for reporting.
warning_messages = checks.check_overrides(config_model, override_dict)
exceptions.print_warnings_and_raise_errors(warnings=warning_messages)
# FutureWarning: If config does not include an explicit objective cost class, check override dict.
# Added in 0.6.4-dev, to be removed in v0.7.0-dev.
if has_explicit_cost_class is False:
has_explicit_cost_class = isinstance(override_dict.get_key('run.objective_options.cost_class', None), dict)
# Merge the overrides into the model config, with both override and
# replacement of existing keys allowed.
config_model.union(
override_dict, allow_override=True, allow_replacement=True
)
if scenario:
scenarios = config_model.get('scenarios', {})
if scenario in scenarios.keys():
# Manually defined scenario names cannot be the same as single
)
# NOTE(review): ``model_run``, ``config`` and ``debug_comments`` are bound
# before this fragment, outside the visible lines.
# 6) Grab additional relevant bits from run and model config
model_run['run'] = config['run']
model_run['model'] = config['model']
# Group constraints are optional; an empty dict is used when config has none.
model_run['group_constraints'] = config.get('group_constraints', {})
# 7) Initialize sets
# Simple sets are generated first, then merged with the loc_tech sets,
# which are generated from model_run and the simple sets.
all_sets = sets.generate_simple_sets(model_run)
all_sets.union(sets.generate_loc_tech_sets(model_run, all_sets))
# Every set value is converted to a list before being stored.
all_sets = AttrDict({k: list(v) for k, v in all_sets.items()})
model_run['sets'] = all_sets
# Constraint sets are generated from model_run after 'sets' has been stored.
model_run['constraint_sets'] = constraint_sets.generate_constraint_sets(model_run)
# 8) Final sense-checking
final_check_comments, warning_messages, errors = checks.check_final(model_run)
# Comments from the final check are merged into the existing debug comments.
debug_comments.union(final_check_comments)
exceptions.print_warnings_and_raise_errors(warnings=warning_messages, errors=errors)
# 9) Build a debug data dict with comments and the original configs
debug_data = AttrDict({
'comments': debug_comments,
'config_initial': config,
})
# Returns a 2-tuple: the populated model_run and the debug data.
return model_run, debug_data
def add_attributes(model_run):
# Collects attributes in a fresh AttrDict: the package version plus the
# 'applied_overrides' and 'scenario' entries read from model_run.
# NOTE(review): the visible part of this function stops after the group
# constraint defaults are built; no return statement is visible here.
attr_dict = AttrDict()
attr_dict['calliope_version'] = __version__
attr_dict['applied_overrides'] = model_run['applied_overrides']
attr_dict['scenario'] = model_run['scenario']
##
# Build the `defaults` attribute that holds all default settings
# used in get_param() lookups inside the backend
##
default_tech_dict = checks.DEFAULTS.techs.default_tech.as_dict()
default_location_dict = checks.DEFAULTS.locations.default_location.as_dict()
# Group constraint defaults are a little bit more involved
# Every key of the default group except 'locs', 'techs' and 'exists' is used.
default_group_constraint_keys = [
i for i in checks.DEFAULTS.group_constraints.default_group.keys()
if i not in ['locs', 'techs', 'exists']
]
default_group_constraint_dict = {}
for k in default_group_constraint_keys:
k_default = checks.DEFAULTS.group_constraints.default_group[k]
# A dict-valued default must hold exactly one entry (asserted); the value
# of that single entry is used. Any other default is used as it is.
# Either way the result is stored under the key prefixed with 'group_'.
if isinstance(k_default, dict):
assert len(k_default.keys()) == 1
default_group_constraint_dict['group_' + k] = k_default[list(k_default.keys())[0]]
else:
default_group_constraint_dict['group_' + k] = k_default
# NOTE(review): the ``def`` line that owns this docstring is not visible.
# The body below also reads ``scenario`` and ``applied_overrides``, which the
# docstring does not list -- presumably further parameters; confirm.
"""
Returns a processed model_run configuration AttrDict and a debug
YAML object with comments attached, ready to write to disk.
Parameters
----------
config : AttrDict
debug_comments : AttrDict
"""
model_run = AttrDict()
model_run['scenario'] = scenario
# applied_overrides is an iterable of strings, stored as one ';'-joined string.
model_run['applied_overrides'] = ';'.join(applied_overrides)
# 1) Initial checks on model configuration
warning_messages, errors = checks.check_initial(config)
exceptions.print_warnings_and_raise_errors(warnings=warning_messages, errors=errors)
# 2) Fully populate techs
# Raises ModelError if necessary
# The per-tech debug output is stored under 'model_run.techs' in debug_comments.
model_run['techs'], debug_techs, errors = process_techs(config)
debug_comments.set_key('model_run.techs', debug_techs)
exceptions.print_warnings_and_raise_errors(errors=errors)
# 3) Fully populate tech_groups
model_run['tech_groups'] = process_tech_groups(config, model_run['techs'])
# 4) Fully populate locations
# The warnings and errors returned here are not reported within the visible lines.
model_run['locations'], debug_locs, warning_messages, errors = locations.process_locations(
config, model_run['techs']
)
debug_comments.set_key('model_run.locations', debug_locs)
##
# Build the `defaults` attribute that holds all default settings
# used in get_param() lookups inside the backend
##
# NOTE(review): ``attr_dict`` is bound before this fragment, outside the
# visible lines.
default_tech_dict = checks.DEFAULTS.techs.default_tech.as_dict()
default_location_dict = checks.DEFAULTS.locations.default_location.as_dict()
# Group constraint defaults are a little bit more involved
# Every key of the default group except 'locs', 'techs' and 'exists' is used.
default_group_constraint_keys = [
i for i in checks.DEFAULTS.group_constraints.default_group.keys()
if i not in ['locs', 'techs', 'exists']
]
default_group_constraint_dict = {}
for k in default_group_constraint_keys:
k_default = checks.DEFAULTS.group_constraints.default_group[k]
# A dict-valued default must hold exactly one entry (asserted); the value
# of that single entry is used. Any other default is used as it is.
# Either way the result is stored under the key prefixed with 'group_'.
if isinstance(k_default, dict):
assert len(k_default.keys()) == 1
default_group_constraint_dict['group_' + k] = k_default[list(k_default.keys())[0]]
else:
default_group_constraint_dict['group_' + k] = k_default
# 'defaults' is stored as a YAML dump of one merged dict. It combines the
# tech constraint defaults, the default cost entries with their keys prefixed
# by 'cost_', the location defaults and the group constraint defaults.
# If keys clash, the entry unpacked later replaces the earlier one.
attr_dict['defaults'] = ruamel.yaml.dump({
**default_tech_dict['constraints'],
**{'cost_{}'.format(k): v for k, v in default_tech_dict['costs']['default_cost'].items()},
**default_location_dict,
**default_group_constraint_dict
})
return attr_dict