Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
params = operator.raw_operator.simpleRecurrent
input_size = params.inputVectorSize
hidden_size = params.outputVectorSize
X_name = operator.inputs[0].full_name
X_reshape_name = scope.get_unique_variable_name('X')
apply_reshape(scope, X_name, X_reshape_name, container, desired_shape=[-1, 1, input_size])
rnn_op_name = scope.get_unique_operator_name('RNN')
rnn_attrs = {'name': rnn_op_name}
rnn_inputs = [X_reshape_name]
# Load RNN's weight matrix and add it into RNN's input list
rnn_w_name = scope.get_unique_variable_name(rnn_op_name + '_W')
container.add_initializer(rnn_w_name, onnx_proto.TensorProto.FLOAT,
[1, hidden_size, input_size], params.weightMatrix.floatValue)
rnn_inputs.append(rnn_w_name)
# Load RNN's recursion matrix and add it into RNN's input list
rnn_r_name = scope.get_unique_variable_name(rnn_op_name + '_R')
container.add_initializer(rnn_r_name, onnx_proto.TensorProto.FLOAT,
[1, hidden_size, hidden_size], params.recursionMatrix.floatValue)
rnn_inputs.append(rnn_r_name)
if params.hasBiasVector:
# Load RNN's bias vector and add it into RNN's input list
rnn_b_name = scope.get_unique_variable_name(rnn_op_name + '_B')
rnn_b_content = np.concatenate([params.biasVector.floatValue, np.zeros(hidden_size)]).flatten()
container.add_initializer(rnn_b_name, onnx_proto.TensorProto.FLOAT, [1, 2 * hidden_size], rnn_b_content)
rnn_inputs.append(rnn_b_name)
else:
theta_np = numpy.array(op.theta.toArray())
transposed_theta = numpy.transpose(theta_np)
raw_predictions_tensor = scope.get_unique_variable_name('raw_predictions_tensor')
if model_type == 'bernoulli':
negTheta = numpy.log(1.0 - numpy.exp(theta_np))
thetaMinusLogTheta = numpy.transpose(theta_np - negTheta)
ones = numpy.ones((theta_np.shape[1], 1))
negThetaSum = numpy.matmul(negTheta, ones).flatten()
thetaMinusLogTheta_tensor = scope.get_unique_variable_name('thetaMinusLogTheta_tensor')
container.add_initializer(thetaMinusLogTheta_tensor, onnx_proto.TensorProto.FLOAT,
list(thetaMinusLogTheta.shape), thetaMinusLogTheta.flatten().tolist())
negThetaSum_tensor = scope.get_unique_variable_name('negThetaSum_tensor')
container.add_initializer(negThetaSum_tensor, onnx_proto.TensorProto.FLOAT,
list(negThetaSum.shape), negThetaSum.flatten().tolist())
prior_tensor = scope.get_unique_variable_name('prior_tensor')
container.add_initializer(prior_tensor, onnx_proto.TensorProto.FLOAT, [len(op.pi)], op.pi.flatten().tolist())
probability1_output = scope.get_unique_variable_name('temp_probability')
container.add_node('MatMul', [operator.input_full_names[0], thetaMinusLogTheta_tensor], probability1_output,
op_domain='ai.onnx',
name=scope.get_unique_operator_name('MatMul'),
op_version=9)
probability2_output = scope.get_unique_variable_name('temp_probability')
container.add_node('Add', [probability1_output, prior_tensor], probability2_output,
op_domain='ai.onnx',
name=scope.get_unique_operator_name('Add'),
op_version=7)
container.add_node('Add', [probability2_output,negThetaSum_tensor], raw_predictions_tensor,
op_domain='ai.onnx',
name=scope.get_unique_operator_name('Add'),
op_version=7)
else:
probability1_output = scope.get_unique_variable_name('temp_probability')
"""
Loads an ONNX model to a ProtoBuf object.
:param file_path: ONNX file (full file name)
:return: ONNX model.
Example:
::
from onnxmltools.utils import load_model
onnx_model = load_model("SqueezeNet.onnx")
"""
if not path.exists(file_path):
raise FileNotFoundError("{0} was not found.".format(file_path))
model = onnx_proto.ModelProto()
with open(file_path, 'rb') as f:
model.ParseFromString(f.read())
return model
if model.WhichOneof('ClassLabels') == 'stringClassLabels':
labels = list(s.encode('utf-8') for s in model.stringClassLabels.vector)
label_type = onnx_proto.TensorProto.STRING
elif model.WhichOneof('ClassLabels') == 'int64ClassLabels':
labels = list(int(i) for i in model.int64ClassLabels.vector)
label_type = onnx_proto.TensorProto.INT64
else:
raise ValueError('Unknown label type found')
elif model_type == 'pipelineClassifier':
model = operator.raw_operator.pipelineClassifier
if model.WhichOneof('ClassLabels') == 'stringClassLabels':
labels = list(s.encode('utf-8') for s in model.pipelineClassifier.stringClassLabels.vector)
label_type = onnx_proto.TensorProto.STRING
elif model.WhichOneof('ClassLabels') == 'int64ClassLabels':
labels = list(int(i) for i in model.int64ClassLabels.vector)
label_type = onnx_proto.TensorProto.INT64
else:
raise ValueError('Unknown label type found')
else:
raise ValueError('Only neural network classifiers and pipeline classifiers are supported')
# Use a Constant operator to load and output all labels as a tensor
label_loader_name = scope.get_unique_operator_name('LabelLoader')
label_buffer_name = scope.get_unique_variable_name('ClassLabels')
label_loader_value = helper.make_tensor(label_buffer_name, label_type, [len(labels)], labels)
apply_constant(scope, [label_buffer_name], container,
operator_name=label_loader_name, value=label_loader_value)
# Extract most possible label index
label_id_extractor_name = scope.get_unique_operator_name('LabelIndexExtractor')
label_id_extractor_attrs = {'name': label_id_extractor_name}
label_id_extractor_attrs['axis'] = 1
class_labels = [str(i) for i in classes]
classifier_attrs['classlabels_strings'] = class_labels
elif all(isinstance(i, (numbers.Real, bool, np.bool_)) for i in classes):
class_labels = [int(i) for i in classes]
classifier_attrs['classlabels_ints'] = class_labels
else:
raise RuntimeError('Label vector must be a string or a integer tensor')
label_name = operator.outputs[0].full_name
probability_tensor_name = scope.get_unique_variable_name('probability_tensor')
if op.__class__.__name__ == 'LinearSVC' and op.classes_.shape[0] <= 2:
raw_scores_tensor_name = scope.get_unique_variable_name('raw_scores_tensor')
positive_class_index_name = scope.get_unique_variable_name('positive_class_index')
container.add_initializer(positive_class_index_name, onnx_proto.TensorProto.INT64,
[], [1])
container.add_node(classifier_type, operator.inputs[0].full_name,
[label_name, raw_scores_tensor_name],
op_domain='ai.onnx.ml', **classifier_attrs)
container.add_node('ArrayFeatureExtractor', [raw_scores_tensor_name, positive_class_index_name],
operator.outputs[1].full_name, name=scope.get_unique_operator_name('ArrayFeatureExtractor'),
op_domain='ai.onnx.ml')
else:
container.add_node(classifier_type, operator.inputs[0].full_name,
[label_name, probability_tensor_name],
op_domain='ai.onnx.ml', **classifier_attrs)
# Make sure the probability sum is 1 over all classes
if multi_class > 0 and op.__class__.__name__ != 'LinearSVC':
normalized_probability_tensor_name = scope.get_unique_variable_name(probability_tensor_name + '_normalized')
lstm_inputs = []
lstm_outputs = []
# Reshape input feature vector in CoreML format into ONNX format
lstm_x_reshape_name = scope.get_unique_variable_name(lstm_op_name + '_X_reshape')
apply_reshape(scope, operator.inputs[0].full_name, lstm_x_reshape_name, container,
desired_shape=[-1, 1, input_size])
lstm_inputs.append(lstm_x_reshape_name)
# Allocate LSTM's weight matrices and add them into ONNX LSTM's input list
matrices_w = np.concatenate([lstm_weights.inputGateWeightMatrix.floatValue,
lstm_weights.outputGateWeightMatrix.floatValue,
lstm_weights.forgetGateWeightMatrix.floatValue,
lstm_weights.blockInputWeightMatrix.floatValue])
matrices_w_name = scope.get_unique_variable_name(lstm_op_name + '_W')
container.add_initializer(matrices_w_name, onnx_proto.TensorProto.FLOAT,
[1, 4 * hidden_size, input_size], matrices_w)
lstm_inputs.append(matrices_w_name)
# Allocate LSTM's recursion weight matrices and add them into ONNX LSTM's input list
matrices_r = np.concatenate([lstm_weights.inputGateRecursionMatrix.floatValue,
lstm_weights.outputGateRecursionMatrix.floatValue,
lstm_weights.forgetGateRecursionMatrix.floatValue,
lstm_weights.blockInputRecursionMatrix.floatValue])
matrices_r_name = scope.get_unique_variable_name(lstm_op_name + '_R')
container.add_initializer(matrices_r_name, onnx_proto.TensorProto.FLOAT,
[1, 4 * hidden_size, hidden_size], matrices_r)
lstm_inputs.append(matrices_r_name)
# Handle bias vectors
vectors_b = np.zeros(shape=(8, hidden_size))
if lstm_params.hasBiasVectors:
attr_pairs = _get_default_tree_classifier_attribute_pairs()
classes = sk_node.classes_
if sk_node.n_classes_ == 2:
transform = 'LOGISTIC'
base_values = [sk_node.init_.prior]
else:
transform = 'SOFTMAX'
base_values = sk_node.init_.priors
attr_pairs['base_values'] = base_values
attr_pairs['post_transform'] = transform
if utils.is_numeric_type(classes):
class_labels = utils.cast_list(int, classes)
attr_pairs['classlabels_int64s'] = class_labels
output_type = onnx_proto.TensorProto.INT64
else:
class_labels = utils.cast_list(str, classes)
attr_pairs['classlabels_strings'] = class_labels
output_type = onnx_proto.TensorProto.STRING
tree_weight = sk_node.learning_rate
if sk_node.n_classes_ == 2:
for i in range(sk_node.n_estimators):
tree_id = i
tree = sk_node.estimators_[i][0].tree_
_add_tree_to_attribute_pairs( attr_pairs, True, tree, tree_id, tree_weight, 0, False)
else:
for i in range(sk_node.n_estimators):
for c in range(sk_node.n_classes_):
tree_id = i * sk_node.n_classes_ + c
tree = sk_node.estimators_[i][c].tree_
X_name = context.get_unique_name('X')
pre_nb1.add_output(X_name)
nb = NodeBuilder(context, 'LSTM')
builder_list.append(nb)
nb.add_input(X_name)
W = np.concatenate([lstm_weights[0].inputGateWeightMatrix.floatValue,
lstm_weights[0].outputGateWeightMatrix.floatValue,
lstm_weights[0].forgetGateWeightMatrix.floatValue,
lstm_weights[0].blockInputWeightMatrix.floatValue])
WB = np.concatenate([lstm_weights[1].inputGateWeightMatrix.floatValue,
lstm_weights[1].outputGateWeightMatrix.floatValue,
lstm_weights[1].forgetGateWeightMatrix.floatValue,
lstm_weights[1].blockInputWeightMatrix.floatValue])
tensor_w = model_util.make_tensor('W', onnx_proto.TensorProto.FLOAT,
[2, 4 * hidden_size, input_size],
np.concatenate([W, WB]))
nb.add_initializer(tensor_w)
R = np.concatenate([lstm_weights[0].inputGateRecursionMatrix.floatValue,
lstm_weights[0].outputGateRecursionMatrix.floatValue,
lstm_weights[0].forgetGateRecursionMatrix.floatValue,
lstm_weights[0].blockInputRecursionMatrix.floatValue])
RB = np.concatenate([lstm_weights[1].inputGateRecursionMatrix.floatValue,
lstm_weights[1].outputGateRecursionMatrix.floatValue,
lstm_weights[1].forgetGateRecursionMatrix.floatValue,
lstm_weights[1].blockInputRecursionMatrix.floatValue])
tensor_r = model_util.make_tensor('R', onnx_proto.TensorProto.FLOAT,
[2, 4 * hidden_size, hidden_size],
np.concatenate([R, RB]))
nb.add_initializer(tensor_r)
bias = [params.redBias, params.greenBias, params.blueBias]
elif color_space == 'Bgr8':
bias = [params.blueBias, params.greenBias, params.redBias]
else:
raise ValueError('Unknown color space for tensor {}'.format(operator.inputs[0].full_name))
if container.target_opset < 9:
attrs = {'name': operator.full_name, 'scale': params.channelScale}
attrs['bias'] = bias
container.add_node('ImageScaler', [operator.inputs[0].full_name], [operator.outputs[0].full_name], **attrs)
else:
# In comments below, assume input tensor is X, the scale scalar is a, the bias vector is b.
# Store the scalar, a, used to scale all elements in the input tensor.
aName = scope.get_unique_variable_name(operator.full_name + '_scale')
container.add_initializer(aName, onnx_proto.TensorProto.FLOAT, [1], [params.channelScale])
# Store the bias vector. It will be added into the input tensor.
bName = scope.get_unique_variable_name(operator.full_name + '_bias')
container.add_initializer(bName, onnx_proto.TensorProto.FLOAT, [len(bias), 1, 1], bias)
# Compute Z = a * X.
zName = scope.get_unique_variable_name(operator.full_name + '_scaled')
apply_mul(scope, [operator.input_full_names[0], aName], zName, container)
# Compute Y = Z + b, which is the final output.
apply_add(scope, [bName, zName], operator.output_full_names[0], container)