Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@testing.parametrized(
    equal=([2, 3, 1], ""),
    missing=((1, 2), [" - missing element(s): {3}."]),
    additional=((1, 4, 3, 2), [" - additional element(s): {4}."]),
    both=((1, 2, 4), [" - additional element(s): {4}.", " - missing element(s): {3}."]),
)
def test_assert_set_equal(estimate: Iterable[int], message: str) -> None:
    """Exercise ``testing.assert_set_equal`` against the reference set ``{1, 2, 3}``.

    ``message`` holds the lines expected after the first line of the raised
    error message (a list of strings in the cases above); a falsy value
    means that no error is expected at all.
    """
    reference = {1, 2, 3}
    try:
        testing.assert_set_equal(estimate, reference)
    except AssertionError as error:
        if message:
            # The first line of the error text is skipped: only the detail
            # lines that follow it are compared with the expectation.
            detail_lines = error.args[0].split("\n")[1:]
            np.testing.assert_equal(detail_lines, message)
        else:
            raise AssertionError("An error has been raised while it should not.")
    else:
        if message:
            raise AssertionError("An error should have been raised.")
@testing.parametrized(
    tanh=(transforms.TanhBound(0, 5), [2, 4], None),
    tanh_err=(transforms.TanhBound(0, 5), [2, 4, 6], ValueError),
    clipping=(transforms.Clipping(0), [2, 4, 6], None),
    clipping_err=(transforms.Clipping(0), [-2, 4, 6], ValueError),
    arctan=(transforms.ArctanBound(0, 5), [2, 4, 5], None),
    arctan_err=(transforms.ArctanBound(0, 5), [-1, 4, 5], ValueError),
    cumdensity=(transforms.CumulativeDensity(), [0, .5], None),
    cumdensity_err=(transforms.CumulativeDensity(), [-0.1, .5], ValueError),
)
def test_out_of_bound(transform: transforms.Transform, x: List[float], expected: Optional[Type[Exception]]) -> None:
    """Call ``transform.backward`` on ``x`` and check whether it raises.

    When ``expected`` is None the call only has to complete; otherwise the
    call must raise an exception of type ``expected``.
    """
    if expected is not None:
        with pytest.raises(expected):
            transform.backward(np.array(x))
        return
    transform.backward(np.array(x))
@testing.parametrized(
    arity2=(2, [0, -4, 0, 4, 0, 0], [0, 1, .5], [0, 1, 0]),
    arity2_1=(2, [0, 40], [1], [1]),
    arity3=(3, [0, -4, 0, 0, 4, 0], [1, 1], [0, 1]),  # first is 0 or 2, second is 1
    arity2_0_sum=(2, [0, 0], [.5], [0]),  # equal coefficients: the sampled mean is expected halfway between 0 and 1
    pinf_case=(2, [0, np.inf], [1], [1]),
    nan_case=(2, [np.nan, 0], [1], [1]),
    ninf_case=(2, [-np.inf, 0], [1], [1]),
    all_ninf_case=(2, [-np.inf, -np.inf], [.5], [0]),
)
def test_softmax_discretization(arity: int, data: List[float], expected: List[float],
                                deterministic_expected: List[float]) -> None:
    """Check the sampled output of ``discretization.softmax_discretization``.

    The discretization is called 1000 times on the same coefficients and the
    element-wise mean of the results is compared with ``expected``. The input
    array must be left untouched by the calls.

    Parameters
    ----------
    arity
        forwarded as the ``arity`` keyword of ``softmax_discretization``
    data
        coefficients to discretize (may contain nan and infinite values)
    expected
        expected element-wise mean of the sampled outputs
    deterministic_expected
        currently not checked, see the review note at the end of the test
    """
    coeffs = np.array(data, copy=True)
    with warnings.catch_warnings():
        # Some cases contain nan/inf coefficients; warnings emitted while
        # processing them are deliberately silenced.
        warnings.simplefilter("ignore")
        samples = [discretization.softmax_discretization(coeffs, arity=arity) for _ in range(1000)]
        output = np.mean(samples, axis=0)
    np.testing.assert_array_equal(coeffs, data, err_msg="Input data was modified")
    # The mean was previously computed and then discarded, so the test could
    # not fail on a wrong discretization. The tolerance is loose (one decimal)
    # because the mean is a Monte-Carlo estimate over 1000 draws.
    np.testing.assert_almost_equal(output, expected, decimal=1, err_msg="Wrong discretization")
    # NOTE(review): `deterministic_expected` is still unused. It presumably
    # targets a non-random mode of softmax_discretization, but the way to
    # request that mode is not visible from this test - TODO confirm and add
    # the corresponding assertion.
@testing.parametrized(
    empty=([], [], []),
)
def test_split_data(tokens: List[Variable], data: List[float], expected: List[List[float]]) -> None:
    """Split ``data`` through an ``mvar.Instrumentation`` built from ``tokens``.

    The result of the private ``_split_data`` method is compared to ``expected``.
    """
    # NOTE(review): a function with this same name is defined again further
    # down; if both really live in one module, the later one shadows this one.
    instrumentation = mvar.Instrumentation(*tokens)
    pieces = instrumentation._split_data(np.array(data))
    testing.printed_assert_equal(pieces, expected)
@testing.parametrized(**{name: (name, func) for name, func in corefuncs.registry.items()})
def test_core_function(name: str, func: Callable[..., Any]) -> None:
    """Check that ``func`` returns the same output twice under the same numpy seed.

    One case is generated for every entry of ``corefuncs.registry``.
    """
    x = np.random.normal(0, 1, 100)
    # Reset the global numpy seed before each call so that any randomness
    # inside the function is replayed identically.
    np.random.seed(12)
    first_output = func(x)
    np.random.seed(12)
    second_output = func(x)
    np.testing.assert_equal(first_output, second_output, f'Function {name} is not deterministic')
@testing.parametrized(
    void=([], []),
    one=(["a"], []),
    two=([1, 2], [(1, 2)]),
    three=([1, 2, 3], [(1, 2), (2, 3)]),
)
def test_pairwise(iterator: Iterable[Any], expected: List[Tuple[Any, ...]]) -> None:
    """Materialise ``tools.pairwise(iterator)`` and compare it to ``expected``.

    The cases cover inputs of length 0 to 3; inputs shorter than two
    elements are expected to yield no pair.
    """
    pairs = [*tools.pairwise(iterator)]
    testing.printed_assert_equal(pairs, expected)
@testing.parametrized(
    w3_batch=(True, ['s0', 's1', 's2', 'u0', 'u1', 'u2', 's3', 's4', 'u3', 'u4']),
    w3_steady=(False, ['s0', 's1', 's2', 'u2', 's3', 'u1', 's4', 'u0', 'u3', 'u4']),  # u0 and u1 are delayed
)
def test_batch_mode_parameter(batch_mode: bool, expected: List[str]) -> None:
    """Run an experiment with 3 workers and check the order of the optimizer logs.

    ``xpbase.OptimizerSettings.instanciate`` is patched to return a logging
    optimizer, whose ``logs`` are compared to ``expected`` after the run.
    """
    func = Function(dimension=1)
    logging_optimizer = test_base.LoggingOptimizer(3)
    # The patcher only becomes active when entered by the `with` statement below.
    instanciate_patch = patch.object(xpbase.OptimizerSettings, "instanciate", return_value=logging_optimizer)
    with instanciate_patch:
        experiment = xpbase.Experiment(func, optimizer="OnePlusOne", budget=10, num_workers=3,
                                       batch_mode=batch_mode)
        experiment._run_with_error()
    testing.printed_assert_equal(logging_optimizer.logs, expected)
@testing.parametrized(
    w1_batch=(1, True, ['s0', 'u0', 's1', 'u1', 's2', 'u2', 's3', 'u3', 's4', 'u4']),
    # no difference with a single worker (normal, since worker=1)
    w1_steady=(1, False, ['s0', 'u0', 's1', 'u1', 's2', 'u2', 's3', 'u3', 's4', 'u4']),
    w3_batch=(3, True, ['s0', 's1', 's2', 'u0', 'u1', 'u2', 's3', 's4', 'u3', 'u4']),
    # not really steady TODO change this behavior
    w3_steady=(3, False, ['s0', 's1', 's2', 'u0', 'u1', 'u2', 's3', 's4', 'u3', 'u4']),
    # This is what we would like:
    # w3_steady=(3, False, ['s0', 's1', 's2', 'u0', 's3', 'u1', 's4', 'u2', 'u3', 'u4']),
)
def test_batch_and_steady_optimization(num_workers: int, batch_mode: bool, expected: List[Tuple[str, float]]) -> None:
    """Check the suggestion (s) and update (u) patterns logged during ``minimize``.

    The ``w3_steady`` pattern is unexpected: the steady mode is designed to be
    efficient with a non-sequential executor, and because of that it acts like
    the batch mode when run sequentially.
    """
    logging_optimizer = LoggingOptimizer(num_workers=num_workers)
    counter_function = CounterFunction()
    with warnings.catch_warnings():
        # warnings raised during the optimization are not the subject of this test
        warnings.simplefilter("ignore")
        logging_optimizer.minimize(counter_function, verbosity=2, batch_mode=batch_mode)
    testing.printed_assert_equal(logging_optimizer.logs, expected)
@testing.parametrized(
    empty=([], [], []),
)
def test_split_data(tokens: List[utils.Variable[Any]], data: List[float], expected: List[List[float]]) -> None:
    """Compare the result of ``utils.split_data(data, tokens)`` with ``expected``."""
    # NOTE(review): a function with this same name is also defined earlier;
    # if both really live in one module, this definition shadows the earlier one.
    split = utils.split_data(data, tokens)
    testing.printed_assert_equal(split, expected)