Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'not cyclic_state_of_charge')
sus_i = sus.index
if not sus.empty:
coeff_val = (-sus.carrier.map(emissions), get_var(n, 'StorageUnit',
'state_of_charge').loc[sns[-1], sus_i])
vals = linexpr(coeff_val, as_pandas=False)
lhs = lhs + '\n' + join_exprs(vals)
rhs -= sus.carrier.map(emissions) @ sus.state_of_charge_initial
# stores
n.stores['carrier'] = n.stores.bus.map(n.buses.carrier)
stores = n.stores.query('carrier in @emissions.index and not e_cyclic')
if not stores.empty:
coeff_val = (-stores.carrier.map(emissions), get_var(n, 'Store', 'e')
.loc[sns[-1], stores.index])
vals = linexpr(coeff_val, as_pandas=False)
lhs = lhs + '\n' + join_exprs(vals)
rhs -= stores.carrier.map(emissions) @ stores.e_initial
con = write_constraint(n, lhs, glc.sense, rhs, axes=pd.Index([name]))
set_conref(n, con, 'GlobalConstraint', 'mu', name)
# for the next two to we need a line carrier
if len(n.global_constraints) > len(glcs):
n.lines['carrier'] = n.lines.bus0.map(n.buses.carrier)
# expansion limits
glcs = n.global_constraints.query('type == '
'"transmission_volume_expansion_limit"')
substr = lambda s: re.sub('[\[\]\(\)]', '', s)
for name, glc in glcs.iterrows():
car = [substr(c.strip()) for c in glc.carrier_attribute.split(',')]
lhs = ''
if lhs == '': continue
sense = glc.sense
rhs = glc.constant
con = write_constraint(n, lhs, sense, rhs, axes=pd.Index([name]))
set_conref(n, con, 'GlobalConstraint', 'mu', name)
# expansion cost limits
glcs = n.global_constraints.query('type == '
'"transmission_expansion_cost_limit"')
for name, glc in glcs.iterrows():
car = [substr(c.strip()) for c in glc.carrier_attribute.split(',')]
lhs = ''
for c, attr in (('Line', 's_nom'), ('Link', 'p_nom')):
ext_i = n.df(c).query(f'carrier in @car and {attr}_extendable').index
if ext_i.empty: continue
v = linexpr((n.df(c).capital_cost[ext_i], get_var(n, c, attr)[ext_i]),
as_pandas=False)
lhs += '\n' + join_exprs(v)
if lhs == '': continue
sense = glc.sense
rhs = glc.constant
con = write_constraint(n, lhs, sense, rhs, axes=pd.Index([name]))
set_conref(n, con, 'GlobalConstraint', 'mu', name)
----------
n : pypsa.Network
c : str
name of the network component
attr : str
name of the attribute, e.g. 'p'
pnl : bool, default True
Whether variable which should be fixed is time-dependent
"""
if pnl:
if attr + '_set' not in n.pnl(c): return
fix = n.pnl(c)[attr + '_set'].unstack().dropna()
if fix.empty: return
lhs = linexpr((1, get_var(n, c, attr).unstack()[fix.index]), as_pandas=False)
constraints = write_constraint(n, lhs, '=', fix).unstack().T
else:
if attr + '_set' not in n.df(c): return
fix = n.df(c)[attr + '_set'].dropna()
if fix.empty: return
lhs = linexpr((1, get_var(n, c, attr)[fix.index]), as_pandas=False)
constraints = write_constraint(n, lhs, '=', fix)
set_conref(n, constraints, c, f'mu_{attr}_set')
"""
# constant for already done investment
nom_attr = nominal_attrs.items()
constant = 0
for c, attr in nom_attr:
ext_i = get_extendable_i(n, c)
constant += n.df(c)[attr][ext_i] @ n.df(c).capital_cost[ext_i]
object_const = write_bound(n, constant, constant)
n.objective_f.write(linexpr((-1, object_const), as_pandas=False)[0])
for c, attr in lookup.query('marginal_cost').index:
cost = (get_as_dense(n, c, 'marginal_cost', sns)
.loc[:, lambda ds: (ds != 0).all()]
.mul(n.snapshot_weightings[sns], axis=0))
if cost.empty: continue
terms = linexpr((cost, get_var(n, c, attr).loc[sns, cost.columns]))
n.objective_f.write(join_exprs(terms))
# investment
for c, attr in nominal_attrs.items():
cost = n.df(c)['capital_cost'][get_extendable_i(n, c)]
if cost.empty: continue
terms = linexpr((cost, get_var(n, c, attr)[cost.index]))
n.objective_f.write(join_exprs(terms))
attr : str
name of the attribute, e.g. 'p'
"""
ext_i = get_extendable_i(n, c)
if ext_i.empty: return
min_pu, max_pu = get_bounds_pu(n, c, sns, ext_i, attr)
operational_ext_v = get_var(n, c, attr)[ext_i]
nominal_v = get_var(n, c, nominal_attrs[c])[ext_i]
rhs = 0
lhs, *axes = linexpr((max_pu, nominal_v), (-1, operational_ext_v),
return_axes=True)
define_constraints(n, lhs, '>=', rhs, c, 'mu_upper', axes=axes, spec=attr)
lhs, *axes = linexpr((min_pu, nominal_v), (-1, operational_ext_v),
return_axes=True)
define_constraints(n, lhs, '<=', rhs, c, 'mu_lower', axes=axes, spec=attr)
def masked_term(coeff, var, cols):
return linexpr((coeff[cols], var[cols]))\
.reindex(index=axes[0], columns=axes[1], fill_value='').values
# ext down
gens_i = rdown_i & ext_i
limit_pu = n.df(c)['ramp_limit_down'][gens_i]
p_nom = get_var(n, c, 'p_nom')[gens_i]
lhs = linexpr((1, p[gens_i]), (-1, p_prev[gens_i]), (limit_pu, p_nom))
define_constraints(n, lhs, '>=', 0, c, 'mu_ramp_limit_down', spec='ext.')
# com down
gens_i = rdown_i & com_i
if not gens_i.empty:
limit_shut = n.df(c).loc[gens_i].eval('ramp_limit_shut_down * p_nom')
limit_down = n.df(c).loc[gens_i].eval('ramp_limit_down * p_nom')
status = get_var(n, c, 'status').loc[sns[1:], gens_i]
status_prev = get_var(n, c, 'status').shift(1).loc[sns[1:], gens_i]
lhs = linexpr((1, p[gens_i]), (-1, p_prev[gens_i]),
(limit_down - limit_shut, status), (limit_shut, status_prev))
define_constraints(n, lhs, '>=', 0, c, 'mu_ramp_limit_down', spec='com.')
gens = n.generators.query('carrier in @emissions.index')
if not gens.empty:
em_pu = gens.carrier.map(emissions)/gens.efficiency
em_pu = n.snapshot_weightings.to_frame() @ em_pu.to_frame('weightings').T
vals = linexpr((em_pu, get_var(n, 'Generator', 'p')[gens.index]),
as_pandas=False)
lhs += join_exprs(vals)
# storage units
sus = n.storage_units.query('carrier in @emissions.index and '
'not cyclic_state_of_charge')
sus_i = sus.index
if not sus.empty:
coeff_val = (-sus.carrier.map(emissions), get_var(n, 'StorageUnit',
'state_of_charge').loc[sns[-1], sus_i])
vals = linexpr(coeff_val, as_pandas=False)
lhs = lhs + '\n' + join_exprs(vals)
rhs -= sus.carrier.map(emissions) @ sus.state_of_charge_initial
# stores
n.stores['carrier'] = n.stores.bus.map(n.buses.carrier)
stores = n.stores.query('carrier in @emissions.index and not e_cyclic')
if not stores.empty:
coeff_val = (-stores.carrier.map(emissions), get_var(n, 'Store', 'e')
.loc[sns[-1], stores.index])
vals = linexpr(coeff_val, as_pandas=False)
lhs = lhs + '\n' + join_exprs(vals)
rhs -= stores.carrier.map(emissions) @ stores.e_initial
con = write_constraint(n, lhs, glc.sense, rhs, axes=pd.Index([name]))
set_conref(n, con, 'GlobalConstraint', 'mu', name)
def define_objective(n, sns):
"""
Defines and writes out the objective function
"""
# constant for already done investment
nom_attr = nominal_attrs.items()
constant = 0
for c, attr in nom_attr:
ext_i = get_extendable_i(n, c)
constant += n.df(c)[attr][ext_i] @ n.df(c).capital_cost[ext_i]
object_const = write_bound(n, constant, constant)
n.objective_f.write(linexpr((-1, object_const), as_pandas=False)[0])
for c, attr in lookup.query('marginal_cost').index:
cost = (get_as_dense(n, c, 'marginal_cost', sns)
.loc[:, lambda ds: (ds != 0).all()]
.mul(n.snapshot_weightings[sns], axis=0))
if cost.empty: continue
terms = linexpr((cost, get_var(n, c, attr).loc[sns, cost.columns]))
n.objective_f.write(join_exprs(terms))
# investment
for c, attr in nominal_attrs.items():
cost = n.df(c)['capital_cost'][get_extendable_i(n, c)]
if cost.empty: continue
terms = linexpr((cost, get_var(n, c, attr)[cost.index]))
n.objective_f.write(join_exprs(terms))