Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_GH9():
x_old = np.array([1.5, 2.5, 3.5, 4.5, 5.5, 6.5])
y_old = np.array([10, 10, 10, 10, 10])
x_new = np.array([1.7, 2.27332857, 2.84665714, 3.41998571,
3.99331429, 4.56664286])
y_new = rebin.rebin(x_old, y_old, x_new)
assert_allclose(y_new,
[5.7332857] * 5)
# with uncertainties
y_old = np.array([11., 12., 13., 14., 15.])
y_old = unp.uarray(y_old, 0.1 * y_old)
# rebin
y_new = rebin.rebin_piecewise_constant(x_old, y_old, x_new)
# compute answer here to check rebin
y_old_ave = y_old / np.diff(x_old)
y_new_here = np.array(
[y_old_ave[0] * (x_new[1] - x_new[0]),
y_old_ave[0] * (x_old[1] - x_new[1]) +
y_old_ave[1] * (x_new[2] - x_old[1]),
y_old_ave[1] * (x_new[3] - x_new[2]),
y_old_ave[1] * (x_old[2] - x_new[3]) +
y_old_ave[2] * (x_new[4] - x_old[2]),
def test_array_and_matrix_creation():
"Test of custom array creation"
arr = unumpy.uarray(([1, 2], [0.1, 0.2]))
assert arr[1].nominal_value == 2
assert arr[1].std_dev() == 0.2
# Same thing for matrices:
mat = unumpy.umatrix(([1, 2], [0.1, 0.2]))
assert mat[0, 1].nominal_value == 2
assert mat[0, 1].std_dev() == 0.2
def test_array_comparisons():
"Test of array and matrix comparisons"
arr = unumpy.uarray(([1, 2], [1, 4]))
assert numpy.all((arr == [arr[0], 4]) == [True, False])
# For matrices, 1D arrays are converted to 2D arrays:
mat = unumpy.umatrix(([1, 2], [1, 4]))
assert numpy.all((mat == [mat[0, 0], 4]) == [True, False])
y_old = 1.+np.sin(x_old[:-1]*np.pi)
# compute spline ----------------------------------
x_mids = x_old[:-1] + 0.5*np.ediff1d(x_old)
xx = np.hstack([x_old[0], x_mids, x_old[-1]])
yy = np.hstack([y_old[0], y_old, y_old[-1]])
# build spline
spl = splrep(xx, yy)
area_old = np.array(
[ splint(x_old[i],x_old[i+1], spl) for i in range(m) ])
# with uncertainties
y_old = unp.uarray(y_old, 0.1*y_old*uniform((m,)))
# computing subbin areas
area_subbins = np.zeros((subbins.size-1,))
for i in range(area_subbins.size):
a, b = subbins[i:i+2]
a = max([a,x_old[0]])
b = min([b,x_old[-1]])
if b>a:
area_subbins[i] = splint(a, b, spl)
# summing subbin contributions in y_new_ref
a = np.zeros((x_new.size-1,))
y_new_ref = unp.uarray(a,a)
y_new_ref[1] = y_old[0] * area_subbins[2] / area_old[0]
y_new_ref[2] = y_old[0] * area_subbins[3] / area_old[0]
y_new_ref[3] = y_old[0] * area_subbins[4] / area_old[0]
def test_x2_in_x1_2():
"""
x2 has a couple of bins, each of which span more than one original bin
"""
# old size
m = 10
# bin edges
x_old = np.linspace(0., 1., m+1)
x_new = np.array([0.25, 0.55, 0.75])
# some arbitrary distribution
y_old = 1. + np.sin(x_old[:-1]*np.pi) / np.ediff1d(x_old)
y_old = unp.uarray(y_old, 0.1*y_old*uniform((m,)))
# rebin
y_new = rebin.rebin(x_old, y_old, x_new, interp_kind='piecewise_constant')
# compute answer here to check rebin
y_new_here = unp.uarray(np.zeros(2), np.zeros(2))
y_new_here[0] = 0.5 * y_old[2] + y_old[3] + y_old[4] + 0.5 * y_old[5]
y_new_here[1] = 0.5 * y_old[5] + y_old[6] + 0.5 * y_old[7]
assert_allclose(unp.nominal_values(y_new),
unp.nominal_values(y_new_here))
# mean or nominal value comparison
assert_allclose(unp.std_devs(y_new),
unp.std_devs(y_new_here))
Uncertain data
==============
This example shows how OMAS can seamlessly handle unceratain data leveraging the `uncertainties` Python package
"""
from omas import *
import os
import numpy
import uncertainties.unumpy as unumpy
from uncertainties import ufloat
# generate some uncertain data
ods = ODS()
ods['equilibrium.time'] = [0.0]
ods['equilibrium.time_slice[0].global_quantities.ip'] = ufloat(3, 0.1)
ods['thomson_scattering.channel[0].t_e.data'] = unumpy.uarray([1, 2, 3], [.1, .2, .3])
ods['thomson_scattering.channel[0].n_e.data'] = numpy.array([1., 2., 3.])
ods['thomson_scattering.time'] = numpy.linspace(0, 1, 3)
ods['thomson_scattering.ids_properties.homogeneous_time'] = 1
# save/load from pickle
print('== PKL ==')
try:
__file__
except NameError:
import inspect
__file__ = inspect.getfile(lambda: None)
omas_testing_directory = omas_testdir()
save_omas_pkl(ods, omas_testdir(__file__) + '/test.pkl')
ods = load_omas_pkl(omas_testdir(__file__) + '/test.pkl')
print(ods)
If you do not have errors in the flux:
>>> lamb,fnu=mjy(xdata,ydata,dist)
:param dist: distance in Mpc
"""
import uncertainties.unumpy as unumpy
c=29979245800. # speed of light in CGS
dist=dist*3.085677581e24 # Mpc -> cm
nu=10**lognu
lamb=c/nu*1e4 # cm -> micron
if llerr!=None:
lllerr=unumpy.uarray(ll,llerr)
else:
lllerr=ll
lnuerr=10**lllerr/nu
fluxerr=lnuerr/(1e-26*4.*numpy.pi*dist**2) # Lnu (erg/s/Hz) -> Fnu (mJy)
if llerr!=None:
fluxerr=unumpy.log10(fluxerr)
return numpy.log10(lamb),unumpy.nominal_values(fluxerr),unumpy.std_devs(fluxerr)
else:
return numpy.log10(lamb),numpy.log10(fluxerr)
def gradient(self, x):
sx = unp.uarray((x, np.inf))
sf = self.f(sx)
return np.array([sf.derivatives[var] for var in sx])
# Check if measure exists
if name not in self.measures:
raise KeyError(name)
distribution = self._get_measure_distribution(name)
distribution_fields = self._get_measure_distribution_fields(name).values.transpose()
stats = self._stats_registry.distributions.get_scipy_repr(distribution)
if isinstance(stats, tuple):
model = stats[0]
if model:
params = {
param: f(*distribution_fields) for param, f in stats[1].items()
}
return pd.Series(uarray(model.mean(**params), model.std(**params)), name=name, index=self.raw.index)
elif stats:
return pd.Series(stats(*distribution_fields), name=name, index=self.raw.index)
return distribution_fields[0] # If no stats, return raw sum field
# Check if measure exists
if name not in self.measures:
raise KeyError(name)
distribution = self._get_measure_distribution(name)
distribution_fields = self._get_measure_distribution_fields(name).values.transpose()
stats = self._stats_registry.distributions.get_scipy_repr(distribution)
if isinstance(stats, tuple):
model = stats[0]
if model:
params = {
param: f(*distribution_fields) for param, f in stats[1].items()
}
return pd.Series(uarray(model.mean(**params), model.std(**params)), name=name, index=self.raw.index)
elif stats:
return pd.Series(stats(*distribution_fields), name=name, index=self.raw.index)
return distribution_fields[0] # If no stats, return raw sum field