Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
    """Parse command-line options, configure cgt, and build the first conv layers.

    Recognised flags: --profile, --unittest (booleans), --epochs (int,
    default 10) and --devtype ("cpu" or "gpu", default "cpu").  The chosen
    device type becomes cgt's default device and the "native" backend is
    selected.  A graph is then built on a fixed 64x3x32x32 input: three
    5x5 convolutions (3->32, 32->32, 32->64 channels), the first two each
    followed by rectify and a 3x3 / stride-2 max-pool.
    """
    parser = argparse.ArgumentParser()
    for flag in ("--profile", "--unittest"):
        parser.add_argument(flag, action="store_true")
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--devtype", choices=["cpu", "gpu"], default="cpu")
    args = parser.parse_args()

    device = cgt.core.Device(devtype=args.devtype)
    cgt.update_config(default_device=device, backend="native")

    batchsize = 64
    Xshape = (batchsize, 3, 32, 32)
    X = cgt.tensor4("X", fixed_shape=Xshape)
    y = cgt.vector("y", fixed_shape=(batchsize,), dtype='i4')

    # Geometry shared by every convolution / pooling layer below.
    conv_geometry = dict(kernelshape=(5, 5), pad=(2, 2))
    pool_geometry = dict(kernelshape=(3, 3), stride=(2, 2))

    layer1 = nn.SpatialConvolution(
        3, 32, weight_init=nn.IIDGaussian(std=1e-4), **conv_geometry)
    conv1 = layer1(X)
    relu1 = nn.rectify(conv1)
    pool1 = nn.max_pool_2d(relu1, **pool_geometry)

    layer2 = nn.SpatialConvolution(
        32, 32, weight_init=nn.IIDGaussian(std=0.01), **conv_geometry)
    conv2 = layer2(pool1)
    relu2 = nn.rectify(conv2)
    pool2 = nn.max_pool_2d(relu2, **pool_geometry)

    layer3 = nn.SpatialConvolution(
        32, 64, weight_init=nn.IIDGaussian(std=0.01), **conv_geometry)
    conv3 = layer3(pool2)
def _subtensor4(x, slis, y):
    """Build a fancy-slice node for the first 1-d integer-array index in `slis`.

    Scans `slis` in order; for the first entry accepted by `_is1dintarray`
    (at position `axis`) it returns
      * ``core.Result(core.GetFancySli(axis), [x, index])`` when `y` is None,
      * ``core.Result(core.IncFancySli(axis), [x, index, y])`` otherwise.

    Callers are expected to pass at least one such index; if none is found
    the trailing assertion fires.
    """
    for axis, index in enumerate(slis):
        if not _is1dintarray(index):
            continue
        if y is None:
            return core.Result(core.GetFancySli(axis), [x, index])
        return core.Result(core.IncFancySli(axis), [x, index, y])
    assert 0, "should be unreachable"
def pullback(self, inputs, output, goutput):
    """Not implemented: unconditionally raises ``cgt.core.Todo``.

    None of the arguments are inspected.
    """
    raise cgt.core.Todo()
def shp_apply(self, inputs):
def add_transports(nodelist, node2dev, node2shape):
    """Insert Transport nodes wherever a node feeds a child on another device.

    For every node in `nodelist`, each child (a node in `nodelist` that lists
    it among its `parents`) whose device differs from the node's own device
    is rewired, via `replace_parents`, to read from a
    ``core.Result(core.Transport(child_device), [node])`` copy instead.  One
    copy is created per (node, target device) pair and shared by all children
    on that device.

    `node2dev` and `node2shape` are updated in place with entries for each
    new copy (the copy inherits the source node's shape).  Returns None.
    """
    children_of = defaultdict(list)
    for consumer in nodelist:
        for producer in consumer.parents:
            children_of[producer].append(consumer)

    # XXX look at native compilation info, gpu deref mask
    for source in nodelist:
        source_dev = node2dev[source]
        copy_on_dev = {}  # target device -> Transport copy of `source`
        for consumer in children_of[source]:
            consumer_dev = node2dev[consumer]
            if consumer_dev == source_dev:
                continue
            if consumer_dev not in copy_on_dev:
                transported = core.Result(core.Transport(consumer_dev), [source])
                node2dev[transported] = consumer_dev
                copy_on_dev[consumer_dev] = transported
                node2shape[transported] = node2shape[source]
            replace_parents(consumer, source, copy_on_dev[consumer_dev])
def conv2d(x_BKRC, f_LKrc, kernelshape, pad=(0,0), stride=(1,1)):
    """Build a 2-d convolution of `x_BKRC` with the 4-d filter bank `f_LKrc`.

    The implementation is chosen from the configured default device type:

    * "gpu": a ``cudnn_ops.CudnnConvForward`` node built from `pad` and
      `stride`, fed the input, the filters and an all-zero (1, L, 1, 1)
      array of dtype ``cgt.floatX``.  `kernelshape` is not used on this path.
    * "cpu": `im2col` on the input, a ``core.Mul22(False, True)`` product of
      the flattened columns with the filters reshaped to (L, K*r*c), then a
      reshape to (B, m, n, L) and a transpose with axes [0, 3, 1, 2].

    Any other device type trips the assertion.
    """
    devtype = cgt.get_config()["default_device"].devtype
    n_filters, n_channels, k_rows, k_cols = f_LKrc.shape

    if devtype == "gpu":
        zero_bias = cgt.zeros((1, n_filters, 1, 1), cgt.floatX)
        cudnn_op = cudnn_ops.CudnnConvForward(pad[0], pad[1], stride[0], stride[1])
        return core.Result(cudnn_op, [x_BKRC, f_LKrc, zero_bias])

    assert devtype == "cpu"
    patches = im2col(x_BKRC, kernelshape, pad, stride)
    flat_filters = f_LKrc.reshape([n_filters, n_channels * k_rows * k_cols])
    batch, out_rows, out_cols, patch_len = patches.shape
    flat_patches = patches.reshape([batch * out_rows * out_cols, patch_len])
    flat_out = core.Result(core.Mul22(False, True), [flat_patches, flat_filters])
    unflattened = flat_out.reshape([batch, out_rows, out_cols, n_filters])
    return unflattened.transpose([0, 3, 1, 2])
def get_native_compile_info(self, input_types, devtype):
    """Return the ``core.NativeCompileInfo`` for this op's native wrapper.

    The generated C source is a single exported function that forwards its
    four read arrays and the write array to ``max_pool_pullback``,
    instantiated for the C type that ``core.np2c`` maps the first input's
    dtype to.  The closure triples come from ``info2closure(self.info)`` and
    ``pooling.h`` is the only include.  `devtype` is not consulted.
    """
    template = r"""
CGT_EXPORT_C void $function(conv_closure* cl, cgtArray** reads, cgtArray* write) {
max_pool_pullback<%(cdtype)s>(reads[0], reads[1], reads[2], reads[3], write);
}"""
    substitutions = {"cdtype": core.np2c[input_types[0].dtype]}
    source = template % substitutions
    closure = info2closure(self.info)
    return core.NativeCompileInfo(source, closure_triples=closure, includes=["pooling.h"])
def _function_listout(inputs, outputs, dbg = None, updates=None, givens=None):
    """Validate the arguments and hand them to `run_compilation_pipeline`.

    Parameters
    ----------
    inputs : passed through unchanged to the compilation pipeline.
    outputs : iterable of outputs; any element that is a tuple is wrapped
        with ``cgt.make_tuple(*element)``.
    dbg : must be falsy; debug support is currently broken.
    updates : None, a dict ``{before: after}`` or a list of
        ``(before, after)`` pairs of ``core.Node``.  Every ``before`` must
        satisfy ``is_data()``.
    givens : None or a collection passed through to the pipeline.

    Returns
    -------
    Whatever `run_compilation_pipeline` returns.

    Raises
    ------
    AssertionError
        If `updates` is malformed or an update target is not data.
    core.Todo
        If `dbg` is truthy.
    """
    if isinstance(updates, dict):
        # dict.items() is a view on Python 3; materialise it so the
        # list check below accepts dict-style updates on every version.
        updates = list(updates.items())

    if updates is None:
        updates = []
    else:
        assert (isinstance(updates, list) and
                all(isinstance(a, tuple) and len(a) == 2
                    and isinstance(a[0], core.Node) and isinstance(a[1], core.Node)
                    for a in updates)), "updates should be a list of pairs (before, after)"
        # This check used to hang off the `givens` branch, so it was skipped
        # whenever `givens` was omitted.  It concerns `updates` only.
        assert all(before.is_data() for (before, _) in updates), "lhs of updates must be Data instances"

    if givens is None:
        givens = []

    if dbg:
        raise core.Todo("debug functionality is broken")

    outputs = [cgt.make_tuple(*x) if isinstance(x, tuple) else x for x in outputs]
    interp = run_compilation_pipeline(inputs, outputs, updates, givens)
    return interp
def sample(self, p, shape=None):
    """Return the elementwise comparison ``cgt.rand(*shape) <= p``.

    `p` is first converted with ``core.as_node``.  When `shape` is falsy
    (None, or empty) it defaults to ``cgt.shape(p)``.
    """
    p = core.as_node(p)
    if not shape:
        shape = cgt.shape(p)
    noise = cgt.rand(*shape)
    return noise <= p
def noop_byref(x):
    """Wrap `x` in a ``cgt.core.Result`` node whose op is a fresh ``SleepByRef()``."""
    op = SleepByRef()
    return cgt.core.Result(op, [x])
def typ_apply(self, _inputs):
    """Return ``core.TensorType(cgt.floatX, 4)`` regardless of `_inputs`.

    The second argument is presumably the number of dimensions (confirm
    against ``core.TensorType``).
    """
    result_type = core.TensorType(cgt.floatX, 4)
    return result_type