if normalize:
    data_normalized = normalized_copy(data_to_cluster)
else:
    data_normalized = data_to_cluster

if 'file=' in clustering_func:
    file = clustering_func.split('=')[1]
    if ':' in file:
        file, column = file.rsplit(':', 1)
    else:
        column = None
    df = model_run.timeseries_data[file]
    if isinstance(df, pd.Series) and column is not None:
        exceptions.warn(
            '{} given as time clustering column, but only one column to '
            'choose from in {}.'.format(column, file)
        )
        clusters = df.resample('1D').mean()
    elif isinstance(df, pd.DataFrame) and column is None:
        raise exceptions.ModelError(
            'No time clustering column given, but multiple columns found in '
            '{0}. Choose one column and add it to {1} as {1}:name_of_column.'
            .format(file, clustering_func)
        )
    elif isinstance(df, pd.DataFrame) and column not in df.columns:
        raise KeyError(
            'time clustering column {} not found in {}.'.format(column, file)
        )
    elif isinstance(df, pd.DataFrame):
        clusters = df.loc[:, column].groupby(pd.Grouper(freq='1D')).unique()
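
# A minimal, self-contained sketch (hypothetical data; 'cluster' is an assumed
# column name) of what the pd.Grouper call above yields: a Series with one row
# per day, holding the unique cluster labels seen among that day's timesteps.
import pandas as pd

_idx = pd.date_range('2005-01-01', periods=48, freq='h')
_df = pd.DataFrame({'cluster': [0] * 24 + [1] * 24}, index=_idx)
_daily = _df.loc[:, 'cluster'].groupby(pd.Grouper(freq='1D')).unique()
# _daily: 2005-01-01 -> array([0]), 2005-01-02 -> array([1])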
Result of clustering, e.g. from sklearn.cluster.KMeans(k).fit(X) or
sklearn.cluster.AgglomerativeClustering(k).fit(X). Allows the user to
access specific attributes, for detailed statistical analysis.
"""
if timesteps is not None:
    data = data.loc[{'timesteps': timesteps}]
else:
    timesteps = data.timesteps.values

X = reshape_for_clustering(data, tech, variables)

if func == 'kmeans':
    if not k:
        k = hartigan_n_clusters(X)
        exceptions.warn(
            'Used Hartigan\'s rule to determine that '
            'a good number of clusters is {}.'.format(k)
        )
    clustered_data = sk_cluster.KMeans(k).fit(X)
elif func == 'hierarchical':
    if not k:
        raise exceptions.ModelError(
            'Cannot undertake hierarchical clustering without a predefined '
            'number of clusters (k).'
        )
    clustered_data = sk_cluster.AgglomerativeClustering(k).fit(X)

# Determine the cluster membership of each day
day_clusters = clustered_data.labels_
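
# hartigan_n_clusters() is defined elsewhere in this module; below is a minimal
# sketch of Hartigan's rule, assuming KMeans inertia as the within-cluster sum
# of squares W(k): pick the smallest k where (W(k)/W(k+1) - 1) * (n - k - 1)
# drops to the threshold (conventionally 10) or below.
from sklearn import cluster as sk_cluster

def _hartigan_sketch(X, threshold=10):
    n = len(X)
    for k in range(1, n):
        w_k = sk_cluster.KMeans(n_clusters=k).fit(X).inertia_
        w_k1 = sk_cluster.KMeans(n_clusters=k + 1).fit(X).inertia_
        # guard against a perfect (zero-inertia) fit at k+1 clusters
        if w_k1 == 0 or (w_k / w_k1 - 1) * (n - k - 1) <= threshold:
            return k
    return n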
def save_csv(model_data, path, dropna=True):
    """
    If the termination condition was not optimal, only inputs are saved,
    and a warning is raised that results will not be saved.
    """
    os.makedirs(path, exist_ok=False)

    # a MILP model which optimises to within the MIP gap, but does not fully
    # converge on the LP relaxation, may return as 'feasible', not 'optimal'
    if ('termination_condition' not in model_data.attrs or
            model_data.attrs['termination_condition'] in ['optimal', 'feasible']):
        data_vars = model_data.data_vars
    else:
        data_vars = model_data.filter_by_attrs(is_result=0).data_vars
        exceptions.warn(
            'Model termination condition was not optimal, saving inputs only.'
        )

    for var in data_vars:
        in_out = 'results' if model_data[var].attrs['is_result'] else 'inputs'
        out_path = os.path.join(path, '{}_{}.csv'.format(in_out, var))
        series = split_loc_techs(model_data[var], return_as='Series')
        if dropna:
            series = series.dropna()
        series.to_csv(out_path, header=True)
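
# A toy illustration (hypothetical data) of the filter_by_attrs() split used
# above: variables flagged is_result=1 are dropped when only inputs are saved.
import xarray as xr

_ds = xr.Dataset({
    'energy_cap_max': xr.DataArray([1.0, 2.0], dims='techs'),  # an input
    'energy_cap': xr.DataArray([0.5, 1.5], dims='techs'),      # a result
})
_ds['energy_cap_max'].attrs['is_result'] = 0
_ds['energy_cap'].attrs['is_result'] = 1
assert list(_ds.filter_by_attrs(is_result=0).data_vars) == ['energy_cap_max']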
def check_optimality(self):
    termination = self._model._model_data.attrs.get(
        'termination_condition', 'did_not_yet_run')

    # a MILP model which optimises to within the MIP gap, but does not fully
    # converge on the LP relaxation, may return as 'feasible', not 'optimal'
    if termination not in ['optimal', 'did_not_yet_run', 'feasible']:
        warn('Model termination condition was not optimal. Plotting may fail!')
comments, warnings, errors = checks.check_operate_params(model_data)
exceptions.print_warnings_and_raise_errors(warnings=warnings, errors=errors)
# Initialize our variables
solver = run_config['solver']
solver_io = run_config.get('solver_io', None)
solver_options = run_config.get('solver_options', None)
save_logs = run_config.get('save_logs', None)
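# In operate mode the model is solved iteratively: each iteration optimises a
# 'horizon'-length slice of timesteps, but only the first 'window' of results
# is kept before the window rolls forward.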
window = run_config['operation']['window']
horizon = run_config['operation']['horizon']
window_to_horizon = horizon - window
# caps: capacity bounds gathered from the model data ('*_max' and '*_equals'
# parameters, merged earlier in this function), with '_equals' values taking
# precedence over '_max' values
caps = cap_max.update(cap_equals)
for cap in caps.data_vars.values():
    cap.attrs['is_result'] = 1
    cap.attrs['operate_param'] = 1
model_data.update(caps)
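
# A toy illustration (hypothetical data) of the Dataset.update() precedence
# relied on above: values from the argument overwrite those already present,
# so any '_equals' capacity replaces the corresponding '_max' bound.
import xarray as xr

_cap_max = xr.Dataset({'energy_cap': xr.DataArray([10.0, 10.0], dims='techs')})
_cap_equals = xr.Dataset({'energy_cap': xr.DataArray([5.0, 5.0], dims='techs')})
_cap_max.update(_cap_equals)
# _cap_max['energy_cap'].values -> array([5., 5.])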
# Storage initial is carried over between iterations, so must be defined along with storage
if ('loc_techs_store' in model_data.dims.keys() and
        'storage_initial' not in model_data.data_vars.keys()):
    model_data['storage_initial'] = (
        xr.DataArray([0.0 for loc_tech in model_data.loc_techs_store.values],
                     dims='loc_techs_store')
    )
    model_data['storage_initial'].attrs['is_result'] = 0.0
    exceptions.warn(
        'Initial stored energy not defined, set to zero for all '
        'loc::techs in loc_techs_store, for use in iterative optimisation'
    )
# Operated units are carried over between iterations, so must be defined in a MILP model
if ('loc_techs_milp' in model_data.dims.keys() and
        'operated_units' not in model_data.data_vars.keys()):
    model_data['operated_units'] = (
        xr.DataArray([0 for loc_tech in model_data.loc_techs_milp.values],
                     dims='loc_techs_milp')
    )
    model_data['operated_units'].attrs['is_result'] = 1
    model_data['operated_units'].attrs['operate_param'] = 1
    exceptions.warn(
        'daily operated units not defined, set to zero for all '
        'loc::techs in loc_techs_milp, for use in iterative optimisation'
    )
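
# A toy illustration (hypothetical loc::tech names) of the zero-filled
# placeholder arrays created above, using numpy instead of a list comprehension.
import numpy as np
import xarray as xr

_ds = xr.Dataset(coords={'loc_techs_milp': ['X1::chp', 'X2::chp']})
_ds['operated_units'] = xr.DataArray(
    np.zeros(len(_ds.loc_techs_milp), dtype=int), dims='loc_techs_milp'
)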
# (fragment: `var` here is each of the input data variables, flagged as not-a-result)
var.attrs['is_result'] = 0
new_model_data = xr.merge((results, inputs))
new_model_data.attrs.update(model_data.attrs)
# Only add coordinates from the original model_data that don't already exist
new_coords = [
i for i in model_data.coords.keys() if i not in new_model_data.coords.keys()
]
new_model_data = new_model_data.update(model_data[new_coords])
# Reorganise the coordinates so that model data and new model data share
# the same order of items in each dimension
new_model_data = new_model_data.reindex(model_data.coords)
exceptions.warn(
'The results of rerunning the backend model are only available within '
'the Calliope model returned by this function call.'
)
new_calliope_model = calliope.Model(config=None, model_data=new_model_data)
new_calliope_model._timings = timings
return new_calliope_model
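
# A toy illustration (hypothetical data) of the reindex() call above: aligning
# one dataset's coordinate order to another's, so both share the same ordering
# of items in each dimension.
import xarray as xr

_a = xr.Dataset({'v': ('techs', [1, 2])}, coords={'techs': ['pv', 'wind']})
_b = _a.sortby('techs', ascending=False)  # techs order: ['wind', 'pv']
_b = _b.reindex(_a.coords)                # back to ['pv', 'wind']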