Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_list_order():
""" https://github.com/telegraphic/hickle/issues/26 """
d = [np.arange(n + 1) for n in range(20)]
hickle.dump(d, 'test.h5')
d_hkl = hickle.load('test.h5')
try:
for ii, xx in enumerate(d):
assert d[ii].shape == d_hkl[ii].shape
for ii, xx in enumerate(d):
assert np.allclose(d[ii], d_hkl[ii])
except AssertionError:
print(d[ii], d_hkl[ii])
raise
def test_astropy_angle():
for uu in ['radian', 'degree']:
a = Angle(1.02, unit=uu)
hkl.dump(a, "test_ap.h5")
b = hkl.load("test_ap.h5")
assert a == b
assert a.unit == b.unit
def test_astropy_time_array():
times = ['1999-01-01T00:00:00.123456789', '2010-01-01T00:00:00']
t1 = Time(times, format='isot', scale='utc')
hkl.dump(t1, "test_ap2.h5")
t2 = hkl.load("test_ap2.h5")
print(t1)
print(t2)
assert t1.value.shape == t2.value.shape
for ii in range(len(t1)):
assert t1.value[ii] == t2.value[ii]
assert t1.format == t2.format
assert t1.scale == t2.scale
times = [58264, 58265, 58266]
t1 = Time(times, format='mjd', scale='utc')
hkl.dump(t1, "test_ap2.h5")
t2 = hkl.load("test_ap2.h5")
print(t1)
print(t2)
def data_split(inputfile,reads_count):
data = hkl.load(inputfile)
reads_count= hkl.load(reads_count)
X = data['mat']
X_kspec = data['kmer']
reads_count = np.array(reads_count)
y = np.mean(reads_count, axis = 1)
y = np.log(y+1e-3)
rs = ShuffleSplit(len(y), n_iter=1,random_state = 1)
X_kspec = X_kspec.reshape((X_kspec.shape[0],1024,4))
X = np.concatenate((X,X_kspec), axis = 1)
X = X[:,np.newaxis]
X = X.transpose((0,1,3,2))
for train_idx, test_idx in rs:
X_train = X[train_idx,:]
y_train = y[train_idx]
X_test = X[test_idx,:]
y_test = y[test_idx]
def load_val_data():
return hickle.load(os.path.join('./data/val', 'val.features0.hkl'))
model_params = {"feat_dim": FEAT_DIM,
"word_embed": train_dataset.word_matrix,
"lstm_steps": SEQUENCE_LENGTH,
"architecture": FLAGS.architecture}
if task == 'FrameQA':
model_params["vocabulary_size"] = len(train_dataset.idx2word)
model_params["answer_size"] = len(train_dataset.idx2ans)
model_params.update(FLAGS.__flags)
if FLAGS.checkpoint_path:
checkpoint = FLAGS.checkpoint_path
params_path = os.path.join(os.path.dirname(checkpoint), '%s_%s_param.hkl' % (FLAGS.task.lower(), FLAGS.name.lower()))
log.info("Restored parameter set from {}".format(params_path))
model_params = hkl.load(open(params_path))
model_params["att_hidden_dim"] = FLAGS.att_hidden_dim
model_params["hidden_dim"] = FLAGS.hidden_dim
model = Model.from_dict(model_params)
model.print_params()
video = tf.placeholder(tf.float32, [FLAGS.batch_size] + list(train_dataset.get_video_feature_dimension()))
video_mask = tf.placeholder(tf.float32, [FLAGS.batch_size, SEQUENCE_LENGTH])
answer = tf.placeholder(tf.int32, [FLAGS.batch_size, 1])
if (task == 'Count') or (task == 'FrameQA'):
question = tf.placeholder(tf.int32, [FLAGS.batch_size, SEQUENCE_LENGTH])
question_mask = tf.placeholder(tf.int32, [FLAGS.batch_size, SEQUENCE_LENGTH])
else:
question = tf.placeholder(tf.int32, [FLAGS.batch_size, Model.MULTICHOICE_COUNT, SEQUENCE_LENGTH])
# lr: learning rate
cascade_forest_builder = CascadeForestBuilder(n_landmarks=68,n_forests=10,n_trees=500,
tree_depth=5,n_perturbations=20,n_test_split=20,n_pixels=400,kappa=.3,lr=.1)
# training model
model = cascade_forest_builder.build(ibugin, train_gt_shapes, train_boxes)
# save model
hickle.dump(model, "./model/ert_ibug_training.hkl")
# test model
ibug_exam = util.read_images("./ibug_test/image_060_1.*",normalise=True)
ibug_exam_shapes = util.get_gt_shapes(ibug_exam)
ibug_exam_boxes = util.get_bounding_boxes(ibug_exam, ibug_exam_shapes, face_detector)
ibug_exam = ibug_exam[0]
model = hickle.load("../model/ert_ibug_training.hkl")
init_shapes, fin_shapes = model.apply(ibug_exam,[ibug_exam_boxes[0]])
try:
ibug_exam.landmarks.pop('dlib_0')
except:
pass
ibug_exam_gt = deepcopy(ibug_exam)
ibug_exam_gt.view_landmarks()
ibug_exam.landmarks['PTS'].points = fin_shapes[0].points
ibug_exam_predict = deepcopy(ibug_exam)
ibug_exam_predict.view_landmarks(marker_face_colour='y',marker_edge_colour='y')
def classify(tensor):
predictions = F.softmax(f(Variable(tensor.unsqueeze(0), volatile=True), params))
probs, idx = predictions.data.view(-1).topk(k=5, dim=0, sorted=True)
return ['%.2f: %s' % (p, synset_words[i]) for p, i in zip(probs, idx)]
cap = cv2.VideoCapture(0)
assert cap.isOpened()
WINNAME = 'torch-OpenCV ImageNet classification demo'
cv2.namedWindow(winname=WINNAME, flags=cv2.WINDOW_AUTOSIZE)
status, frame = cap.read()
params = hkl.load('./nin-export.hkl')
params = {k: Variable(torch.from_numpy(v)) for k, v in params.iteritems()}
while True:
image = T.Scale(640)(frame)
predictions = classify(tr(frame))
for i, line in enumerate(predictions):
cv2.putText(img=image, text=line, org=(20, 20 + i * 25),
fontFace=cv2.FONT_HERSHEY_DUPLEX,
fontScale=0.5, color=(205, 0, 0), thickness=1)
cv2.imshow(winname=WINNAME, mat=image)
if cv2.waitKey(delay=30) != 255:
break
cap.read(image=frame)
if flag_para_load:
# load by self or the other process
# wait for the copying to finish
msg = recv_queue.get()
assert msg == 'copy_finished'
if count < len(minibatch_range):
ind_to_read = minibatch_range[count]
name_to_read = str(train_filenames[ind_to_read])
send_queue.put(name_to_read)
send_queue.put(get_rand3d())
else:
batch_img = hkl.load(str(train_filenames[minibatch_index])) - img_mean
param_rand = get_rand3d()
batch_img = crop_and_mirror(batch_img, param_rand, flag_batch=flag_batch)
shared_x.set_value(batch_img)
batch_label = train_labels[minibatch_index * batch_size:
(minibatch_index + 1) * batch_size]
shared_y.set_value(batch_label)
cost_ij = train_model()
return cost_ij
def __init__(self, data_filename, source_filename, num_timestamps):
super(KittiData, self).__init__()
self.data_filename = data_filename
self.source_filename = source_filename
self.num_timestamps = num_timestamps
self.data = hkl.load(data_filename)
self.source = hkl.load(source_filename)
self.start_index = []
cur_index = 0
# 将相同场景的视频帧合并在一起
while cur_index < len(self.source) - self.num_timestamps + 1:
if self.source[cur_index] == self.source[cur_index+self.num_timestamps-1]:
self.start_index.append(cur_index)
cur_index += self.num_timestamps
else:
cur_index += 1