Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
if tech in getattr(backend_model, 'techs_transmission_names', []):
all_loc_techs = [
i for i in backend_model.loc_techs_transmission
if i.split('::')[1].split(':')[0] == tech
]
multiplier = 2 # there are always two technologies associated with one link
else:
all_loc_techs = [
i for i in backend_model.loc_techs
if i.split('::')[1] == tech
]
multiplier = 1
max_systemwide = get_param(backend_model, 'units_max_systemwide', tech)
equals_systemwide = get_param(backend_model, 'units_equals_systemwide', tech)
if np.isinf(po.value(max_systemwide)) and not equals_systemwide:
return po.Constraint.NoConstraint
elif equals_systemwide and np.isinf(po.value(equals_systemwide)):
raise ValueError(
'Cannot use inf for energy_cap_equals_systemwide for tech `{}`'.format(tech)
)
sum_expr_units = sum(
backend_model.units[loc_tech] for loc_tech in all_loc_techs
if loc_tech_is_in(backend_model, loc_tech, 'loc_techs_milp')
)
sum_expr_purchase = sum(
backend_model.purchased[loc_tech] for loc_tech in all_loc_techs
if loc_tech_is_in(backend_model, loc_tech, 'loc_techs_purchase')
# NOTE(review): fragment of transmission-link preprocessing; the enclosing
# function and the end of this loop are outside this chunk, and indentation
# appears stripped in this copy of the file.
processed_links = AttrDict()
for link in links_in:
# Link keys are of the form 'loc_from,loc_to' — see the split below.
loc_from, loc_to = [i.strip() for i in link.split(',')]
# Skip this link entirely if it has been told not to exist
if not links_in[link].get('exists', True):
continue
# Also skip this link - and warn about it - if it links to a
# now-inexistant (because removed) location
if (loc_from not in locations.keys() or loc_to not in locations.keys()):
warnings.append(
'Not building the link {},{} because one or both of its '
'locations have been removed from the model by setting '
'``exists: false``'.format(loc_from, loc_to)
)
continue
processed_transmission_techs = AttrDict()
for tech_name in links_in[link].techs:
# Skip techs that have been told not to exist
# for this particular link
if not links_in[link].get_key('techs.{}.exists'.format(tech_name), True):
continue
if tech_name not in processed_transmission_techs:
tech_settings = AttrDict()
# Combine model-wide settings from all parent groups
for parent in reversed(modelrun_techs[tech_name].inheritance):
tech_settings.union(
tech_groups_in[parent],
allow_override=True
)
# Now overwrite with the tech's own model-wide settings
tech_settings.union(
techs_in[tech_name],
# NOTE(review): truncated here — the remaining union() arguments and the
# rest of the per-tech processing are outside this chunk.
def generate_simple_sets(model_run):
"""
Generate basic sets for a given pre-processed ``model_run``.

Parameters
----------
model_run : AttrDict
    Fully pre-processed model definition.

Returns
-------
sets : AttrDict
    Mapping from set names to sets of members — presumably completed by
    the remainder of this function, which is outside this chunk.
"""
sets = AttrDict()
# Flatten nested tech/location config to 'a.b.c'-style keys for scanning.
flat_techs = model_run.techs.as_dict(flat=True)
flat_locations = model_run.locations.as_dict(flat=True)
# Every '.carrier' key contributes its value(s); values may be lists,
# hence the flatten_list call.
sets.resources = set(flatten_list(
v for k, v in flat_techs.items()
if '.carrier' in k
))
# Drop the literal 'resource' entry — presumably a placeholder rather than
# a real carrier.
sets.carriers = sets.resources - set(['resource'])
# Carrier tiers, e.g. 'in'/'out', taken from keys like '...carrier_in'.
sets.carrier_tiers = set(
key.split('.carrier_')[1]
for key in flat_techs.keys()
if '.carrier_' in key
)
# NOTE(review): function continues beyond this chunk.
# NOTE(review): fragment of per-location tech constraint checking; begins
# mid-expression — the opening of this boolean test (and the definitions of
# `r`, `required`, `single_ok`, `multiple_ok`) is outside this chunk.
isinstance(r, list) and
any([i in tech_config.constraints for i in r])
)
if not single_ok and not multiple_ok:
errors.append(
'`{}` at `{}` fails to define '
'all required constraints: {}'.format(tech_id, loc_id, required)
)
# Warn if defining a carrier ratio for a conversion_plus tech,
# but applying it to a carrier that isn't one of the carriers specified by that tech
# e.g. carrier_ratios.carrier_in_2.cooling when cooling isn't a carrier`
defined_carriers = get_all_carriers(model_run.techs[tech_id].essentials)
# Flat keys look like 'carrier_in_2.cooling' — the trailing segment is the
# carrier name being given a ratio.
carriers_in_ratios = [
i.split('.')[-1] for i in
tech_config.constraints.get_key('carrier_ratios', AttrDict()).as_dict_flat().keys()
]
for carrier in carriers_in_ratios:
if carrier not in defined_carriers:
model_warnings.append(
'Tech `{t}` gives a carrier ratio for `{c}`, but does not actually '
'configure `{c}` as a carrier.'.format(t=tech_id, c=carrier)
)
# If the technology involves storage, warn when energy_cap and storage_cap aren't connected
energy_cap_per_storage_cap_params = [
'charge_rate', 'energy_cap_per_storage_cap_min',
'energy_cap_per_storage_cap_max', 'energy_cap_per_storage_cap_equals'
]
if (loc_id + '::' + tech_id in model_run.sets.loc_techs_store
and not any(i in tech_config.constraints.keys() for i in energy_cap_per_storage_cap_params)):
logger.info(
# NOTE(review): truncated here — the logger.info() message is outside this
# chunk.
# NOTE(review): fragment — continuation of a set-building function; `sets`
# and `model_run` are defined earlier, outside this chunk.
# `timesteps` set is built from the results of timeseries_data processing
sets.timesteps = list(model_run.timesteps.astype(str))
# Drop the interim key now that it has been converted into the set above.
model_run.del_key('timesteps')
# `techlists` are strings with comma-separated techs used for grouping in
# some model-wide constraints
sets.techlists = set()
for k in model_run.model.get_key('group_share', {}).keys():
sets.techlists.add(k)
# `constraint_groups` are the group names per constraint that is defined
# at a group level
sets.group_constraints = set()
# Keep only group constraints not switched off via `exists: false`.
group_constraints = AttrDict({
name: data for name, data in model_run['group_constraints'].items()
if data.get("exists", True)
})
if len(group_constraints.keys()) > 0:
# Flat keys look like '<group_name>.<constraint>...'; segment [1] is the
# constraint name, except for the special 'techs'/'locs' member lists.
sets.group_constraints.update(
i.split('.')[1] for i in group_constraints.as_dict_flat().keys()
if i.split('.')[1] not in ['techs', 'locs']
)
# For each constraint, record which group names define it.
for constr in sets.group_constraints:
sets['group_names_' + constr] = set(
k for k, v in group_constraints.items()
if constr in v.keys()
)
return sets
# NOTE(review): fragment of a utility that splits a concatenated
# 'loc::tech(::carrier)' index into separate dimensions; the enclosing
# function signature is outside this chunk.
# Separately find the loc_techs(_carriers) dimension and all other dimensions
loc_tech_dim = [i for i in data_var.dims if 'loc_tech' in i]
if not loc_tech_dim:
loc_tech_dim = [i for i in data_var.dims if 'loc_carrier' in i]
if not loc_tech_dim:
# Nothing to split — return the variable in the requested container type.
if return_as == 'Series':
return data_var.to_series()
elif return_as in ['DataArray', 'MultiIndex DataArray']:
return data_var
else:
raise ValueError('`return_as` must be `DataArray`, `Series`, or '
'`MultiIndex DataArray`, but `{}` given'.format(return_as))
elif len(loc_tech_dim) > 1:
e = exceptions.ModelError
raise e("Cannot split loc_techs or loc_tech_carriers dimension "
"for DataArray {}".format(data_var.name))
loc_tech_dim = loc_tech_dim[0]
# xr.Datarray -> pd.Series allows for string operations
data_var_idx = data_var[loc_tech_dim].to_index()
index_list = data_var_idx.str.split('::').tolist()
# carrier_prod, carrier_con, and carrier_export will return an index_list
# of size 3, all others will be an index list of size 2
possible_names = ['loc', 'tech', 'carrier']
names = [i + 's' for i in possible_names if i in loc_tech_dim]
data_var_midx = pd.MultiIndex.from_tuples(index_list, names=names)
# Replace the Datarray loc_tech_dim with this new MultiIndex
# NOTE(review): truncated here — the actual reindexing step is outside this
# chunk.
# NOTE(review): fragment of timeseries CSV loading; `timeseries_data`,
# `dtformat`, `config_model` and the enclosing function are defined outside
# this chunk.
# Generate the set of all files we want to read from file
flattened_config = model_run.locations.as_dict_flat()
# Config values like 'file=name.csv:xyz' — keep the filename, dropping the
# optional ':' suffix (presumably a column selector — verify against caller).
csv_files = set([
v.split('=')[1].rsplit(':', 1)[0]
for v in flattened_config.values() if 'file=' in str(v)
])
for f in csv_files:
f_path = os.path.join(config_model.model.timeseries_data_path, f)
parser = lambda x: datetime.datetime.strptime(x, dtformat)
try:
df = pd.read_csv(
f_path, index_col=0, parse_dates=True, date_parser=parser
)
except ValueError as e:
# strptime's ValueError quotes the offending value; pull it out for a
# friendlier error message.
raise exceptions.ModelError(
"Incorrect datetime format used in {}, expecting "
"`{}`, got `{}` instead"
"".format(f, dtformat, e.args[0].split("'")[1]))
timeseries_data[f] = df
# Apply time subsetting, if supplied in model_run
subset_time_config = config_model.model.subset_time
if subset_time_config is not None:
if isinstance(subset_time_config, list):
if len(subset_time_config) == 2:
# Two entries are treated as a [start, end] slice.
time_slice = slice(subset_time_config[0], subset_time_config[1])
else:
raise exceptions.ModelError(
'Invalid subset_time value: {}'.format(subset_time_config)
)
else:
# NOTE(review): truncated here — the non-list subset_time branch is
# outside this chunk.
# charge/discharge (including resource consumed for supply_plus techs)
elif var == 'storage':
array_flow = _get_reindexed_array('storage')
carrier_flow = (array_prod.sum('carriers') + array_con.sum('carriers') - resource_con)
carrier_flow = subset_sum_squeeze(carrier_flow, subset, sum_dims, squeeze)
elif var == 'resource_con':
array_flow = resource_con
else:
array_flow = _get_reindexed_array(var)
array_flow = subset_sum_squeeze(array_flow, subset, sum_dims, squeeze)
if 'timesteps' not in array_flow.dims or len(array_flow.dims) > 2:
e = exceptions.ModelError
raise e('Cannot plot timeseries for variable `{}` with subset `{}`'
'and `sum_dims: {}`'.format(var, subset, sum_dims))
for tech in array_flow.techs.values:
tech_dict = {'techs': tech}
if not array_flow.loc[tech_dict].sum():
continue
# We allow transmisison tech information to show up in some cases
if 'techs_transmission' in dataset and tech in dataset.techs_transmission.values:
base_tech = 'transmission'
color = dataset.colors.loc[{'techs': tech.split(':')[0]}].item()
name = dataset.names.loc[{'techs': tech.split(':')[0]}].item()
if var in carriers:
continue # no transmission in carrier flow
else:
base_tech = dataset.inheritance.loc[tech_dict].item().split('.')[0]
# NOTE(review): fragment — tail of a systemwide *energy capacity* constraint
# rule; begins mid-list (the transmission-tech branch opening is outside this
# chunk).
]
multiplier = 2 # there are always two technologies associated with one link
else:
all_loc_techs = [
i for i in backend_model.loc_techs
if i.split('::')[1] == tech
]
multiplier = 1
max_systemwide = get_param(backend_model, 'energy_cap_max_systemwide', tech)
equals_systemwide = get_param(backend_model, 'energy_cap_equals_systemwide', tech)
# No finite cap and no equality target: skip building this constraint.
if np.isinf(po.value(max_systemwide)) and not equals_systemwide:
return po.Constraint.NoConstraint
elif equals_systemwide and np.isinf(po.value(equals_systemwide)):
raise exceptions.ModelError(
'Cannot use inf for energy_cap_equals_systemwide for tech `{}`'.format(tech)
)
# Total energy_cap over all matching loc::techs; transmission techs appear
# twice (once per link end), hence the multiplier on the bound.
sum_expr = sum(backend_model.energy_cap[loc_tech] for loc_tech in all_loc_techs)
if equals_systemwide:
return sum_expr == equals_systemwide * multiplier
else:
return sum_expr <= max_systemwide * multiplier
# NOTE(review): fragment building the conversion-tech carrier lookup; the
# enclosing function and `loc_techs_conversion_array` are defined outside
# this chunk.
for loc_tech in model_run.sets['loc_techs_conversion']:
# For any non-conversion technology, there are only two carriers
# (one produced and one consumed)
loc_tech_carrier_in = [
i for i in
model_run.sets['loc_tech_carriers_con']
if loc_tech == i.rsplit("::", 1)[0]
]
loc_tech_carrier_out = [
i for i in
model_run.sets['loc_tech_carriers_prod']
if loc_tech == i.rsplit("::", 1)[0]
]
# Plain conversion techs must have exactly one carrier in each direction.
if len(loc_tech_carrier_in) > 1 or len(loc_tech_carrier_out) > 1:
raise exceptions.ModelError(
'More than one carrier in or out associated with '
'conversion location:technology `{}`'.format(loc_tech)
)
else:
loc_techs_conversion_array.loc[
dict(loc_techs_conversion=loc_tech, carrier_tiers=["in", "out"])
] = [loc_tech_carrier_in[0], loc_tech_carrier_out[0]]
dataset = dataset.merge(
loc_techs_conversion_array.to_dataset(name="lookup_loc_techs_conversion")
)
return dataset