def _display_intermediate_steps(model_onnx, inputs):
    # Debug helper: rebuild the model one output at a time and load each
    # truncated model into onnxruntime to pinpoint the node that fails.
    import onnxruntime
    print("[_display_intermediate_steps] BEGIN")
    if isinstance(model_onnx, str):
        import onnx
        model_onnx = onnx.load(model_onnx)
    for name, node in enumerate_model_initializers(model_onnx, add_node=True):
        print("INIT: {} - {}".format(name, _guess_type(node)))
    for out, node in enumerate_model_node_outputs(model_onnx, add_node=True):
        print('-')
        print("OUTPUT: {} from {}".format(out, node.name))
        step = select_model_inputs_outputs(model_onnx, out)
        try:
            step_sess = onnxruntime.InferenceSession(step.SerializeToString())
        except Exception as e:
            raise RuntimeError("Unable to load ONNX model with onnxruntime. "
                               "Last added node is:\n{}".format(node)) from e
        for o in step_sess.get_inputs():
            print("IN :", o)
        for o in step_sess.get_outputs():
            print("OUT:", o)
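
# Minimal usage sketch (an assumption, not part of the original snippet): point
# the helper at a serialized model and replay every intermediate output through
# onnxruntime. The model path and the inputs value below are placeholders.
if __name__ == "__main__":
    _display_intermediate_steps("pipeline.onnx", inputs=None)
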
    # NOTE: the class header and __init__ of this quantized LFC model are
    # truncated in the original excerpt; only the forward pass survives.
    def forward(self, x):
        x = 2.0 * x - 1.0
        x = x.view(x.shape[0], -1)
        for mod in self.features:
            x = mod(x)
        out = self.fc(x)
        return out
export_onnx_path = "test_output_lfc.onnx"
with torch.no_grad():
    lfc = LFC(weight_bit_width=1, act_bit_width=1, in_bit_width=1)
    bo.export_finn_onnx(lfc, (1, 1, 28, 28), export_onnx_path)
    model = onnx.load(export_onnx_path)
    # TODO the following way of testing is highly sensitive to small changes
    # in PyTorch ONNX export: the order, names, count... of nodes could
    # easily change between different versions, and break this test.
    assert len(model.graph.input) == 21
    assert len(model.graph.node) == 25
    assert len(model.graph.output) == 1
    assert model.graph.output[0].type.tensor_type.shape.dim[1].dim_value == 10
    act_node = model.graph.node[8]
    assert act_node.op_type == "QuantizedHardTanh"
    matmul_node = model.graph.node[9]
    assert matmul_node.op_type == "MatMul"
    assert act_node.output[0] == matmul_node.input[1]
    inits = [x.name for x in model.graph.initializer]
    qnt_annotations = {
        a.tensor_name: a.quant_parameter_tensor_names[0].value
        for a in model.graph.quantization_annotation
    }
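
    # Hedged illustration (not in the original test): the annotation map built
    # above lets you look up the quantization parameter tensor recorded for any
    # tensor name, e.g. the weight input of the MatMul checked earlier.
    weight_qnt = qnt_annotations.get(matmul_node.input[1])
    print("quant parameter tensor for MatMul weight:", weight_qnt)
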
try:
    _compare_expected(cmp_exp,
                      cmp_out,
                      sess,
                      onx,
                      decimal=decimal,
                      verbose=verbose,
                      classes=classes,
                      **options)
except ExpectedAssertionError as expe:
    raise expe
except Exception as e:
    if verbose:
        import onnx
        model = onnx.load(onx)
        smodel = "\nJSON ONNX\n" + str(model)
    else:
        smodel = ""
    raise OnnxRuntimeAssertionError(
        "Model '{0}' has discrepancies.\n{1}: {2}{3}".format(
            onx, type(e), e, smodel))
return output0, lambda_onnx
def onnx2ir(model, output_folder):
    # get graph from ONNX model
    if isinstance(model, str):
        onnx_model = onnx.load(model)
    elif isinstance(model, onnx.ModelProto):
        onnx_model = model
    else:
        raise TypeError("Model must be a file path to a .onnx file or a loaded onnx.ModelProto")
    graph = onnx_graph_to_ir_graph(onnx_model.graph)
    graph.toFile(output_folder)
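
# Minimal usage sketch (an assumption, not part of the original snippet);
# both the model path and the output directory are placeholders.
onnx2ir("model.onnx", "ir_out")
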
def convert_matmul_model(input_model, output_model, only_for_scan=False,
                         share_input_quantization=False,
                         preset_str='asymm8_param0_input1',
                         qcfg_json=None, export_qcfg_json=None):
    preset_qcfgs = {
        'asymm8_param0_input1': {
            'W': dict(QuantizeConfig(signed=1, reserved_bits=0, type_bits=8)),
            'X': dict(QuantizeConfig(signed=0, reserved_bits=1, type_bits=8)),
            'Symmetric': 0},
        'symm16_param3_input3': {
            'W': dict(QuantizeConfig(signed=1, reserved_bits=3, type_bits=16)),
            'X': dict(QuantizeConfig(signed=1, reserved_bits=3, type_bits=16)),
            'Symmetric': 1}}
    default_qcfg = preset_qcfgs[preset_str]
    in_mp = onnx.load(input_model)

    qcfg_dict = {}
    if qcfg_json and not export_qcfg_json:
        with open(qcfg_json, 'r') as f:
            qcfg_dict = json.load(f)

    out_mp = onnx.ModelProto()
    out_mp.CopyFrom(in_mp)
    out_mp.ir_version = 5  # update ir version to avoid requirement of initializer in graph input
    ensure_opset(out_mp, 10)  # bump up to ONNX opset 10, which is required for MatMulInteger
    ensure_opset(out_mp, 1, 'com.microsoft')  # add MS domain for MatMulInteger16
    out_mp.graph.ClearField('node')
    nf = NodeFactory(out_mp.graph)
    converted_weights = {}  # remember MatMul weights that have been converted, in case of sharing
    quantized_inputs = {} if share_input_quantization else None  # remember quantized inputs that may be shared between MatMuls
    for in_n in in_mp.graph.node:
        # (per-node conversion body truncated in the original excerpt)
        pass
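
# Hedged usage sketch (paths are placeholders; assumes the helpers used above,
# e.g. QuantizeConfig, ensure_opset, and NodeFactory, are importable from the
# same module):
convert_matmul_model("model_fp32.onnx", "model_quant.onnx",
                     preset_str='symm16_param3_input3')
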
from onnx.checker import ValidationError
from onnx.checker import check_model
from onnx.version_converter import convert_version

from .onnx_utils import DEFAULT_OP_DOMAIN
from .onnx_utils import graph_ops, graph_weights
from .onnx_utils import inferred_model_value_info
from .onnx_utils import polish_model
from .writer import Program, Writer

logger = logging.getLogger('convert')

# prepare onnx model
logger.info('loading model: %s ...', onnx_model_filename)
onnx_model = onnx.load(onnx_model_filename)

try:
    logger.info('checking model ...')
    check_model(onnx_model)
    if onnx_opset_version is None:  # WORKAROUND: RuntimeError: No Adapter For OP
        logger.warning(
            'opset conversion skipped because onnx_opset_pedantic is OFF')
        logger.info('assumed opset version: %d', DEFAULT_ONNX_OPSET_VERSION)
    else:
        logger.info('using opset version: %d', onnx_opset_version)
        onnx_model = convert_version(onnx_model, onnx_opset_version)
except ValidationError as e:
    if onnx_opset_pedantic:  # a valid model is required in pedantic mode
        raise e
    else:
        logger.warning('model check failed, continuing because onnx_opset_pedantic is OFF')
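
# Hedged, standalone illustration of the pedantic/lenient split above, using only
# the public onnx.checker API (the function name and file path are placeholders):
import onnx
from onnx.checker import check_model, ValidationError

def check_or_warn(path, pedantic=True):
    model = onnx.load(path)
    try:
        check_model(model)  # raises ValidationError on an invalid model
    except ValidationError:
        if pedantic:
            raise
        print("model check failed; continuing because pedantic mode is off")
    return model
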
def load_onnx(filename):
    '''
    Load an ONNX file and return a Graph.

    @params
    filename is a string containing a file name

    @return
    Loaded in-memory Graph
    '''
    graph = core.PyGraph()
    model = onnx.load(filename)
    tensors = dict()
    for t in model.graph.input:
        dims = list()
        for d in t.type.tensor_type.shape.dim:
            dims.append(d.dim_value)
        weight_data = None
        for weight in model.graph.initializer:
            if weight.name == t.name:
                weight_data = numpy_helper.to_array(weight)
        # We classify an input as a pure input if we cannot find its weights
        if weight_data is None:
            tensors[t.name] = graph.new_input(dims=tuple(dims))
        else:
            tensors[t.name] = graph.new_weight(dims=tuple(dims), data=weight_data)
    # Add initializers not in the inputs
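
# Minimal usage sketch (assumes the surrounding module imports core and
# numpy_helper as used above; the file name is a placeholder):
graph = load_onnx("resnet50.onnx")
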
# NOTE: this excerpt is the body of a command-line entry-point function whose
# header is truncated; the opening of the try block presumably imports
# onnxruntime as rt before reading its version.
try:
    import onnxruntime as rt
    version = rt.__version__
    if version != '0.4.0':
        print("onnxruntime==0.4.0 is required")
        return
except:
    print(
        "onnxruntime is not installed, use \"pip install onnxruntime==0.4.0\"."
    )
    return
parser = arg_parser()
args = parser.parse_args()
save_dir = args.save_dir
model_dir = os.path.join(save_dir, 'onnx_model_infer.onnx')
model = onnx.load(model_dir)
sess = rt.InferenceSession(model_dir)
inputs_dict = {}
for ipt in sess.get_inputs():
    data_dir = os.path.join(save_dir, ipt.name + '.npy')
    inputs_dict[ipt.name] = np.load(data_dir, allow_pickle=True)
res = sess.run(None, input_feed=inputs_dict)
for idx, value_info in enumerate(model.graph.output):
    np.save(os.path.join(save_dir, value_info.name), res[idx])
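
# Hedged follow-up sketch (not in the original script): reload one of the saved
# outputs and sanity-check it against the live session result; the file naming
# mirrors the np.save calls in the loop above.
first_out = model.graph.output[0].name
saved = np.load(os.path.join(save_dir, first_out + '.npy'), allow_pickle=True)
np.testing.assert_allclose(saved, res[0])
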
def parse(self):
    """
    Parse the ONNX model into an intermediate graph.
    :return:
    """
    if self.txt_path is not None:
        self._read_file()
    else:
        self.weights_data = None
        self.bias_data = None
    onnx_model = onnx.load(self.model_path)
    onnx_graph = onnx_model.graph
    [nodes, weights, outputs, output_node] = self._parse_onnx_node(onnx_graph, {})
    print('onnx_node')
    for node in nodes.values():
        print(node['name'], node['type'], node['input'], node['output'])
    print('-------------------------------')
    self._cal_shape(nodes, weights)
    print('parse onnx graph')
    med_mid_graph = self._parse_onnx_graph(nodes, weights)
    # delete Unsqueeze/Constant/Squeeze ops
    print('delete extra constant OP')
    med_graph = self._delete_ConstantOP(med_mid_graph)
    print('med_graph')
    for name in med_graph.keys():
        node = med_graph[name]
        # (remaining loop body truncated in the original excerpt)
def rewrite_onnx_file(model_filename, out_filename, new_input_types):
    xmodel = onnx.load(model_filename)
    xmodel = rewrite_onnx_model(xmodel, new_input_types)
    onnx.save(xmodel, out_filename)
    return xmodel
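
# Hedged usage sketch: rewrite_onnx_model and the exact shape of new_input_types
# are project-specific assumptions; here it is treated as a mapping from input
# name to desired type, and both file names are placeholders.
rewritten = rewrite_onnx_file("model.onnx", "model_rewritten.onnx",
                              new_input_types={"input_0": "float16"})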