def test_close_global(self):
"""Test the :func:`psyplot.project.close` function"""
psy.register_plotter('test_plotter', module='test_plotter',
with psy.open_dataset(bt.get_file('')) as ds:
time = ds.time.values
lev = ds.lev.values
mp0 = psy.plot.test_plotter(bt.get_file(''), name='t2m',
lev=[0, 1]).main
mp1 = psy.project()
psy.plot.test_plotter(bt.get_file(''), name='t2m',
time=[1, 2])
mp2 = psy.project()
sp1 = psy.plot.test_plotter(bt.get_file(''), name='t2m',
time=[3, 4])
sp2 = psy.plot.test_plotter(bt.get_file(''), name='t2m',
lev=[2, 3])
# some checks in the beginning
self.assertEqual(len(mp0), 2)
self.assertEqual(len(mp1), 2)
self.assertEqual(len(mp2), 4)
self.assertEqual(mp0[0].lev.values, lev[0])
self.assertEqual(mp0[1].lev.values, lev[1])
self.assertEqual(mp1[0].time.values, time[1])
self.assertEqual(mp1[1].time.values, time[2])
self.assertEqual(mp2[0].time.values, time[3])
self.assertEqual(mp2[1].time.values, time[4])
self.assertEqual(mp2[2].lev.values, lev[2])
self.assertEqual(mp2[3].lev.values, lev[3])
self.assertIs(psy.gcp(True), mp2)
self.assertIs(psy.gcp(), sp2)
# close the current subproject
def test_oncpchange_signal(self):
"""Test whether the correct signal is fired"""
psy.register_plotter('test_plotter', module='test_plotter',
check_mains = []
projects = []
def check(p):
ds = psy.open_dataset(bt.get_file('')).load()
sp = psy.plot.test_plotter(ds, name='t2m', lev=[0, 1])
# the signal should have been fired 2 times, one times from the
# subproject, one times from the project
self.assertEqual(len(check_mains), 2)
self.assertIn(False, check_mains)
self.assertIn(True, check_mains)
self.assertEqual(len(projects[0]), 2, msg=str(projects[0]))
self.assertEqual(len(projects[1]), 2, msg=str(projects[1]))
# try scp
check_mains = []
projects = []
p = sp[1:]
self.assertEqual(check_mains, [False],
msg="projects: %s" % (projects, ))
self.assertIs(projects[0], p)
def test_plotter_registration(self):
"""Test the registration of a plotter"""
import_plotter=True, module='test_plotter',
self.assertTrue(hasattr(psy.plot, 'test_plotter'))
self.assertIs(psy.plot.test_plotter.plotter_cls, tp.TestPlotter)
psy.plot.test_plotter.print_func = str
self.assertEqual(psy.plot.test_plotter.fmt1(), tp.SimpleFmt.__doc__)
psy.plot.test_plotter.print_func = None
# test the warning
if not six.PY2:
with self.assertWarnsRegex(UserWarning, "not_existent_module"):
psy.register_plotter('something', "not_existent_module",
'not_important', import_plotter=True)
self.assertFalse(hasattr(psy.Project, 'test_plotter'))
self.assertFalse(hasattr(psy.plot, 'test_plotter'))
variables = ds.vname.values
indicators = self.task_config.indicators
nml = self.namelist
errs = self.err_nml_keys
vmeta = OrderedDict([(key, vmeta[key]) for key in ds.vname.values])
for nml_key in nml:
for i, variable in enumerate(variables):
for ind in indicators:
other_keys = {key: range(ds[key].size)
for key in map(get_key,
set(nml) - {nml_key})}
label = ', '.join(
map('{0}=%({0})s'.format, other_keys))
other_keys['vname'] = i
fmt = self.fmt.copy()
ds, name=ind, dims=other_keys, fmt=fmt,
coord=nml_key, legendlabels=label,
attrs={'vname_long': vmeta.get(variable)})
return psy.gcp(True)[:]
[r1[1], 0.],
[0., r2[0]],
[r2[0], limits[1]],
fit=['poly1', 'poly5', 'poly5', 'poly1'],
_range_label(None, r1[1]),
_range_label(r1[1], 0.0),
_range_label(0.0, r2[0]),
_range_label(r2[0], None),
fmt=self.sd_fit_fmt, method='sel')
psy.gcp(True)(ax=ax).share(keys=['xlim', 'ylim'])
# ---- dry days ----
ax = next(axes)
ds, name=base + '_dry', ax=ax,
coord=v + '_dry',
ylabel='%(long_name)s [$^\circ$C]\non %(state)s days',
text=[(middle, 0.03, '%(xlong_name)s [$^\circ$C]', 'fig', dict(
weight='bold', ha='center'))], fmt=self.sd_hist_fmt)
# bars
psy.plot.barplot(ds, name=base + '_dry_mean', ax=ax,
color='k', alpha=0.5, widths='data')
limits = ax.get_xlim()
psy.plot.linreg(ds, name=base + '_dry_mean', ax=ax,
slice(*r1), # extrapolation < 0
slice(None, 0.0), # polynomial < 0
slice(0.0, None), # polynomial > 0
slice(*r2), # extrapolation > 0
def create_project(self, ds):
import psyplot.project as psy
import seaborn as sns
for name, (vref, vsim) in zip(self.names, self.all_variables):
self.logger.debug('Creating plots of %s', vsim)
kwargs = dict(precision=0.1) if vref.startswith('prcp') else {}
psy.plot.densityreg(ds, name='all_' + vsim, coord='all_' + vref,
fmt=self.fmt, title='All percentiles',
arr_names=['%s_all' % name],
arr_names = ['%s_%1.2f' % (name, p) for p in ds.pctl.values]
psy.plot.densityreg(ds, name=vsim, coord=vref, fmt=self.fmt,
arr_names=arr_names, pctl=range(ds.pctl.size),
return psy.gcp(True)[:]
nml.pop(vname + '_bias_coeffs', None)
for letter in ['L', 'k', 'x0']:
nml[vname + '_slope_bias_' + letter] = float(arr.attrs[letter])
else: # polynomial fit
for letter in ['L', 'k', 'x0']:
nml.pop(vname + '_slope_bias_' + letter, None)
nml[vname + '_bias_coeffs'] = [
float(arr.attrs.get('c%i' % i, 0.0)) for i in range(6)]
# --- intercept bias correction
if osp.exists(project_output) and not new_project:
sp2 = mp.linreg(name='intercept')
sp1 = psy.plot.lineplot(ds, name='intercept', coord='unorm',
linewidth=0, marker='o', legend=False)
sp2 = psy.plot.linreg(
ds, name='intercept', ax=sp1[0],
coord='unorm', fit=exponential_function,
'$\\mathrm{{Simulated}}\\, \\mathrm{{%s}} - '
'\\mathrm{{Observed}}\\, \\mathrm{{%s}}$ [m/s]') % (
vname, vname),
'$\\mathrm{{Simulated}} - \\mathrm{{Observed}} ='
'e^{{%(a)1.4f \\cdot x %(b)+1.4f}}$'),
legend={'fontsize': 'medium', 'loc': 'upper left'},
xlabel='Random number $x$ from normal distribution')
arr = sp2.plotters[0].plot_data[0]
if 'a' in arr.attrs:
nml.pop(vname + '_intercept_bias_coeffs', None)
for letter in ['a', 'b']:
nml[vname + '_intercept_bias_' + letter] = float(
if vname == 'tmin':
ylabel += ' [$^\circ$C]'
# --- slope bias correction
if osp.exists(project_output) and not new_project:
mp = psy.Project.load_project(project_output, datasets=[ds])
sp2 = mp.linreg
import seaborn as sns
sp1 = psy.plot.lineplot(ds, name=what, coord='unorm',
linewidth=0, marker='o', legend=False)
label = '$%s = %s$' % (diff_symbol, ' '.join(
'%(c{})+4.3f{}'.format(i, get_symbol(i))
for i in range(deg + 1)))
sp2 = psy.plot.linreg(
ds, name=what, ax=sp1[0].ax,
coord='unorm', fit='poly' + str(int(deg)),
legend={'fontsize': 'large', 'loc': 'upper left'},
xlabel='Random number from normal distribution')
sp2.share(sp1[0], ['color', 'xlim', 'ylim'])
attrs = sp2.plotters[0].plot_data[0].attrs
nml = self.exp_config['namelist']['weathergen_ctl']
nml[vname + '_bias_coeffs'] = [
float(attrs.get('c%i' % i, 0.0)) for i in range(6)]
nml[vname + '_bias_min'] = float(ds.unorm.min().values)
nml[vname + '_bias_max'] = float(ds.unorm.max().values)
# --- save the data'Saving plots to %s', plot_output)
import matplotlib.pyplot as plt
import seaborn as sns
import psyplot.project as psy
axes = np.concatenate([
plt.subplots(1, 2, figsize=(12, 4))[1] for _ in range(4)])
for fig in set(ax.get_figure() for ax in axes):
middle = (
axes[0].get_position().x0 + axes[1].get_position().x1) / 2.
axes = iter(axes)
variables = ['tmin', 'tmax']
# mean fits
for v in variables:
ds, name=v + '_wet_means', ax=next(axes),
coord=v + '_means',
ylabel='%(long_name)s [$^\circ$C]\non %(state)s days',
text=[(middle, 0.03, '%(xlong_name)s [$^\circ$C]', 'fig', dict(
weight='bold', ha='center'))], fmt=self.fmt)
ds, name=v + '_dry_means', ax=next(axes),
coord=v + '_means',
ylabel='on %(state)s days', fmt=self.fmt)
# sd fits
for v in variables:
ax = next(axes)
base = v + 'stddev'
# ---- wet days----
# density plot
