Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_algebra_to_onnx(self):
X = numpy.random.randn(5, 4)
beta = numpy.array([1, 2, 3, 4]) / 10
beta32 = beta.astype(numpy.float32)
onnxExpM = OnnxExp(OnnxMatMul('X', beta32))
cst = numpy.ones((1, 3), dtype=numpy.float32)
onnxExpM1 = OnnxAdd(onnxExpM, cst)
onnxPred = OnnxDiv(onnxExpM, onnxExpM1)
inputs = {'X': X[:1].astype(numpy.float32)}
model_onnx = onnxPred.to_onnx(inputs)
s1 = str(model_onnx)
model_onnx = onnxPred.to_onnx(inputs)
s2 = str(model_onnx)
assert s1 == s2
nin = list(onnxExpM1.enumerate_initial_types())
nno = list(onnxExpM1.enumerate_nodes())
nva = list(onnxExpM1.enumerate_variables())
self.assertEqual(len(nin), 0)
self.assertEqual(len(nno), 3)
self.assertEqual(len(nva), 0)
def generate_onnx_graph(dim, nbnode, input_name='X1'):
i1 = input_name
for i in range(nbnode - 1):
i2 = (np.ones((1, dim)) * nbnode * 10).astype(np.float32)
node = OnnxAdd(i1, i2)
i1 = node
i2 = (np.ones((1, dim)) * nbnode * 10).astype(np.float32)
node = OnnxAdd(i1, i2, output_names=['Y'])
onx = node.to_onnx([(input_name, FloatTensorType((None, dim)))],
outputs=[('Y', FloatTensorType())])
return onx
def test_onnx_example_cdist_in_mink(self):
x = np.array([1, 2, 4, 5, 5, 4]).astype(np.float32).reshape((3, 2))
x2 = np.array([1.1, 2.1, 4.01, 5.01, 5.001, 4.001, 0, 0]).astype(
np.float32).reshape((4, 2))
cop = OnnxAdd(
'input', 'input', op_version=onnx.defs.onnx_opset_version())
cop2 = OnnxIdentity(
onnx_cdist(cop, x2, dtype=np.float32,
metric="minkowski", p=2,
op_version=onnx.defs.onnx_opset_version()),
output_names=['cdist'],
op_version=onnx.defs.onnx_opset_version())
model_def = cop2.to_onnx(
inputs=[('input', FloatTensorType([None, None]))],
outputs=[('cdist', FloatTensorType())])
sess = InferenceSession(model_def.SerializeToString())
res = sess.run(None, {'input': x})
exp = scipy_cdist(x * 2, x2, metric="minkowski")
assert_almost_equal(exp, res[0], decimal=5)
if isinstance(kernel, (RBF, ExpSineSquared, RationalQuadratic)):
onnx_zeros = _zero_vector_of_size(X, keepdims=0, dtype=dtype,
op_version=op_version)
if isinstance(kernel, RBF):
return OnnxAdd(onnx_zeros,
np.array([1], dtype=dtype),
output_names=output_names,
op_version=op_version)
else:
return OnnxAdd(onnx_zeros, np.array([1], dtype=dtype),
output_names=output_names, op_version=op_version)
if isinstance(kernel, DotProduct):
t_sigma_0 = py_make_float_array(kernel.sigma_0 ** 2, dtype=dtype)
return OnnxSqueeze(
OnnxAdd(OnnxReduceSumSquare(X, axes=[1], op_version=op_version),
t_sigma_0, op_version=op_version),
output_names=output_names, axes=[1],
op_version=op_version)
raise RuntimeError("Unable to convert diag method for "
"class {}.".format(type(kernel)))
alpha=-2., beta=0., op_version=opv)
mp = np.sum(op.means_ ** 2, 1) * precisions
log_prob = OnnxAdd(mp, OnnxAdd(xmp, outer, op_version=opv),
op_version=opv)
else:
raise RuntimeError("Unknown op.covariance_type='{}'. Upgrade "
"to a mroe recent version of skearn-onnx "
"or raise an issue.".format(op.covariance_type))
# -.5 * (cst + log_prob) + log_det
cst = np.array([n_features * np.log(2 * np.pi)])
add = OnnxAdd(cst, log_prob, op_version=opv)
mul = OnnxMul(add, np.array([-0.5]), op_version=opv)
if isinstance(log_det, float):
log_det = np.array([log_det])
weighted_log_prob = OnnxAdd(OnnxAdd(mul, log_det, op_version=opv),
log_weights, op_version=opv)
# labels
labels = OnnxArgMax(weighted_log_prob, axis=1,
output_names=out[:1], op_version=opv)
# def _estimate_log_prob_resp():
# np.exp(log_resp)
# weighted_log_prob = self._estimate_weighted_log_prob(X)
# log_prob_norm = logsumexp(weighted_log_prob, axis=1)
# with np.errstate(under='ignore'):
# log_resp = weighted_log_prob - log_prob_norm[:, np.newaxis]
log_prob_norm = OnnxReduceLogSumExp(
weighted_log_prob, axes=[1], op_version=opv)
log_resp = OnnxSub(weighted_log_prob, log_prob_norm, op_version=opv)
# (n_components, n_features)
# precisions = precisions_chol ** 2
# log_prob = (np.sum((means ** 2 * precisions), 1) -
# 2. * np.dot(X, (means * precisions).T) +
# np.dot(X ** 2, precisions.T))
precisions = op.precisions_cholesky_ ** 2
mp = np.sum((op.means_ ** 2 * precisions), 1)
zeros = np.zeros((n_components, ))
xmp = OnnxGemm(X, (op.means_ * precisions).T, zeros,
alpha=-2., beta=0., op_version=opv)
term = OnnxGemm(OnnxMul(X, X, op_version=opv),
precisions.T, zeros, alpha=1., beta=0.,
op_version=opv)
log_prob = OnnxAdd(OnnxAdd(mp, xmp, op_version=opv),
term, op_version=opv)
elif op.covariance_type == 'spherical':
# shape(op.means_) = (n_components, n_features)
# shape(op.precisions_cholesky_) = (n_components, )
# precisions = precisions_chol ** 2
# log_prob = (np.sum(means ** 2, 1) * precisions -
# 2 * np.dot(X, means.T * precisions) +
# np.outer(row_norms(X, squared=True), precisions))
zeros = np.zeros((n_components, ))
precisions = op.precisions_cholesky_ ** 2
normX = OnnxReduceSumSquare(X, axes=[1], op_version=opv)
outer = OnnxGemm(normX, precisions[np.newaxis, :], zeros,
alpha=1., beta=1., op_version=opv)
def convert_kernel_diag(kernel, X, output_names=None, dtype=None,
optim=None, op_version=None):
if isinstance(kernel, Sum):
return OnnxAdd(
convert_kernel_diag(
kernel.k1, X, dtype=dtype, optim=optim, op_version=op_version),
convert_kernel_diag(
kernel.k2, X, dtype=dtype, optim=optim, op_version=op_version),
output_names=output_names)
if isinstance(kernel, Product):
return OnnxMul(
convert_kernel_diag(
kernel.k1, X, dtype=dtype, optim=optim, op_version=op_version),
convert_kernel_diag(
kernel.k2, X, dtype=dtype, optim=optim, op_version=op_version),
output_names=output_names, op_version=op_version)
if isinstance(kernel, ConstantKernel):
onnx_zeros = _zero_vector_of_size(X, keepdims=0, dtype=dtype,
# shape(op.precisions_cholesky_) = (n_components, )
# precisions = precisions_chol ** 2
# log_prob = (np.sum(means ** 2, 1) * precisions -
# 2 * np.dot(X, means.T * precisions) +
# np.outer(row_norms(X, squared=True), precisions))
zeros = np.zeros((n_components, ))
precisions = op.precisions_cholesky_ ** 2
normX = OnnxReduceSumSquare(X, axes=[1], op_version=opv)
outer = OnnxGemm(normX, precisions[np.newaxis, :], zeros,
alpha=1., beta=1., op_version=opv)
xmp = OnnxGemm(X, (op.means_.T * precisions), zeros,
alpha=-2., beta=0., op_version=opv)
mp = np.sum(op.means_ ** 2, 1) * precisions
log_prob = OnnxAdd(mp, OnnxAdd(xmp, outer, op_version=opv),
op_version=opv)
else:
raise RuntimeError("Unknown op.covariance_type='{}'. Upgrade "
"to a mroe recent version of skearn-onnx "
"or raise an issue.".format(op.covariance_type))
# -.5 * (cst + log_prob) + log_det
cst = np.array([n_features * np.log(2 * np.pi)])
add = OnnxAdd(cst, log_prob, op_version=opv)
mul = OnnxMul(add, np.array([-0.5]), op_version=opv)
if isinstance(log_det, float):
log_det = np.array([log_det])
weighted_log_prob = OnnxAdd(OnnxAdd(mul, log_det, op_version=opv),
log_weights, op_version=opv)
# labels