Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Limit fixtures in several representations. obs1/obs2, axes1/axes2 and
# limits1/limits2 are defined outside this excerpt.

# limits1/limits2 wrapped in Space objects.
limits1space = Space(obs=obs1, axes=axes1, limits=limits1)
limits2space = Space(obs=obs2, axes=axes2, limits=limits2)
# Identity callable passed as `limits`, with limits1 supplied as `rect_limits`.
limit_fn = lambda x: x
limits1func = Space(obs=obs1, axes=axes1, limits=limit_fn, rect_limits=limits1)
# Derived from limits1func through with_obs(obs2) rather than built from scratch.
limits2func = limits1func.with_obs(obs2)
# Plain pairs of five-element lists; limits4 holds the entries of limits3 in
# index order [2, 1, 3, 4, 0].
limits3 = ([11, 12, 13, 14, 15], [31, 41, 51, 61, 71])
limits4 = ([13, 12, 14, 15, 11], [51, 41, 61, 71, 31])
# Pairs with three rows of five entries each; every row of limits2vector is the
# matching row of limits1vector in index order [2, 1, 3, 4, 0].
limits1vector = ([[-21, -11, 91, 11, 21], [-22, -12, 92, 12, 22], [-23, -13, 93, 13, 23]],
[[31, 41, 51, 61, 71], [32, 42, 52, 62, 72], [33, 43, 53, 63, 73]])
limits2vector = ([[91, -11, 11, 21, -21], [92, -12, 12, 22, -22], [93, -13, 13, 23, -23]],
[[51, 41, 61, 71, 31], [52, 42, 62, 72, 32], [53, 43, 63, 73, 33]])
# Pairs converted with z.convert_to_tensor; limits2tf uses the same reordering.
limits1tf = (z.convert_to_tensor([-2, -1, 0, 1, 2]), z.convert_to_tensor([3, 4, 5, 6, 7]))
limits2tf = (z.convert_to_tensor([0, -1, 1, 2, -2]), z.convert_to_tensor([5, 4, 6, 7, 3]))
# Pairs mixing numbers with ANY entries; limits2mixed_any uses the same reordering.
limits1mixed_any = ([-2, ANY, 0, ANY, 2], [3, 4, 5, ANY, 7])
limits2mixed_any = ([0, ANY, ANY, 2, -2], [5, 4, ANY, 7, 3])
# Pairs in which all five entries of both elements are ANY.
limits1any = ([ANY] * 5, [ANY] * 5)
limits2any = ([ANY] * 5, [ANY] * 5)
limits_to_test = [
[limits1, limits2],
[limits1limit, limits2limit],
[limits1space, limits2space],
[limits1tf, limits2tf],
[limits1mixed_any, limits2mixed_any],
[limits1any, limits2any],
[limits1vector, limits2vector],
def calc_numerics_data_shift():
    """Return the midpoint between the smallest lower and largest upper bound.

    Iterates over ``limits`` (taken from the enclosing scope), reads each
    element's ``rect_limits`` pair and keeps column 0 of the lower and of the
    upper part. The minimum of the collected lowers and the maximum of the
    collected uppers are taken along axis 0 and averaged.

    Returns:
        ``(max_upper + min_lower) / 2`` as a tensor.
    """
    first_lows = []
    first_highs = []
    for space in limits:
        rect_low, rect_high = space.rect_limits
        # Only column 0 of each bound enters the computation.
        first_lows.append(z.convert_to_tensor(rect_low[:, 0]))
        first_highs.append(z.convert_to_tensor(rect_high[:, 0]))

    smallest_low = tf.math.reduce_min(z.convert_to_tensor(first_lows), axis=0)
    largest_high = tf.math.reduce_max(z.convert_to_tensor(first_highs), axis=0)
    return (largest_high + smallest_low) / 2
def create_covariance(mu, sigma):
    """Build a covariance matrix from ``sigma`` and check it against ``mu`` and ``params``.

    ``params`` is not an argument of this function; it is read from the
    enclosing scope.

    Args:
        mu: Value convertible with ``z.convert_to_tensor``. Only its leading
            dimension is used, for the consistency check.
        sigma: Value convertible with ``z.convert_to_tensor``.
            Rank > 1: used unchanged as the covariance.
            Rank 1: squared element-wise and placed on a diagonal.
            Rank 0: reshaped to length 1, then handled like rank 1.

    Returns:
        The covariance tensor.

    Raises:
        ShapeIncompatibleError: If the leading dimension of ``params``, the
            leading dimension of ``mu`` and the first two dimensions of the
            covariance are not all equal.
    """
    mu = z.convert_to_tensor(mu)
    sigma = z.convert_to_tensor(sigma)  # TODO (Mayou36): fix as above?
    params_tensor = z.convert_to_tensor(params)

    rank = sigma.shape.ndims
    if rank > 1:
        covariance = sigma
    else:
        if rank == 0:
            # A scalar uncertainty becomes a length-1 vector so that it can be
            # put on a (1 x 1) diagonal below.
            sigma = tf.reshape(sigma, [1])
        covariance = tf.linalg.tensor_diag(z.pow(sigma, 2.))

    if not params_tensor.shape[0] == mu.shape[0] == covariance.shape[0] == covariance.shape[1]:
        # The trailing space after "Currently" keeps the adjacent literals from
        # running together in the rendered message.
        raise ShapeIncompatibleError("params_tensor, observation and uncertainty have to have the"
                                     " same length. Currently "
                                     f"param: {params_tensor.shape[0]}, mu: {mu.shape[0]}, "
                                     f"covariance (from uncertainty): {covariance.shape[0:2]}")
    return covariance
def inside_rect_limits(x, rect_limits):
    """Tell which entries of ``x`` lie inside the rectangular limits.

    Args:
        x: Tensor-like object exposing ``shape.ndims``; must have more than
            one dimension.
        rect_limits: Object that ``z.unstack_x(..., axis=0)`` splits into a
            lower and an upper bound.

    Returns:
        Boolean tensor, reduced over the last axis of ``x``: true where every
        value on that axis satisfies ``lower <= value <= upper`` (both ends
        inclusive).

    Raises:
        ValueError: If ``x`` has one dimension or fewer.
    """
    if x.shape.ndims <= 1:
        raise ValueError("x has ndims <= 1, which is most probably not wanted. The default shape for array-like"
                         " structures is (nevents, n_obs).")

    lower_raw, upper_raw = z.unstack_x(rect_limits, axis=0)
    lower_bound = z.convert_to_tensor(lower_raw)
    upper_bound = z.convert_to_tensor(upper_raw)

    # An entry counts as inside only if all values on the last axis are within bounds.
    not_above = tf.reduce_all(input_tensor=tf.less_equal(x, upper_bound), axis=-1)
    not_below = tf.reduce_all(input_tensor=tf.greater_equal(x, lower_bound), axis=-1)
    return tf.logical_and(not_below, not_above)
def _rect_limits_tf(self) -> ztyping.RectLimitsTFReturnType:
    """Return the stored rectangular limits as a single tensor.

    Returns:
        ``self._rect_limits`` unchanged when it compares equal to ``None`` or
        ``False``; otherwise its elements 0 and 1 are each converted with
        ``z.convert_to_tensor`` and the resulting pair is converted again
        into one tensor.
    """
    stored = self._rect_limits
    if stored in (None, False):
        # Nothing to convert: hand the sentinel value back as it is.
        return stored

    bounds = tuple(z.convert_to_tensor(stored[index]) for index in (0, 1))
    return z.convert_to_tensor(bounds)
def _sanitize_x_input(x, n_obs):
    """Convert ``x`` to a tensor whose last dimension equals ``n_obs``.

    Args:
        x: Value convertible with ``z.convert_to_tensor``.
        n_obs: Expected size of the last dimension.

    Returns:
        The converted tensor. When ``n_obs == 1`` and the input has at most
        one dimension, a rank-0 input is broadcast to shape ``(1, 1)`` and a
        rank-1 input gets a trailing axis of size 1.

    Raises:
        ValueError: If ``x`` has at most one dimension while ``n_obs > 1``.
        ShapeIncompatibleError: If the static size of the last dimension
            differs from ``n_obs``.
    """
    x = z.convert_to_tensor(x)
    rank = x.shape.ndims

    if rank <= 1:
        if n_obs > 1:
            raise ValueError("x has ndims <= 1, which is most probably not wanted. The default shape for array-like"
                             " structures is (nevents, n_obs).")
        if n_obs == 1:
            # With a single observable, low-rank input can be lifted unambiguously.
            x = tf.broadcast_to(x, (1, 1)) if rank == 0 else tf.expand_dims(x, axis=-1)

    last_dim = tf.get_static_value(x.shape[-1])
    if last_dim != n_obs:
        raise ShapeIncompatibleError("n_obs and the last dim of x do not agree. Assuming x has shape (..., n_obs)")
    return x
def _analytic_integrate(self, limits, norm_range):
    """Sum the fraction-weighted analytic integrals of all sub-pdfs.

    Args:
        limits: Passed as ``limits=`` to each sub-pdf's ``analytic_integrate``.
        norm_range: Accepted but deliberately not forwarded to the sub-pdfs.

    Returns:
        The added-up weighted integrals, converted with ``z.convert_to_tensor``.

    Raises:
        AnalyticIntegralNotImplementedError: If a sub-pdf raises it; the
            original error is chained as the cause.
    """
    pdfs = self.pdfs
    fracs = self.fracs

    weighted = []
    try:
        for pdf, frac in zip(pdfs, fracs):
            # do NOT propagate the norm_range!
            weighted.append(frac * pdf.analytic_integrate(limits=limits))
    except AnalyticIntegralNotImplementedError as error:
        raise AnalyticIntegralNotImplementedError(
            f"analytic_integrate of pdf {self.name} is not implemented in this"
            f" SumPDF, as at least one sub-pdf does not implement it.") from error

    # TODO(SUM): change the below? broadcast integrals?
    # Pairwise addition with the `+` operator.
    total = functools.reduce(operator.add, weighted)
    return z.convert_to_tensor(total)
def _rect_limits_tf(self) -> ztyping.RectLimitsTFReturnType:
    """Provide ``self._rect_limits`` in tensor form.

    Returns:
        The stored value itself if it compares equal to ``None`` or ``False``.
        Otherwise a tensor made from the pair of its elements 0 and 1, each
        first converted with ``z.convert_to_tensor``.
    """
    limits_pair = self._rect_limits
    if limits_pair in (None, False):
        # Sentinel values are passed through without conversion.
        return limits_pair

    lower_tensor = z.convert_to_tensor(limits_pair[0])
    upper_tensor = z.convert_to_tensor(limits_pair[1])
    stacked = (lower_tensor, upper_tensor)
    return z.convert_to_tensor(stacked)
def inside_rect_limits(x, rect_limits):
    """Check, entry by entry, whether ``x`` lies within rectangular limits.

    Args:
        x: Tensor-like object exposing ``shape.ndims``; needs more than one
            dimension.
        rect_limits: Split by ``z.unstack_x(..., axis=0)`` into a lower and
            an upper bound.

    Returns:
        Boolean tensor obtained by reducing over the last axis of ``x``: true
        where all values on that axis are ``>= lower`` and ``<= upper``.

    Raises:
        ValueError: If ``x`` has one dimension or fewer.
    """
    too_few_dims = x.shape.ndims <= 1
    if too_few_dims:
        raise ValueError("x has ndims <= 1, which is most probably not wanted. The default shape for array-like"
                         " structures is (nevents, n_obs).")

    bounds = z.unstack_x(rect_limits, axis=0)
    lower_part, upper_part = bounds
    lower_part = z.convert_to_tensor(lower_part)
    upper_part = z.convert_to_tensor(upper_part)

    # Both comparisons are inclusive and collapsed over the last axis.
    within_upper = tf.reduce_all(input_tensor=tf.less_equal(x, upper_part), axis=-1)
    within_lower = tf.reduce_all(input_tensor=tf.greater_equal(x, lower_part), axis=-1)
    return tf.logical_and(within_lower, within_upper)