# NOTE(review): scraping artifact (Snyk advertisement banner) — not part of the original source.
# --- fragment of the iterative-LOPF loop (the enclosing `def` is not visible in this chunk,
# and the original indentation has been stripped; confirm structure against the full file) ---
# Start `diff` at the threshold so the while condition holds on entry.
diff = msq_threshold
# Iterate until the mean-square change in line capacities drops below the
# threshold, but always run at least `min_iterations` passes.
while diff >= msq_threshold or iteration < min_iterations:
if iteration >= max_iterations:
logger.info(f'Iteration {iteration} beyond max_iterations '
f'{max_iterations}. Stopping ...')
break
# Previous capacities: optimised values after the first pass, the inputs before it.
s_nom_prev = n.lines.s_nom_opt if iteration else n.lines.s_nom
# Warm-start only after the first iteration and only if a stored basis exists.
kwargs['warmstart'] = bool(iteration and ('basis_fn' in n.__dir__()))
network_lopf(n, snapshots, **kwargs)
update_line_params(n, s_nom_prev)
diff = msq_diff(n, s_nom_prev)
iteration += 1
logger.info('Running last lopf with fixed branches, overwrite p_nom '
'for links and s_nom for lines')
# Fix branch capacities at their optimised values, then solve one final LOPF.
ext_links_i = get_extendable_i(n, 'Link')
n.lines[['s_nom', 's_nom_extendable']] = n.lines['s_nom_opt'], False
n.links[['p_nom', 'p_nom_extendable']] = n.links['p_nom_opt'], False
network_lopf(n, snapshots, **kwargs)
# Restore the extendability flags so the network object is left as found.
n.lines.loc[ext_i, 's_nom_extendable'] = True
n.links.loc[ext_links_i, 'p_nom_extendable'] = True
def define_dispatch_for_extendable_constraints(n, sns, c, attr):
    """
    Sets power dispatch constraints for extendable devices for a given
    component and a given attribute.

    For every extendable asset the dispatch variable is tied to the nominal
    capacity variable via
        max_pu * nom - dispatch >= 0   (dual: mu_upper)
        min_pu * nom - dispatch <= 0   (dual: mu_lower)

    Parameters
    ----------
    n : pypsa.Network
    c : str
        name of the network component
    attr : str
        name of the attribute, e.g. 'p'
    """
    extendables = get_extendable_i(n, c)
    if extendables.empty:
        return

    min_pu, max_pu = get_bounds_pu(n, c, sns, extendables, attr)
    dispatch = get_var(n, c, attr)[extendables]
    capacity = get_var(n, c, nominal_attrs[c])[extendables]

    # Upper and lower bound share the same expression shape; only the
    # per-unit factor, the sense and the dual name differ.
    for bound_pu, sense, dual in ((max_pu, '>=', 'mu_upper'),
                                  (min_pu, '<=', 'mu_lower')):
        lhs, *axes = linexpr((bound_pu, capacity), (-1, dispatch),
                             return_axes=True)
        define_constraints(n, lhs, sense, 0, c, dual, axes=axes, spec=attr)
def define_objective(n, sns):
"""
Defines and writes out the objective function.

The objective comprises marginal (dispatch) costs weighted by snapshot
weightings plus capital costs of extendable capacity; the capital cost of
already-installed extendable capacity is subtracted as a constant so that
only additional investment is priced.
"""
# constant for already done investment
nom_attr = nominal_attrs.items()
constant = 0
for c, attr in nom_attr:
ext_i = get_extendable_i(n, c)
# capital cost of capacity that already exists for extendable assets
constant += n.df(c)[attr][ext_i] @ n.df(c).capital_cost[ext_i]
# represent the constant via a variable fixed to `constant` and subtract it
object_const = write_bound(n, constant, constant)
n.objective_f.write(linexpr((-1, object_const), as_pandas=False)[0])
# marginal-cost terms: cost * dispatch, weighted by snapshot weightings;
# all-zero cost columns are dropped to keep the LP file small
for c, attr in lookup.query('marginal_cost').index:
cost = (get_as_dense(n, c, 'marginal_cost', sns)
.loc[:, lambda ds: (ds != 0).all()]
.mul(n.snapshot_weightings[sns], axis=0))
if cost.empty: continue
terms = linexpr((cost, get_var(n, c, attr).loc[sns, cost.columns]))
n.objective_f.write(join_exprs(terms))
# investment terms: capital_cost * nominal capacity variable
for c, attr in nominal_attrs.items():
cost = n.df(c)['capital_cost'][get_extendable_i(n, c)]
if cost.empty: continue
terms = linexpr((cost, get_var(n, c, attr)[cost.index]))
# NOTE(review): the chunk ends here — the write of `terms` for the
# investment loop is not visible; the function appears truncated.
the current and the previous iteration. As soon as this threshold is
undercut, and the number of iterations is bigger than 'min_iterations'
the iterative optimization stops
min_iterations : integer, default 1
Minimal number of iterations to run regardless of whether the msq_threshold
is already undercut
max_iterations : integer, default 100
Maximal number of iterations to run regardless of whether the msq_threshold
is already undercut
**kwargs
Keyword arguments of the lopf function which runs at each iteration
'''
# --- fragment of the iterative-LOPF setup (enclosing `def` not visible in this chunk;
# original indentation has been stripped — `update_line_params` below is presumably a
# nested closure over `ext_i`, `typed_i`, `ext_untyped_i` and `base_s_nom`; confirm) ---
# Tag each line with the carrier of its bus0 (AC or DC) for the x/r update below.
n.lines['carrier'] = n.lines.bus0.map(n.buses.carrier)
ext_i = get_extendable_i(n, 'Line')
# Lines with a standard type get their parameters via `num_parallel`,
# untyped lines have x/r adjusted directly.
typed_i = n.lines.query('type != ""').index
ext_untyped_i = ext_i.difference(typed_i)
ext_typed_i = ext_i & typed_i
# Rating of one parallel circuit of the standard type: sqrt(3) * I_nom * V_nom.
base_s_nom = (np.sqrt(3) * n.lines['type'].map(n.line_types.i_nom) *
n.lines.bus0.map(n.buses.v_nom))
n.lines.loc[ext_typed_i, 'num_parallel'] = (n.lines.s_nom/base_s_nom)[ext_typed_i]
def update_line_params(n, s_nom_prev):
# Rescale line parameters after each iteration by the relative capacity change.
factor = n.lines.s_nom_opt / s_nom_prev
for attr, carrier in (('x', 'AC'), ('r', 'DC')):
# reactance governs AC lines, resistance DC lines; only untyped
# extendable lines are scaled directly
ln_i = (n.lines.query('carrier == @carrier').index & ext_untyped_i)
n.lines.loc[ln_i, attr] /= factor[ln_i]
# typed extendable lines are updated via their number of parallel circuits
ln_i = ext_i & typed_i
n.lines.loc[ln_i, 'num_parallel'] = (n.lines.s_nom_opt/base_s_nom)[ln_i]
def msq_diff(n, s_nom_prev):
"""
NOTE(review): the body of this function is missing from this excerpt —
the chunk is truncated here. Judging from its call site
(`diff = msq_diff(n, s_nom_prev)` compared against `msq_threshold`),
it presumably returns the mean-square difference between
`n.lines.s_nom_opt` and the previous capacities; confirm against the
full file.
"""
def define_ramp_limit_constraints(n, sns):
"""
Defines ramp limit constraints for generators with a valid ramp limit.

Builds p(t) - p(t-1) <= ramp_limit_up * p_nom for non-extendable
generators and the linearised form (p_nom is a variable) for extendable
ones.

NOTE(review): `rdown_i` and `com_i` are computed but never used in this
excerpt — the down-ramp and committable branches appear to be truncated
past the visible end of the chunk.
"""
c = 'Generator'
# `x == x` is False only for NaN, so this keeps rows with a set ramp limit.
rup_i = n.df(c).query('ramp_limit_up == ramp_limit_up').index
rdown_i = n.df(c).query('ramp_limit_down == ramp_limit_down').index
if rup_i.empty & rdown_i.empty:
return
fix_i = get_non_extendable_i(n, c)
ext_i = get_extendable_i(n, c)
# committable generators are handled separately from extendable ones
com_i = n.df(c).query('committable').index.difference(ext_i)
# dispatch at t and t-1; the first snapshot has no predecessor, hence sns[1:]
p = get_var(n, c, 'p').loc[sns[1:]]
p_prev = get_var(n, c, 'p').shift(1).loc[sns[1:]]
# fix up: p(t) - p(t-1) <= ramp_limit_up * p_nom (p_nom is a fixed number)
gens_i = rup_i & fix_i
lhs = linexpr((1, p[gens_i]), (-1, p_prev[gens_i]))
rhs = n.df(c).loc[gens_i].eval('ramp_limit_up * p_nom')
define_constraints(n, lhs, '<=', rhs, c, 'mu_ramp_limit_up', spec='nonext.')
# ext up: p_nom is a variable, so it is moved onto the left-hand side
gens_i = rup_i & ext_i
limit_pu = n.df(c)['ramp_limit_up'][gens_i]
p_nom = get_var(n, c, 'p_nom')[gens_i]
lhs = linexpr((1, p[gens_i]), (-1, p_prev[gens_i]), (-limit_pu, p_nom))
define_constraints(n, lhs, '<=', 0, c, 'mu_ramp_limit_up', spec='ext.')
def define_dispatch_for_extendable_and_committable_variables(n, sns, c, attr):
    """
    Initializes variables for power dispatch for a given component and a
    given attribute.

    Parameters
    ----------
    n : pypsa.Network
    c : str
        name of the network component
    attr : str
        name of the attribute, e.g. 'p'
    """
    relevant_i = get_extendable_i(n, c)
    # generators additionally get dispatch variables when they are committable
    if c == 'Generator':
        relevant_i = relevant_i.union(n.generators.query('committable').index)
    if relevant_i.empty:
        return
    # unbounded here; operational limits are imposed by separate constraints
    define_variables(n, -np.inf, np.inf, c, attr,
                     axes=[sns, relevant_i], spec='extendables')
def define_generator_status_variables(n, snapshots):
    """
    Initializes binary on/off status variables for committable generators.

    Generators flagged as both extendable and committable are treated as
    extendable only (with a warning), so they receive no status variables.
    """
    com_i = n.generators.query('committable').index
    ext_i = get_extendable_i(n, 'Generator')
    overlap = ext_i.intersection(com_i)
    if not overlap.empty:
        logger.warning("The following generators have both investment optimisation"
                       f" and unit commitment:\n\n\t{', '.join(overlap)}\n\nCurrently PyPSA cannot "
                       "do both these functions, so PyPSA is choosing investment optimisation "
                       "for these generators.")
        com_i = com_i.difference(ext_i)
    if com_i.empty:
        return
    define_binaries(n, (snapshots, com_i), 'Generator', 'status')