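# The converter excerpts below come from sklearn-onnx and are not standalone.
# As an assumption, they rely roughly on imports such as the following
# (module paths taken from skl2onnx):
import collections.abc
import numpy as np
from onnx import TensorProto
from onnx.helper import make_tensor
from skl2onnx.common._apply_operation import (
    apply_add, apply_cast, apply_concat, apply_div, apply_reshape)
from skl2onnx.common.data_types import (
    BooleanTensorType, DoubleTensorType, FloatTensorType,
    Int32TensorType, Int64TensorType, StringTensorType)
from skl2onnx.proto import onnx_proto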
def convert_truncated_svd(scope, operator, container):
# Create alias for the scikit-learn truncated SVD model we
# are going to convert
svd = operator.raw_operator
# Transpose [K, C] matrix to [C, K], where C/K is the
# input/transformed feature dimension
transform_matrix = svd.components_.transpose()
transform_matrix_name = scope.get_unique_variable_name('transform_matrix')
# Put the transformation into an ONNX tensor
container.add_initializer(
transform_matrix_name, onnx_proto.TensorProto.FLOAT,
transform_matrix.shape, transform_matrix.flatten())
input_name = operator.inputs[0].full_name
if isinstance(operator.inputs[0].type, Int64TensorType):
cast_output_name = scope.get_unique_variable_name('cast_output')
apply_cast(scope, input_name, cast_output_name, container,
to=onnx_proto.TensorProto.FLOAT)
input_name = cast_output_name
if operator.type == 'SklearnTruncatedSVD':
# Create the major operator, a matrix multiplication.
container.add_node(
'MatMul', [input_name, transform_matrix_name],
operator.outputs[0].full_name, name=operator.full_name)
else: # PCA
        if svd.mean_ is not None:
            mean_name = scope.get_unique_variable_name('mean')
            sub_result_name = scope.get_unique_variable_name('sub_result')
            container.add_initializer(mean_name, onnx_proto.TensorProto.FLOAT,
                                      svd.mean_.shape, svd.mean_)
            # Subtract mean from input tensor
def _guess_type_proto(data_type, dims):
# This could be moved to onnxconverter_common.
if data_type == onnx_proto.TensorProto.FLOAT:
return FloatTensorType(dims)
elif data_type == onnx_proto.TensorProto.DOUBLE:
return DoubleTensorType(dims)
elif data_type == onnx_proto.TensorProto.STRING:
return StringTensorType(dims)
elif data_type == onnx_proto.TensorProto.INT64:
return Int64TensorType(dims)
elif data_type == onnx_proto.TensorProto.INT32:
return Int32TensorType(dims)
elif data_type == onnx_proto.TensorProto.BOOL:
return BooleanTensorType(dims)
else:
raise NotImplementedError(
"Unsupported data_type '{}'. You may raise an issue "
"at https://github.com/onnx/sklearn-onnx/issues."
"".format(data_type))
array_feature_extractor_result_name = scope.get_unique_variable_name(
'array_feature_extractor_result')
container.add_node(
'ArrayFeatureExtractor', [classes_name, argmax_output_name],
array_feature_extractor_result_name, op_domain='ai.onnx.ml',
name=scope.get_unique_operator_name('ArrayFeatureExtractor'))
output_shape = (-1,)
if class_type == onnx_proto.TensorProto.INT32:
cast2_result_name = scope.get_unique_variable_name('cast2_result')
reshaped_result_name = scope.get_unique_variable_name(
'reshaped_result')
apply_cast(scope, array_feature_extractor_result_name,
cast2_result_name, container,
to=onnx_proto.TensorProto.FLOAT)
apply_reshape(scope, cast2_result_name, reshaped_result_name,
container, desired_shape=output_shape)
apply_cast(scope, reshaped_result_name, output_full_name, container,
to=onnx_proto.TensorProto.INT64)
else: # string labels
apply_reshape(scope, array_feature_extractor_result_name,
output_full_name, container, desired_shape=output_shape)
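# Hedged numpy sketch (demo only; the helper name below is ours, not the
# converter's): ArrayFeatureExtractor applied to the argmax of the scores is a
# per-row label lookup, followed by a reshape to a 1-D output.
def _demo_label_lookup():
    import numpy as np
    classes = np.array([0, 1, 2], dtype=np.int64)
    scores = np.array([[0.1, 0.7, 0.2],
                       [0.6, 0.3, 0.1]])
    # Equivalent of ArrayFeatureExtractor(classes, ArgMax(scores)) + Reshape(-1)
    return classes[scores.argmax(axis=1)].reshape((-1,))  # -> array([1, 0])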
dtype = container.dtype
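    # Attributes for an ai.onnx.ml LinearRegressor-style node, taken from the
    # fitted scikit-learn model.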
attrs = {'name': scope.get_unique_operator_name(op_type)}
attrs['coefficients'] = op.coef_.astype(dtype).ravel()
    attrs['intercepts'] = (op.intercept_.astype(dtype)
                           if isinstance(op.intercept_, collections.abc.Iterable)
                           else np.array([op.intercept_], dtype=dtype))
if len(op.coef_.shape) == 2:
attrs['targets'] = op.coef_.shape[0]
input_name = operator.input_full_names
if type(operator.inputs[0].type) == Int64TensorType:
cast_input_name = scope.get_unique_variable_name('cast_input')
apply_cast(scope, operator.input_full_names, cast_input_name,
container,
to=(onnx_proto.TensorProto.FLOAT
if dtype == np.float32
else onnx_proto.TensorProto.DOUBLE))
input_name = cast_input_name
container.add_node(op_type, input_name,
operator.output_full_names, op_domain='ai.onnx.ml',
**attrs)
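# Hedged end-to-end sketch (demo only; assumes skl2onnx and onnxruntime are
# installed, helper name is ours): a linear model converted this way should
# reproduce LinearRegression.predict up to float32 rounding.
def _demo_linear_regressor_roundtrip():
    import numpy as np
    import onnxruntime as rt
    from sklearn.linear_model import LinearRegression
    from skl2onnx import to_onnx

    X = np.random.rand(20, 3).astype(np.float32)
    y = X @ np.array([1.0, -2.0, 0.5], dtype=np.float32) + 0.25
    model = LinearRegression().fit(X, y)
    onx = to_onnx(model, X)  # input type inferred from X
    sess = rt.InferenceSession(onx.SerializeToString(),
                               providers=["CPUExecutionProvider"])
    pred = sess.run(None, {sess.get_inputs()[0].name: X})[0]
    return np.allclose(pred.ravel(), model.predict(X), atol=1e-4)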
apply_concat(scope, proba_list,
merged_proba_name, container, axis=0)
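    # If the estimators expose probabilities, average them across estimators;
    # otherwise turn the hard votes below into per-class frequencies.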
if has_proba:
container.add_node('ReduceMean', merged_proba_name,
final_proba_name,
name=scope.get_unique_operator_name('ReduceMean'),
axes=[0], keepdims=0)
else:
n_estimators_name = scope.get_unique_variable_name('n_estimators')
class_labels_name = scope.get_unique_variable_name('class_labels')
equal_result_name = scope.get_unique_variable_name('equal_result')
cast_output_name = scope.get_unique_variable_name('cast_output')
reduced_proba_name = scope.get_unique_variable_name('reduced_proba')
container.add_initializer(
n_estimators_name, onnx_proto.TensorProto.FLOAT, [],
[len(model.estimators_)])
container.add_initializer(
class_labels_name, onnx_proto.TensorProto.INT64,
[1, 1, len(model.estimators_[0].classes_)],
model.estimators_[0].classes_)
container.add_node('Equal', [class_labels_name, merged_proba_name],
equal_result_name,
name=scope.get_unique_operator_name('Equal'))
apply_cast(scope, equal_result_name, cast_output_name,
container, to=onnx_proto.TensorProto.FLOAT)
container.add_node('ReduceSum', cast_output_name,
reduced_proba_name,
name=scope.get_unique_operator_name('ReduceSum'),
axes=[0], keepdims=0)
        # The original snippet is truncated here; the output name and broadcast
        # flag below are an assumption.
        apply_div(scope, [reduced_proba_name, n_estimators_name],
                  final_proba_name, container, broadcast=1)
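# Hedged numpy sketch (demo only): with probabilities available the graph above
# averages them across estimators; without them it counts, per class, how many
# estimators predicted that class and divides by the number of estimators.
def _demo_vote_average():
    import numpy as np
    preds = np.array([[0, 1], [1, 1], [0, 1]])        # 3 estimators, 2 samples
    classes = np.array([0, 1]).reshape(1, 1, -1)      # shape [1, 1, n_classes]
    votes = (preds[:, :, np.newaxis] == classes)      # Equal
    return votes.astype(np.float32).sum(axis=0) / preds.shape[0]  # ReduceSum / Div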
def _decision_function(scope, operator, container, model):
"""Predict for linear model.
score = X * coefficient + intercept
"""
coef_name = scope.get_unique_variable_name('coef')
intercept_name = scope.get_unique_variable_name('intercept')
matmul_result_name = scope.get_unique_variable_name(
'matmul_result')
score_name = scope.get_unique_variable_name('score')
coef = model.coef_.T
container.add_initializer(coef_name, onnx_proto.TensorProto.FLOAT,
coef.shape, coef.ravel())
container.add_initializer(intercept_name, onnx_proto.TensorProto.FLOAT,
model.intercept_.shape, model.intercept_)
input_name = operator.inputs[0].full_name
if type(operator.inputs[0].type) == Int64TensorType:
cast_input_name = scope.get_unique_variable_name('cast_input')
apply_cast(scope, operator.input_full_names, cast_input_name,
container, to=onnx_proto.TensorProto.FLOAT)
input_name = cast_input_name
container.add_node(
'MatMul', [input_name, coef_name],
matmul_result_name,
name=scope.get_unique_operator_name('MatMul'))
    apply_add(scope, [matmul_result_name, intercept_name],
              score_name, container, broadcast=0)
    return score_name
container.add_initializer(comb_name, onnx_proto.TensorProto.INT64,
[len(comb)], list(comb))
container.add_node(
'ArrayFeatureExtractor',
[operator.inputs[0].full_name, comb_name], col_name,
name=scope.get_unique_operator_name('ArrayFeatureExtractor'),
op_domain='ai.onnx.ml')
reduce_prod_input = col_name
if (operator.inputs[0].type._get_element_onnx_type()
== onnx_proto.TensorProto.INT64):
float_col_name = scope.get_unique_variable_name('col')
apply_cast(scope, col_name, float_col_name, container,
to=onnx_proto.TensorProto.FLOAT)
reduce_prod_input = float_col_name
container.add_node(
'ReduceProd', reduce_prod_input, prod_name,
axes=[1], name=scope.get_unique_operator_name('ReduceProd'))
transformed_columns[i] = prod_name
last_feat = prod_name
if unit_name is not None:
shape_name = scope.get_unique_variable_name('shape')
container.add_node('Shape', last_feat, shape_name)
container.add_node('ConstantOfShape', shape_name, unit_name,
value=make_tensor(
'ONE', TensorProto.FLOAT, [1], [1.]),
op_version=9)
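# Hedged numpy sketch (demo only): each generated feature is the product of the
# input columns listed in one combination, and the unit column is a vector of
# ones with the same number of rows (what ConstantOfShape produces above).
def _demo_poly_column():
    import numpy as np
    X = np.array([[1.0, 2.0, 3.0],
                  [4.0, 5.0, 6.0]])
    comb = [0, 2]                                  # e.g. the x0 * x2 term
    col = X[:, comb].prod(axis=1, keepdims=True)   # ArrayFeatureExtractor + ReduceProd
    ones = np.ones((X.shape[0], 1))                # the bias column
    return col, ones                               # -> [[3.], [24.]], [[1.], [1.]]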
def _ty_astype(cst):
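    # Map the constant's numpy dtype to the matching ONNX tensor type and the
    # numpy type used for casting.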
dtype = cst.dtype
if dtype == np.float32:
ty = onnx_proto.TensorProto.FLOAT
astype = np.float64
elif dtype == np.float64:
ty = onnx_proto.TensorProto.DOUBLE
astype = np.float64
elif dtype == np.int64:
ty = onnx_proto.TensorProto.INT64
astype = np.int64
elif dtype == np.int32:
ty = onnx_proto.TensorProto.INT32
astype = np.int64
    elif dtype == np.bool_:
        ty = onnx_proto.TensorProto.BOOL
        astype = np.bool_
else:
st = str(dtype).lower()
if st.startswith('u') or st.startswith("