def test_modes(self):
    store = LogWriter(self.dir, sync_cycle=1)

    scalars = []
    for i in range(10):
        with store.mode("mode-%d" % i) as writer:
            scalar = writer.scalar("add/scalar0")
            scalars.append(scalar)

    for scalar in scalars[:-1]:
        for i in range(10):
            scalar.add_record(i, float(i))
def setUp(self):
    self.dir = "./tmp/storage_test"
    self.writer = LogWriter(self.dir, sync_cycle=1).as_mode("train")
def test_with_syntax(self):
    with self.writer.mode("train") as writer:
        scalar = writer.scalar("model/scalar/average")
        for i in range(10):
            scalar.add_record(i, float(i))

    self.writer.save()
    self.reader = LogReader(self.dir)
    with self.reader.mode("train") as reader:
        scalar = reader.scalar("model/scalar/average")
        self.assertEqual(scalar.caption(), "train")
def test_check_image(self):
    '''
    check whether the storage will keep image data consistent
    '''
    print('check image')
    tag = "layer1/check/image1"
    image_writer = self.writer.image(tag, 10)
    image = Image.open("./dog.jpg")
    shape = [image.size[1], image.size[0], 3]
    origin_data = np.array(image.getdata()).flatten()

    self.writer.save()
    self.reader = LogReader(self.dir)
    with self.reader.mode("train") as reader:
        image_writer.start_sampling()
        image_writer.add_sample(shape, list(origin_data))
        image_writer.finish_sampling()

        # read and check whether the original image will be displayed
        image_reader = reader.image(tag)
        image_record = image_reader.record(0, 0)
        data = image_record.data()
        shape = image_record.shape()

        PIL_image_shape = (shape[0] * shape[1], shape[2])
        data = np.array(data, dtype='uint8').reshape(PIL_image_shape)
        print('origin', origin_data.flatten())
        print('data', data.flatten())
    # write multiple passes of random samples to the same tag, then read the
    # records back and check shapes, sizes, and the registered image tags
    image_writer = self.writer.image(tag, 10, 1)
    num_passes = 10
    num_samples = 100
    shape = [10, 10, 3]
    for pass_ in range(num_passes):
        image_writer.start_sampling()
        for ins in range(num_samples):
            data = np.random.random(shape) * 256
            data = np.ndarray.flatten(data)
            image_writer.add_sample(shape, list(data))
        image_writer.finish_sampling()

    self.writer.save()
    self.reader = LogReader(self.dir)
    with self.reader.mode("train") as reader:
        image_reader = reader.image(tag)
        self.assertEqual(image_reader.caption(), tag)
        self.assertEqual(image_reader.num_records(), num_passes)

        image_record = image_reader.record(0, 1)
        self.assertTrue(np.equal(image_record.shape(), shape).all())
        data = image_record.data()
        self.assertEqual(len(data), np.prod(shape))

        image_tags = reader.tags("image")
        self.assertTrue(image_tags)
        self.assertEqual(len(image_tags), 1)
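
# A standalone sketch of the same image-sampling flow outside of unittest,
# reusing only the LogWriter/LogReader calls exercised by the tests above.
# The log directory "./tmp/image_demo" and the tag "demo/image" are
# illustrative, not taken from the original file.
import numpy as np
from visualdl import LogReader, LogWriter

demo_writer = LogWriter("./tmp/image_demo", sync_cycle=1).as_mode("train")
demo_image = demo_writer.image("demo/image", 10, 1)  # keep up to 10 samples per step
demo_shape = [10, 10, 3]
for _pass in range(3):
    demo_image.start_sampling()
    for _ in range(20):
        sample = np.ndarray.flatten(np.random.random(demo_shape) * 256)
        demo_image.add_sample(demo_shape, list(sample))
    demo_image.finish_sampling()
demo_writer.save()

demo_reader = LogReader("./tmp/image_demo")
with demo_reader.mode("train") as r:
    rec = r.image("demo/image")
    print(rec.num_records())         # one record per sampling pass
    print(rec.record(0, 0).shape())  # shape of the first stored sample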
def test_scalar(self):
    print('test write')
    scalar = self.writer.scalar("model/scalar/min")
    # scalar.set_caption("model/scalar/min")
    for i in range(10):
        scalar.add_record(i, float(i))

    print('test read')
    self.writer.save()
    self.reader = LogReader(self.dir)
    with self.reader.mode("train") as reader:
        scalar = reader.scalar("model/scalar/min")
        self.assertEqual(scalar.caption(), "train")
        records = scalar.records()
        ids = scalar.ids()
        self.assertTrue(
            np.equal(records, [float(i) for i in range(10)]).all())
        self.assertTrue(np.equal(ids, [float(i) for i in range(10)]).all())
        print('records', records)
        print('ids', ids)
def _finetune_cls_task(task, data_reader, feed_list, config=None,
                       do_eval=False):
    main_program = task.main_program()
    startup_program = task.startup_program()
    loss = task.variable("loss")
    accuracy = task.variable("accuracy")

    num_epoch = config.num_epoch
    batch_size = config.batch_size
    log_writer = LogWriter(
        os.path.join(config.checkpoint_dir, "vdllog"), sync_cycle=1)

    place, dev_count = hub.common.get_running_device_info(config)
    with fluid.program_guard(main_program, startup_program):
        exe = fluid.Executor(place=place)
        data_feeder = fluid.DataFeeder(feed_list=feed_list, place=place)

        # select strategy
        if isinstance(config.strategy, hub.AdamWeightDecayStrategy):
            scheduled_lr = config.strategy.execute(loss, main_program,
                                                   data_reader, config)
        elif isinstance(config.strategy, hub.DefaultStrategy):
            config.strategy.execute(loss)
        # TODO: add more finetune strategies

        _do_memory_optimization(task, config)
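
        # NOTE: the original function continues beyond this fragment. As a
        # hedged sketch (not PaddleHub's actual code), the log_writer created
        # above would typically expose scalar tags that the training loop then
        # feeds via add_record(step, value); the tag and variable names here
        # are illustrative.
        with log_writer.mode("train") as vdl_logger:
            train_loss_scalar = vdl_logger.scalar("loss")
            train_acc_scalar = vdl_logger.scalar("accuracy")
        with log_writer.mode("evaluate") as vdl_logger:
            eval_acc_scalar = vdl_logger.scalar("accuracy")
        # inside the batch loop one would then call, for example:
        #   train_loss_scalar.add_record(global_step, float(loss_value))
        #   train_acc_scalar.add_record(global_step, float(acc_value))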
#!/usr/bin/env python
import os
import random

import numpy as np
from PIL import Image

from visualdl import ROOT, LogWriter
from visualdl.server.log import logger as log

logdir = './scratch_log'
logw = LogWriter(logdir, sync_cycle=30)

# create scalars in mode train and test.
with logw.mode('train') as logger:
    scalar0 = logger.scalar("scratch/scalar")

with logw.mode('test') as logger:
    scalar1 = logger.scalar("scratch/scalar")

# add scalar records.
last_record0 = 0.
last_record1 = 0.
for step in range(1, 100):
    last_record0 += 0.1 * (random.random() - 0.3)
    last_record1 += 0.1 * (random.random() - 0.7)
    scalar0.add_record(step, last_record0)
    scalar1.add_record(step, last_record1)
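
# Reading the records back with LogReader is not part of the original script;
# this is a small illustrative sketch reusing the reader API shown in the
# tests above (records may only become visible once the writer has synced).
from visualdl import LogReader

scratch_reader = LogReader(logdir)
with scratch_reader.mode('train') as reader:
    train_scalar = reader.scalar("scratch/scalar")
    print(train_scalar.ids())      # recorded steps
    print(train_scalar.records())  # recorded values

# The web board can then be pointed at the same directory, e.g.
# `visualdl --logdir ./scratch_log --port 8080` (flags as documented for
# VisualDL 1.x; they may differ between versions).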
# =======================================================================
from __future__ import print_function

import numpy as np
from visualdl import LogWriter
import paddle.v2 as paddle
import paddle.v2.fluid as fluid
import paddle.v2.fluid.framework as framework
from paddle.v2.fluid.initializer import NormalInitializer
from paddle.v2.fluid.param_attr import ParamAttr

# create VisualDL logger and directory
logdir = "./tmp"
logwriter = LogWriter(logdir, sync_cycle=10)

# create 'train' run
with logwriter.mode("train") as writer:
    # create 'loss' scalar tag to keep track of loss function
    loss_scalar = writer.scalar("loss")

with logwriter.mode("train") as writer:
    acc_scalar = writer.scalar("acc")

num_samples = 4
with logwriter.mode("train") as writer:
    conv_image = writer.image("conv_image", num_samples,
                              1)  # show 4 samples for every 1 step
    input_image = writer.image("input_image", num_samples, 1)
with logwriter.mode("train") as writer:
# =======================================================================
import mobilenet_v2
import paddle
import paddle.dataset.cifar as cifar
import paddle.fluid as fluid
from visualdl import LogWriter

# create the log writer
log_writer = LogWriter(dir='log/', sync_cycle=10)

# create the tools that record training and test data
with log_writer.mode('train') as writer:
    train_cost_writer = writer.scalar('cost')
    train_acc_writer = writer.scalar('accuracy')
    histogram = writer.histogram('histogram', num_buckets=50)

with log_writer.mode('test') as writer:
    test_cost_writer = writer.scalar('cost')
    test_acc_writer = writer.scalar('accuracy')

# define the input layers
image = fluid.layers.data(name='image', shape=[3, 32, 32], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

# get the classifier
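
# The snippet ends before the classifier and training loop. As a standalone,
# hedged sketch (not the original tutorial code), this is how the writers
# created above are typically fed; the dummy values below stand in for the
# cost/accuracy and parameter tensors fetched during real training.
import numpy as np

for step in range(100):
    dummy_cost = float(np.random.random())
    dummy_acc = float(np.random.random())
    train_cost_writer.add_record(step, dummy_cost)
    train_acc_writer.add_record(step, dummy_acc)
    # the histogram component takes a flat list of values per step
    histogram.add_record(step, np.random.normal(0.0, 1.0, 1000).tolist())
    test_cost_writer.add_record(step, dummy_cost)
    test_acc_writer.add_record(step, dummy_acc)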