def test_roi_align_sym():
    dtype = np.float32
    N, C, H, W = 2, 3, 4, 4
    data = np.arange(N * C * H * W).astype(dtype).reshape((N, C, H, W))
    rois = np.array([[0, 1, 1, 3, 3], [1, 2, 2, 3, 3]], dtype=dtype)
    # build the symbolic graph around the MobulaOP ROIAlign operator
    data_sym = mx.sym.Variable('data')
    rois_sym = mx.sym.Variable('rois')
    output_sym = mobula.op.ROIAlign(data=data_sym, rois=rois_sym, pooled_size=(2, 2),
                                    spatial_scale=1.0, sampling_ratio=1)
    output_sym = mx.sym.MakeLoss(output_sym)
    # bind the graph to concrete shapes, then run forward and backward once
    exe = output_sym.simple_bind(ctx=mx.context.current_context(),
                                 data=data.shape, rois=rois.shape)
    exe.forward(data=data, rois=rois)
    res = exe.outputs[0].asnumpy()
    exe.backward()
    mx.nd.waitall()
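
# For readers without MobulaOP installed: a minimal sketch of the same
# Variable -> MakeLoss -> simple_bind -> forward/backward pattern, using the
# built-in mx.sym.ROIPooling operator in place of mobula.op.ROIAlign.
import numpy as np
import mxnet as mx

data = np.arange(2 * 3 * 4 * 4, dtype=np.float32).reshape((2, 3, 4, 4))
rois = np.array([[0, 1, 1, 3, 3], [1, 2, 2, 3, 3]], dtype=np.float32)
out = mx.sym.ROIPooling(data=mx.sym.Variable('data'), rois=mx.sym.Variable('rois'),
                        pooled_size=(2, 2), spatial_scale=1.0)
out = mx.sym.MakeLoss(out)
exe = out.simple_bind(ctx=mx.cpu(), data=data.shape, rois=rois.shape)
exe.forward(is_train=True, data=data, rois=rois)
print(exe.outputs[0].shape)   # (2, 3, 2, 2): one pooled map per ROI
exe.backward()
mx.nd.waitall()
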
def _lstm_unroll_base(num_lstm_layer, num_hidden):
    """Returns symbol for LSTM model up to loss/softmax."""
    param_cells = []
    last_states = []
    for i in range(num_lstm_layer):
        param_cells.append(LSTMParam(i2h_weight=mx.sym.Variable("l%d_i2h_weight" % i),
                                     i2h_bias=mx.sym.Variable("l%d_i2h_bias" % i),
                                     h2h_weight=mx.sym.Variable("l%d_h2h_weight" % i),
                                     h2h_bias=mx.sym.Variable("l%d_h2h_bias" % i)))
        state = LSTMState(c=mx.sym.Variable("l%d_init_c" % i),
                          h=mx.sym.Variable("l%d_init_h" % i))
        last_states.append(state)
    assert len(last_states) == num_lstm_layer

    hp = Hyperparams()
    net = eval(hp.network + '.get_sym')()
    channel, height, width = hp.img_shape
    # infer the sequence length (the width of the CNN feature map) from a dummy input shape
    seq_len = int(net.infer_shape(data=(1, channel, height, width))[1][0][-1])
    print("seq_len:", seq_len)
    # one timestep per feature-map column
    wordvec = mx.sym.split(data=net, axis=3, num_outputs=seq_len, squeeze_axis=1)

    hidden_all = []
    for seqidx in range(seq_len):
        hidden = wordvec[seqidx]
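
# A small self-contained illustration (toy conv net, not hp.network) of how
# net.infer_shape(...)[1][0][-1] recovers seq_len: index [1] is the list of
# output shapes, [0] the first output, [-1] its width.
import mxnet as mx

toy = mx.sym.Convolution(data=mx.sym.Variable('data'), num_filter=8,
                         kernel=(3, 3), stride=(2, 2), pad=(1, 1), name='toy_conv')
arg_shapes, out_shapes, aux_shapes = toy.infer_shape(data=(1, 3, 32, 100))
print(out_shapes[0])           # (1, 8, 16, 50)
print(int(out_shapes[0][-1]))  # 50 -> one LSTM timestep per output column
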
    '''
    Generate output symbol for univariate numeric loss
    :param label_field_name:
    :return: mxnet symbols for predictions and loss
    '''
    # generate prediction symbol
    pred = mx.sym.FullyConnected(
        data=latents,
        num_hidden=1,
        name="label_{}".format(label_field_name))
    target = mx.sym.Variable(label_field_name)
    # squared loss
    loss = mx.sym.sum((pred - target) ** 2.0)
    return pred, loss
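
# A minimal, self-contained sketch of how such a (pred, loss) pair can be bound
# and evaluated; 'latents' and the field name 'target0' are made-up placeholders.
import numpy as np
import mxnet as mx

latents = mx.sym.Variable('latents')
pred = mx.sym.FullyConnected(data=latents, num_hidden=1, name='label_target0')
target = mx.sym.Variable('target0')
loss = mx.sym.MakeLoss(mx.sym.sum((pred - target) ** 2.0))
exe = loss.simple_bind(ctx=mx.cpu(), latents=(4, 8), target0=(4, 1))
exe.arg_dict['label_target0_weight'][:] = 0.1   # deterministic toy weights
exe.arg_dict['label_target0_bias'][:] = 0.0
exe.forward(is_train=True,
            latents=np.ones((4, 8), dtype=np.float32),
            target0=np.zeros((4, 1), dtype=np.float32))
print(exe.outputs[0].asnumpy())   # summed squared error
exe.backward()                    # gradients flow back through MakeLoss
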
def gru_unroll(net, num_gru_layer, seq_len, num_hidden_gru_list, dropout=0., is_batchnorm=False, prefix="",
               direction="forward", is_bucketing=False):
    if num_gru_layer > 0:
        param_cells = []
        last_states = []
        for i in range(num_gru_layer):
            param_cells.append(GRUParam(gates_i2h_weight=mx.sym.Variable(prefix + "l%d_i2h_gates_weight" % i),
                                        gates_i2h_bias=mx.sym.Variable(prefix + "l%d_i2h_gates_bias" % i),
                                        gates_h2h_weight=mx.sym.Variable(prefix + "l%d_h2h_gates_weight" % i),
                                        gates_h2h_bias=mx.sym.Variable(prefix + "l%d_h2h_gates_bias" % i),
                                        trans_i2h_weight=mx.sym.Variable(prefix + "l%d_i2h_trans_weight" % i),
                                        trans_i2h_bias=mx.sym.Variable(prefix + "l%d_i2h_trans_bias" % i),
                                        trans_h2h_weight=mx.sym.Variable(prefix + "l%d_h2h_trans_weight" % i),
                                        trans_h2h_bias=mx.sym.Variable(prefix + "l%d_h2h_trans_bias" % i)))
            state = GRUState(h=mx.sym.Variable(prefix + "l%d_init_h" % i))
            last_states.append(state)
        assert len(last_states) == num_gru_layer

        # declare batchnorm params (gamma, beta): one pair per layer when bucketing,
        # otherwise one pair per timestep
        if is_batchnorm:
            batchnorm_gamma = []
            batchnorm_beta = []
            if is_bucketing:
                for l in range(num_gru_layer):
                    batchnorm_gamma.append(mx.sym.Variable(prefix + "l%d_i2h_gamma" % l))
                    batchnorm_beta.append(mx.sym.Variable(prefix + "l%d_i2h_beta" % l))
            else:
                for seqidx in range(seq_len):
                    batchnorm_gamma.append(mx.sym.Variable(prefix + "t%d_i2h_gamma" % seqidx))
                    batchnorm_beta.append(mx.sym.Variable(prefix + "t%d_i2h_beta" % seqidx))
def get_cur_test_symbol(self, cfg):
    # config aliases for convenience
    num_classes = cfg.dataset.NUM_CLASSES
    num_reg_classes = (2 if cfg.CLASS_AGNOSTIC else num_classes)
    num_anchors = cfg.network.NUM_ANCHORS

    data_cur = mx.sym.Variable(name="data")
    data_key = mx.sym.Variable(name="data_key")
    conv_feat = mx.sym.Variable(name="feat_key")

    # warp the key-frame features to the current frame using the estimated flow
    flow, scale_map = self.get_flownet(data_cur, data_key)
    flow_grid = mx.sym.GridGenerator(data=flow, transform_type='warp', name='flow_grid')
    conv_feat = mx.sym.BilinearSampler(data=conv_feat, grid=flow_grid, name='warping_feat')

    # L branch
    fc6_bias = mx.symbol.Variable('fc6_bias', lr_mult=2.0)
    fc6_weight = mx.symbol.Variable('fc6_weight', lr_mult=1.0)
    fc6 = mx.symbol.Convolution(
        data=conv_feat, kernel=(1, 1), pad=(0, 0), num_filter=1024, name="fc6", bias=fc6_bias, weight=fc6_weight,
        workspace=self.workspace)
    relu_fc6 = mx.sym.Activation(data=fc6, act_type='relu', name='relu_fc6')
    cls_weight = mx.sym.Variable("cls_weight")
    cls_bias = mx.sym.Variable("cls_bias")
param_cells = []
last_states = []
for i in range(num_lstm_layer):
    param_cells.append(LSTMParam(i2h_weight=mx.sym.Variable("l%d_i2h_weight" % i),
                                 i2h_bias=mx.sym.Variable("l%d_i2h_bias" % i),
                                 h2h_weight=mx.sym.Variable("l%d_h2h_weight" % i),
                                 h2h_bias=mx.sym.Variable("l%d_h2h_bias" % i),
                                 ph2h_weight=mx.sym.Variable("l%d_ph2h_weight" % i),
                                 c2i_bias=mx.sym.Variable("l%d_c2i_bias" % i, shape=(1, num_hidden)),
                                 c2f_bias=mx.sym.Variable("l%d_c2f_bias" % i, shape=(1, num_hidden)),
                                 c2o_bias=mx.sym.Variable("l%d_c2o_bias" % i, shape=(1, num_hidden))))
    state = LSTMState(c=mx.sym.Variable("l%d_init_c" % i),
                      h=mx.sym.Variable("l%d_init_h" % i))
    last_states.append(state)
assert len(last_states) == num_lstm_layer

data = mx.sym.Variable('data')
label = mx.sym.Variable('softmax_label')
dataSlice = mx.sym.SliceChannel(data=data, num_outputs=seq_len, squeeze_axis=1)

hidden_all = []
for seqidx in range(seq_len):
    hidden = dataSlice[seqidx]
    # stack LSTM
    for i in range(num_lstm_layer):
        if i == 0:
num_reg_classes = (2 if cfg.CLASS_AGNOSTIC else num_classes)

# input init
if is_train:
    data = mx.symbol.Variable(name="data")
    rois = mx.symbol.Variable(name='rois')
    label = mx.symbol.Variable(name='label')
    bbox_target = mx.symbol.Variable(name='bbox_target')
    bbox_weight = mx.symbol.Variable(name='bbox_weight')
    # reshape input
    rois = mx.symbol.Reshape(data=rois, shape=(-1, 5), name='rois_reshape')
    label = mx.symbol.Reshape(data=label, shape=(-1,), name='label_reshape')
    bbox_target = mx.symbol.Reshape(data=bbox_target, shape=(-1, 4 * num_reg_classes), name='bbox_target_reshape')
    bbox_weight = mx.symbol.Reshape(data=bbox_weight, shape=(-1, 4 * num_reg_classes), name='bbox_weight_reshape')
else:
    data = mx.sym.Variable(name="data")
    rois = mx.symbol.Variable(name='rois')
    # reshape input
    rois = mx.symbol.Reshape(data=rois, shape=(-1, 5), name='rois_reshape')

# shared convolutional layers
# data = mx.sym.Cast(data=data, dtype=np.float16)
conv_feat = self.get_resnet_v1_conv4(data)
# res5
relu1 = self.get_resnet_v1_conv5(conv_feat)

conv_new_1 = mx.sym.Convolution(data=relu1, kernel=(1, 1), num_filter=256, name="conv_new_1")
conv_new_1_relu = mx.sym.Activation(data=conv_new_1, act_type='relu', name='conv_new_1_relu')
# conv_new_1_relu = mx.sym.Cast(data=conv_new_1_relu, dtype=np.float32)
offset_t = mx.contrib.sym.DeformablePSROIPooling(name='offset_t', data=conv_new_1_relu, rois=rois, group_size=1, pooled_size=7,
                                                 sample_per_part=4, no_trans=True, part_size=7, output_dim=256, spatial_scale=0.0625)
# predict a 2-D offset for each of the 7x7 pooled bins (hence num_hidden = 7 * 7 * 2)
offset = mx.sym.FullyConnected(name='offset', data=offset_t, num_hidden=7 * 7 * 2, lr_mult=0.01)
def __init__(self,
             config: ConvolutionalEncoderConfig,
             prefix: str = C.CNN_ENCODER_PREFIX) -> None:
    super().__init__(config.dtype)
    self.config = config

    # initialize the weights of the linear transformation required for the residual connections
    self.i2h_weight = mx.sym.Variable('%si2h_weight' % prefix)

    # initialize the layers of blocks containing a convolution and a GLU, since
    # every layer is shared over all encode calls
    self.layers = [convolution.ConvolutionBlock(
        config.cnn_config,
        pad_type='centered',
        prefix="%s%d_" % (prefix, i)) for i in range(config.num_layers)]
def get_symbol():
    num_layers = config.num_layers
    num_init_features, growth_rate, block_config = densenet_spec[num_layers]
    net = DenseNet(num_init_features, growth_rate, block_config, dropout=config.densenet_dropout)
    data = mx.sym.Variable(name='data')
    # map raw pixel values from [0, 255] to roughly [-1, 1] (0.0078125 == 1/128)
    data = data - 127.5
    data = data * 0.0078125
    body = net(data)
    fc1 = symbol_utils.get_fc1(body, config.emb_size, config.net_output)
    return fc1
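
# A hedged usage sketch: assuming the usual 112x112 RGB face crops this model
# family expects, the embedding symbol can be sanity-checked with infer_shape.
sym = get_symbol()
arg_shapes, out_shapes, aux_shapes = sym.infer_shape(data=(1, 3, 112, 112))
print(out_shapes)   # expected: [(1, config.emb_size)]
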
        Dataset type; only cifar10 and imagenet are supported
    workspace : int
        Workspace used in convolution operator
    """
    version_se = kwargs.get('version_se', 1)
    version_input = kwargs.get('version_input', 1)
    assert version_input >= 0
    version_output = kwargs.get('version_output', 'E')
    fc_type = version_output
    version_unit = kwargs.get('version_unit', 3)
    act_type = kwargs.get('version_act', 'prelu')
    memonger = kwargs.get('memonger', False)
    print(version_se, version_input, version_output, version_unit, act_type, memonger)
    num_unit = len(units)
    assert num_unit == num_stages
    data = mx.sym.Variable(name='data')
    if version_input == 0:
        # data = mx.sym.BatchNorm(data=data, fix_gamma=True, eps=2e-5, momentum=bn_mom, name='bn_data')
        data = mx.sym.identity(data=data, name='id')
        # map raw pixel values from [0, 255] to roughly [-1, 1]
        data = data - 127.5
        data = data * 0.0078125
        body = Conv(data=data, num_filter=filter_list[0], kernel=(7, 7), stride=(2, 2), pad=(3, 3),
                    no_bias=True, name="conv0", workspace=workspace)
        body = mx.sym.BatchNorm(data=body, fix_gamma=False, eps=2e-5, momentum=bn_mom, name='bn0')
        body = Act(data=body, act_type=act_type, name='relu0')
        # body = mx.sym.Pooling(data=body, kernel=(3, 3), stride=(2, 2), pad=(1, 1), pool_type='max')
    elif version_input == 2:
        data = mx.sym.BatchNorm(data=data, fix_gamma=True, eps=2e-5, momentum=bn_mom, name='bn_data')
        body = Conv(data=data, num_filter=filter_list[0], kernel=(3, 3), stride=(1, 1), pad=(1, 1),
                    no_bias=True, name="conv0", workspace=workspace)
        body = mx.sym.BatchNorm(data=body, fix_gamma=False, eps=2e-5, momentum=bn_mom, name='bn0')
        body = Act(data=body, act_type=act_type, name='relu0')
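
# A quick, self-contained check (plain mx.sym ops standing in for the Conv/Act
# helpers, and an assumed filter_list[0] of 64) of what the version_input==0
# stem does to a 112x112 input:
import mxnet as mx

stem = mx.sym.Variable('data')
stem = (stem - 127.5) * 0.0078125
stem = mx.sym.Convolution(data=stem, num_filter=64, kernel=(7, 7), stride=(2, 2),
                          pad=(3, 3), no_bias=True, name='conv0')
stem = mx.sym.BatchNorm(data=stem, fix_gamma=False, eps=2e-5, momentum=0.9, name='bn0')
stem = mx.sym.Activation(data=stem, act_type='relu', name='relu0')
arg_shapes, out_shapes, aux_shapes = stem.infer_shape(data=(1, 3, 112, 112))
print(out_shapes)   # [(1, 64, 56, 56)]: the 7x7 stride-2 conv halves the spatial size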