metrics : list of str
List of metrics to be computed.
Returns
-------
list
List of datamodel.MetricResult for each
datamodel.ProcessedForecastObservation
"""
calc_metrics = []
for proc_fxobs in processed_pairs:
# determine type of metrics to calculate
if isinstance(proc_fxobs.original.forecast,
(datamodel.ProbabilisticForecast,
datamodel.ProbabilisticForecastConstantValue)):
try:
calc_metrics.append(calculate_probabilistic_metrics(
proc_fxobs,
categories,
metrics))
except RuntimeError as e:
logger.error('Failed to calculate probabilistic metrics'
' for %s: %s', proc_fxobs.name, e)
elif isinstance(proc_fxobs.original.forecast, datamodel.EventForecast):
try:
calc_metrics.append(calculate_event_metrics(
proc_fxobs, categories, metrics
))
except RuntimeError as e:
logger.error('Failed to calculate event metrics for %s: %s',
proc_fxobs.name, e)
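A minimal stand-alone sketch of the isinstance dispatch used above; the toy classes below are stand-ins for the datamodel forecast types, not part of the project.

class ProbForecast: ...          # stand-in for datamodel.ProbabilisticForecast
class EventForecast: ...         # stand-in for datamodel.EventForecast

def metric_kind(forecast):
    # mirror the dispatch above: probabilistic, event, else deterministic
    if isinstance(forecast, ProbForecast):
        return 'probabilistic'
    elif isinstance(forecast, EventForecast):
        return 'event'
    return 'deterministic'

print(metric_kind(ProbForecast()))   # probabilistic
print(metric_kind(EventForecast()))  # event
print(metric_kind(object()))         # deterministic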
Parameters
----------
modeling_parameters : datamodel.FixedTiltModelingParameters or
datamodel.SingleAxisModelingParameters
Returns
-------
function
Function that accepts two arguments (apparent_zenith, azimuth)
and returns three series (surface_tilt, surface_azimuth, aoi)
Raises
------
TypeError
    If modeling_parameters is invalid.
"""
if isinstance(modeling_parameters,
datamodel.FixedTiltModelingParameters):
return partial(
aoi_fixed,
modeling_parameters.surface_tilt,
modeling_parameters.surface_azimuth
)
elif isinstance(modeling_parameters,
datamodel.SingleAxisModelingParameters):
return partial(
aoi_tracking,
modeling_parameters.axis_tilt,
modeling_parameters.axis_azimuth,
modeling_parameters.max_rotation_angle,
modeling_parameters.backtrack,
modeling_parameters.ground_coverage_ratio
)
else:
raise TypeError('invalid modeling_parameters type '
f'{type(modeling_parameters)}')
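Why functools.partial here: fixing the leading modeling parameters leaves a callable that takes only (apparent_zenith, azimuth), matching the Returns description above. The aoi_toy function below is an illustrative stand-in, not the project's aoi_fixed.

from functools import partial

def aoi_toy(surface_tilt, surface_azimuth, apparent_zenith, azimuth):
    # stand-in with the same leading-parameter layout as aoi_fixed
    return surface_tilt, surface_azimuth, abs(apparent_zenith - surface_tilt)

aoi_func = partial(aoi_toy, 30, 180)   # surface_tilt and surface_azimuth fixed
print(aoi_func(45, 170))               # called with (apparent_zenith, azimuth)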
if 'observation' in o:
obs = self.get_observation(o['observation'])
pair = datamodel.ForecastObservation(
fx, obs, normalization=norm, uncertainty=unc,
reference_forecast=ref_fx, cost=cost)
elif 'aggregate' in o:
agg = self.get_aggregate(o['aggregate'])
pair = datamodel.ForecastAggregate(
fx, agg, normalization=norm, uncertainty=unc,
reference_forecast=ref_fx, cost=cost)
else:
raise ValueError('must provide observation or aggregate in all '
'object_pairs')
pairs.append(pair)
rep_params['object_pairs'] = tuple(pairs)
req_dict['report_parameters'] = rep_params
return datamodel.Report.from_dict(req_dict)
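A small runnable sketch of the 'observation'/'aggregate' key dispatch above, with plain dicts in place of the API payload; the UUID strings are placeholders.

def pair_kind(o):
    if 'observation' in o:
        return 'forecast vs. observation'
    elif 'aggregate' in o:
        return 'forecast vs. aggregate'
    raise ValueError('must provide observation or aggregate in all '
                     'object_pairs')

print(pair_kind({'observation': 'obs-uuid-placeholder'}))
print(pair_kind({'aggregate': 'agg-uuid-placeholder'}))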
res = _apply_event_metric_func(
metric_, group.forecast, group.observation
)
# Change category label of the group from numbers
# to e.g. January or Monday
if category == 'month':
cat = calendar.month_abbr[cat]
elif category == 'weekday':
cat = calendar.day_abbr[cat]
metric_vals.append(datamodel.MetricValue(
category, metric_, str(cat), res))
out['values'] = _sort_metrics_vals(metric_vals,
datamodel.ALLOWED_EVENT_METRICS)
calc_metrics = datamodel.MetricResult.from_dict(out)
return calc_metrics
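The month/weekday relabeling above leans on the standard calendar module; a quick runnable illustration:

import calendar

print(calendar.month_abbr[1])   # 'Jan' -- month group labels run 1-12
print(calendar.day_abbr[0])     # 'Mon' -- weekday group labels run 0-6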
Parameters
----------
report_id : string
UUID of the report to retrieve
Returns
-------
datamodel.Report
"""
req = self.get(f'/reports/{report_id}')
resp = req.json()
raw = resp.pop('raw_report', None)
report = self.process_report_dict(resp)
if raw is not None:
raw_report = datamodel.RawReport.from_dict(raw)
processed_fxobs = self.get_raw_report_processed_data(
report_id, raw_report, resp['values'])
report = report.replace(raw_report=raw_report.replace(
processed_forecasts_observations=processed_fxobs))
return report
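A hedged usage sketch, assuming this method lives on the project's APISession class and that a real access token and report UUID replace the placeholders below.

from solarforecastarbiter.io import api

session = api.APISession('ACCESS_TOKEN')    # placeholder token
report = session.get_report('REPORT_UUID')  # placeholder UUID
print(report.raw_report is not None)        # raw report attached when available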
handle_func : function
Function that handles how `quality_flags` will be used.
See solarforecastarbiter.metrics.preprocessing.exclude as an
example.
Returns
-------
validated_obs : pandas.Series
The validated timeseries data as pandas.Series.
counts : dict
Dict where keys are qfilter.quality_flags and values
are integers indicating the number of points filtered
for the given flag.
"""
# List of flags from filter
if not isinstance(qfilter, datamodel.QualityFlagFilter):
raise TypeError(f"{qfilter} not a QualityFlagFilter")
filters = qfilter.quality_flags
if obs_df.empty:
return obs_df.value, {f: 0 for f in filters}
else:
validation_df = quality_mapping.convert_mask_into_dataframe(
obs_df['quality_flag'])
validation_df = validation_df[list(filters)]
validated_obs = handle_func(obs_df.value, validation_df)
counts = validation_df.astype(int).sum(axis=0).to_dict()
return validated_obs, counts
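A minimal sketch of a handle_func with the call signature used above (the project's own example is solarforecastarbiter.metrics.preprocessing.exclude); drop_flagged and the flag names below are illustrative only.

import pandas as pd

def drop_flagged(values, validation_df):
    # keep only points where none of the selected quality flags is set
    return values[~validation_df.any(axis=1)]

values = pd.Series([1.0, 2.0, 3.0])
flags = pd.DataFrame({'LIMITS EXCEEDED': [False, True, False],
                      'STALE VALUES': [False, False, True]})
print(drop_flagged(values, flags))              # only the first point survives
print(flags.astype(int).sum(axis=0).to_dict())  # per-flag counts, as returned above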
Fields must be kind and the names of the forecasts
category : str
One of the available metrics grouping categories (e.g., total)
Returns
-------
figs : dict of figures
"""
palette = cycle(PALETTE)
tools = 'pan,xwheel_zoom,box_zoom,reset,save'
fig_kwargs = dict(tools=tools, toolbar_location='above')
figs = {}
width = 0.8
human_category = datamodel.ALLOWED_CATEGORIES[category]
metric_name = datamodel.ALLOWED_DETERMINISTIC_METRICS[metric]
fig_kwargs['x_axis_label'] = human_category
fig_kwargs['y_axis_label'] = metric_name
filter_ = ((np.asarray(cds.data['category']) == category) &
(np.asarray(cds.data['metric']) == metric))
# Special handling for x-axis with dates
if category == 'date':
fig_kwargs['x_axis_type'] = 'datetime'
width = width * pd.Timedelta(days=1)
fig_kwargs['x_range'] = DataRange1d()
elif category == 'month':
fig_kwargs['x_range'] = FactorRange(
factors=calendar.month_abbr[1:])
elif category == 'weekday':
fig_kwargs['x_range'] = FactorRange(
factors=list(calendar.day_abbr))
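The category/metric mask built above can be tried with plain numpy arrays standing in for the bokeh ColumnDataSource columns:

import numpy as np

data = {'category': np.array(['total', 'month', 'month']),
        'metric': np.array(['mae', 'mae', 'rmse'])}
filter_ = (data['category'] == 'month') & (data['metric'] == 'mae')
print(filter_)   # [False  True False]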
def infer_timezone(report_parameters):
# maybe not ideal when comparing across sites. might need explicit
# tz options ('infer' or a specific IANA tz) in the report interface.
fxobs_0 = report_parameters.object_pairs[0]
if isinstance(fxobs_0, datamodel.ForecastObservation):
timezone = fxobs_0.observation.site.timezone
else:
timezone = fxobs_0.aggregate.timezone
return timezone
Get Aggregate metadata from the API for the given aggregate_id
Parameters
----------
aggregate_id : string
UUID of the aggregate to get metadata for
Returns
-------
datamodel.Aggregate
"""
req = self.get(f'/aggregates/{aggregate_id}/metadata')
agg_dict = req.json()
for o in agg_dict['observations']:
o['observation'] = self.get_observation(o['observation_id'])
return datamodel.Aggregate.from_dict(agg_dict)
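A hedged usage sketch, again assuming the project's APISession class; each returned aggregate entry carries the full Observation fetched in the loop above, not just its ID. The token and UUID are placeholders.

from solarforecastarbiter.io import api

session = api.APISession('ACCESS_TOKEN')        # placeholder token
agg = session.get_aggregate('AGGREGATE_UUID')   # placeholder UUID
for agg_obs in agg.observations:
    print(agg_obs.observation.name)             # full Observation metadata available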
def _fx_name(forecast, data_object):
# TODO: add code to ensure fx names are unique
forecast_name = forecast.name
if isinstance(forecast, datamodel.ProbabilisticForecastConstantValue):
if forecast.axis == 'x':
forecast_name += \
f' Prob(x <= {forecast.constant_value} {forecast.units})'
else:
forecast_name += f' Prob(f <= x) = {forecast.constant_value}%'
if forecast_name == data_object.name:
forecast_name += ' Forecast'
return forecast_name
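A quick illustration of the naming scheme above, using plain values in place of the datamodel objects:

constant_value, units, axis, name = 25.0, 'MW', 'x', 'Day Ahead Power'
if axis == 'x':
    print(f'{name} Prob(x <= {constant_value} {units})')
else:
    print(f'{name} Prob(f <= x) = {constant_value}%')
# -> Day Ahead Power Prob(x <= 25.0 MW)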