Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_tensorproto_typemap():
    """
    Build a lookup from lower-cased tensor data type names to TensorProto values.

    :return: dict whose keys are the names listed in
        ``onnx_proto._TENSORPROTO_DATATYPE.values`` converted to lower case, and
        whose values are the attributes of the same (original-case) name on
        ``onnx_proto.TensorProto``
    """
    type_names = (entry.name for entry in onnx_proto._TENSORPROTO_DATATYPE.values)
    return {name.lower(): getattr(onnx_proto.TensorProto, name) for name in type_names}
def convert(context, sk_node, inputs):
    """
    Convert a fitted dict vectorizer into a 'DictVectorizer' node ('ai.onnx.ml' domain).

    Every entry of ``sk_node.feature_names_`` is sorted into a string vocabulary or
    an int64 vocabulary; each non-empty vocabulary becomes a node attribute. The
    single output is declared with one element per feature name.

    :param context: conversion context handed to NodeBuilder
    :param sk_node: fitted vectorizer; only ``feature_names_`` is read
    :param inputs: inputs attached to the node via ``extend_inputs``
    :return: the node produced by ``NodeBuilder.make_node()``
    :raises ValueError: if a feature name is neither a string nor a numeric type
    """
    string_vocabulary = []
    int64_vocabulary = []
    # Stays None when there are no feature names at all.
    value_type = None
    nb = NodeBuilder(context, 'DictVectorizer', op_domain='ai.onnx.ml')

    for feature_name in sk_node.feature_names_:
        if utils.is_string_type(feature_name):
            string_vocabulary.append(feature_name)
        elif utils.is_numeric_type(feature_name):
            int64_vocabulary.append(feature_name)
        else:
            raise ValueError("Invalid or unsupported DictVectorizer type.")
        # Both kinds of key map to a float value in the output tensor.
        value_type = onnx_proto.TensorProto.FLOAT

    # Only emit the vocabularies that actually contain entries.
    if string_vocabulary:
        nb.add_attribute('string_vocabulary', string_vocabulary)
    if int64_vocabulary:
        nb.add_attribute('int64_vocabulary', int64_vocabulary)

    nb.extend_inputs(inputs)
    nb.add_output(model_util.make_tensor_value_info(nb.name, value_type, [len(sk_node.feature_names_)]))

    # Hand the finished node back to the caller, as the LabelEncoder converter does.
    return nb.make_node()
# Convert a fitted tree classifier (sk_node) into a 'TreeEnsembleClassifier' node in the
# 'ai.onnx.ml' domain.
# NOTE(review): this excerpt has lost its indentation and appears to stop mid-function:
# output_type and output_dim are assigned but never used, and nothing is returned, in the
# lines visible here -- confirm against the complete source.
def convert(context, sk_node, inputs):
attr_pairs = _get_default_tree_classifier_attribute_pairs()
classes = sk_node.classes_
# Numeric labels are cast to int and stored under 'classlabels_int64s'; any other labels
# are cast to str and stored under 'classlabels_strings'.
if utils.is_numeric_type(sk_node.classes_):
class_labels = utils.cast_list(int, classes)
attr_pairs['classlabels_int64s'] = class_labels
output_type = onnx_proto.TensorProto.INT64
else:
class_labels = utils.cast_list(str, classes)
attr_pairs['classlabels_strings'] = class_labels
output_type = onnx_proto.TensorProto.STRING
# Presumably fills attr_pairs from sk_node.tree_; the helper is defined outside this
# excerpt, so the meaning of the positional arguments cannot be confirmed here.
_add_tree_to_attribute_pairs(attr_pairs, True, sk_node.tree_, 0, 1., 0, True)
nb = NodeBuilder(context, "TreeEnsembleClassifier", op_domain='ai.onnx.ml')
# Copy every attribute onto the node, skipping those whose value is an empty list.
for k, v in attr_pairs.items():
if isinstance(v, list) and len(v) == 0:
continue
nb.add_attribute(k, v)
nb.extend_inputs(inputs)
output_dim = [1]
# NOTE(review): interior of a GRU converter whose header is outside this excerpt
# (scope, operator, container, params, input_size and hidden_size come from there).
# Indentation has been lost and the final bias concatenation below is cut off.
gru_op_name = scope.get_unique_operator_name('GRU')
gru_attrs = {'name': gru_op_name}
gru_inputs = []
gru_outputs = []
# Reshape the first operator input to [-1, 1, input_size] before feeding it into ONNX GRU
gru_x_reshape_name = scope.get_unique_variable_name(gru_op_name + '_X_reshape')
apply_reshape(scope, operator.inputs[0].full_name, gru_x_reshape_name, container, desired_shape=[-1, 1, input_size])
gru_inputs.append(gru_x_reshape_name)
# Create weight matrices of GRU and add them to ONNX GRU's input list.
# The gates are concatenated in the order: update, reset, output.
matrices_w = np.concatenate([params.updateGateWeightMatrix.floatValue,
params.resetGateWeightMatrix.floatValue,
params.outputGateWeightMatrix.floatValue])
matrices_w_name = scope.get_unique_variable_name(gru_op_name + '_W')
container.add_initializer(matrices_w_name, onnx_proto.TensorProto.FLOAT,
[1, 3 * hidden_size, input_size], matrices_w)
gru_inputs.append(matrices_w_name)
# Create recursion matrices of GRU and add them to ONNX GRU's input list
# (same gate order as the weight matrices above).
matrices_r = np.concatenate([params.updateGateRecursionMatrix.floatValue,
params.resetGateRecursionMatrix.floatValue,
params.outputGateRecursionMatrix.floatValue])
matrices_r_name = scope.get_unique_variable_name(gru_op_name + '_R')
container.add_initializer(matrices_r_name, onnx_proto.TensorProto.FLOAT,
[1, 3 * hidden_size, hidden_size], matrices_r)
gru_inputs.append(matrices_r_name)
if params.hasBiasVectors:
# Create bias vectors of GRU and add them into ONNX GRU's input list
vectors_b = np.concatenate([params.updateGateBiasVector.floatValue,
params.resetGateBiasVector.floatValue,
def convert(context, sk_node, inputs):
    """
    Convert a fitted label encoder into a 'LabelEncoder' node ('ai.onnx.ml' domain).

    The classes are stored as strings in the 'classes_strings' attribute. The
    output element type is the counterpart of the first input's element type:
    a STRING input gives an INT64 output (default value -1) and an INT64 input
    gives a STRING output (default value '__unknown__'). The output dimensions
    are copied from the first input.

    :param context: conversion context handed to NodeBuilder
    :param sk_node: fitted encoder; only ``classes_`` is read
    :param inputs: inputs of the node; the first one determines the output type and shape
    :return: the node produced by ``NodeBuilder.make_node()``
    :raises ValueError: if the first input carries no tensor element type or
        shape, or if its element type is neither STRING nor INT64
    """
    nb = NodeBuilder(context, "LabelEncoder", op_domain='ai.onnx.ml')
    nb.add_attribute('classes_strings', [str(c) for c in sk_node.classes_])
    nb.extend_inputs(inputs)

    # Only the attribute lookup is guarded, so an AttributeError raised by
    # unrelated code is not misreported as a missing input type.
    try:
        elem_type = inputs[0].type.tensor_type.elem_type
    except AttributeError:
        raise ValueError('Invalid or missing input type for LabelEncoder.')

    if elem_type == onnx_proto.TensorProto.STRING:
        output_type = onnx_proto.TensorProto.INT64
        nb.add_attribute('default_int64', -1)
    elif elem_type == onnx_proto.TensorProto.INT64:
        output_type = onnx_proto.TensorProto.STRING
        nb.add_attribute('default_string', '__unknown__')
    else:
        raise ValueError('Unsupported input type for LabelEncoder: '
                         'expected a STRING or INT64 tensor.')

    try:
        output_dim = [d.dim_value for d in inputs[0].type.tensor_type.shape.dim]
    except AttributeError:
        raise ValueError('Invalid or missing input dimension for LabelEncoder.')

    nb.add_output(model_util.make_tensor_value_info(nb.name, output_type, output_dim))
    return nb.make_node()
# NOTE(review): interior of a Keras bidirectional LSTM converter whose header is outside
# this excerpt (scope, operator, container, lstm_input_names, hidden_size, input_size and
# the W_x/W_h/b arrays with their *_back counterparts come from there). Indentation has
# been lost.
lstm_attrs = {'name': operator.full_name}
# Reshape Keras input format into ONNX input format (transpose with perm=[1, 0, 2])
lstm_x_name = scope.get_unique_variable_name(operator.full_name + '_X')
apply_transpose(scope, operator.inputs[0].full_name, lstm_x_name, container, perm=[1, 0, 2])
lstm_input_names.append(lstm_x_name)
# Allocate input transformation matrix in ONNX and add its name into LSTM input list.
# W_x and W_x_back are concatenated and registered with a leading dimension of 2.
tensor_w_name = scope.get_unique_variable_name(operator.full_name + '_W')
container.add_initializer(tensor_w_name, onnx_proto.TensorProto.FLOAT,
[2, 4 * hidden_size, input_size], np.concatenate([W_x, W_x_back]).flatten())
lstm_input_names.append(tensor_w_name)
# Allocate hidden transformation matrix in ONNX and add its name into LSTM input list
# (W_h and W_h_back, concatenated the same way).
tensor_r_name = scope.get_unique_variable_name(operator.full_name + '_R')
container.add_initializer(tensor_r_name, onnx_proto.TensorProto.FLOAT,
[2, 4 * hidden_size, hidden_size], np.concatenate([W_h, W_h_back]).flatten())
lstm_input_names.append(tensor_r_name)
# Add the bias vectors if present; otherwise keep the input slot with an empty name
if b is not None:
tensor_b_name = scope.get_unique_variable_name(operator.full_name + '_B')
container.add_initializer(tensor_b_name, onnx_proto.TensorProto.FLOAT, [2, 8 * hidden_size],
np.concatenate([b, b_back]).flatten())
lstm_input_names.append(tensor_b_name)
else:
lstm_input_names.append('') # the name of a non-existing optional variable is an empty string
# sequence_lens, this input is not used when converting Keras Bidirectional.
lstm_input_names.append('')
# need the zero initializer to correct some engine shape inference bug.
# NOTE(review): interior of an embedding converter whose header is outside this excerpt
# (scope, operator, container, params, gather_op_name, gather_attrs and casted_input_name
# come from there). Indentation has been lost.
# Load the embedding matrix. Its shape is outputChannels-by-inputDim.
weights = np.array(params.weights.floatValue).reshape(params.outputChannels, params.inputDim)
weights_name = scope.get_unique_variable_name(gather_op_name + '_W') # 1st input of Gather
# The matrix is transposed before being stored, so the initializer is inputDim-by-outputChannels.
container.add_initializer(weights_name, onnx_proto.TensorProto.FLOAT,
[params.inputDim, params.outputChannels], weights.transpose().flatten().tolist())
# To support the bias term in an embedding (if exists), we need to create one extra node.
if params.hasBias:
# Put the embedded result onto a temporary tensor
gather_output_name = scope.get_unique_variable_name(gather_op_name + '_output')
container.add_node('Gather', [weights_name, casted_input_name], gather_output_name, **gather_attrs)
# Load the bias vector into an initializer
bias_name = scope.get_unique_variable_name(gather_op_name + '_bias')
# NOTE(review): bias_axis is not used in the lines visible here; apply_add below is
# called with a literal axis=1 -- confirm whether that is intended.
bias_axis, bias_shape = deduce_broadcast_axis_and_shape(container.target_opset, [params.outputChannels])
container.add_initializer(bias_name, onnx_proto.TensorProto.FLOAT,
bias_shape, params.bias.floatValue)
# Create an addition operator to add bias (shape: [C]) into Gather's tensor (shape: [N, C])
apply_add(scope, [gather_output_name, bias_name], operator.outputs[0].full_name, container, axis=1, broadcast=1)
else:
# This case has no bias, so we just output the result produced by the embedding node.
container.add_node('Gather', [weights_name, casted_input_name], operator.output_full_names, **gather_attrs)
# NOTE(review): interior of an LSTM converter whose header is outside this excerpt; it
# starts inside an `if` branch whose condition is not visible (scope, operator, container,
# lstm_inputs, lstm_op_name, lstm_params, lstm_weights and hidden_size come from there).
# Indentation has been lost.
# Add a zero initializer to initial hidden state so that this variable becomes optional
container.add_initializer(operator.inputs[1].full_name, onnx_proto.TensorProto.FLOAT,
operator.inputs[1].type.shape,
np.zeros(shape=operator.inputs[1].type.shape).flatten())
else:
# An empty name keeps the input slot while marking it as not provided
lstm_inputs.append('')
# Provide ONNX LSTM the initial cell state when necessary
if len(operator.inputs) > 2:
# The third operator input is reshaped to [1, 1, hidden_size] before being passed on
lstm_c_init_reshape_name = scope.get_unique_variable_name(lstm_op_name + '_c_init_reshape')
apply_reshape(scope, operator.inputs[2].full_name, lstm_c_init_reshape_name, container,
desired_shape=[1, 1, hidden_size])
lstm_inputs.append(lstm_c_init_reshape_name)
# Add a zero initializer to initial cell state so that this variable becomes optional
container.add_initializer(operator.inputs[2].full_name, onnx_proto.TensorProto.FLOAT,
operator.inputs[2].type.shape,
np.zeros(shape=operator.inputs[2].type.shape).flatten())
else:
lstm_inputs.append('')
# Add the peephole vectors when present, concatenated in the order: input, output, forget gate
if lstm_params.hasPeepholeVectors:
vectors_p = np.concatenate([lstm_weights.inputGatePeepholeVector.floatValue,
lstm_weights.outputGatePeepholeVector.floatValue,
lstm_weights.forgetGatePeepholeVector.floatValue])
vectors_p_name = scope.get_unique_variable_name(lstm_op_name + '_P')
container.add_initializer(vectors_p_name, onnx_proto.TensorProto.FLOAT,
[1, 3 * hidden_size], vectors_p)
lstm_inputs.append(vectors_p_name)
else:
lstm_inputs.append('')
def convert_sparkml_one_hot_encoder(scope, operator, container):
op = operator.raw_operator
C = operator.inputs[0].type.shape[1]
# encoded_slot_sizes[i] is the number of output coordinates associated with the ith categorical feature
# Variable names produced by one-hot encoders. Each of them is the encoding result of a categorical feature.
final_variable_names = []
final_variable_lengths = []
for i in range(0, len(op.categorySizes)):
catSize = op.categorySizes[i]
cats = range(0, catSize)
# Put a feature index we want to encode to a tensor
index_variable_name = scope.get_unique_variable_name('target_index')
container.add_initializer(index_variable_name, onnx_proto.TensorProto.INT64, [1], [i])
# Extract the categorical feature from the original input tensor
extracted_feature_name = scope.get_unique_variable_name('extracted_feature_at_' + str(i))
extractor_type = 'ArrayFeatureExtractor'
extractor_attrs = {'name': scope.get_unique_operator_name(extractor_type)}
container.add_node(extractor_type, [operator.inputs[0].full_name, index_variable_name],
extracted_feature_name, op_domain='ai.onnx.ml', **extractor_attrs)
# Encode the extracted categorical feature as a one-hot vector
encoder_type = 'OneHotEncoder'
encoder_attrs = {'name': scope.get_unique_operator_name(encoder_type), 'cats_int64s': cats}
encoded_feature_name = scope.get_unique_variable_name('encoded_feature_at_' + str(i))
container.add_node(encoder_type, extracted_feature_name, encoded_feature_name, op_domain='ai.onnx.ml',
**encoder_attrs)
# Collect features produce by one-hot encoders