Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
angle_x=(-0.8, 0.8), angle_y=(-0.8, 0.8), angle_z=(-0.8, 0.8),
do_scale=True, scale=(0.9, 1.5), border_mode_data='constant',
border_cval_data=0,
order_data=3,
border_mode_seg='constant', border_cval_seg=0,
order_seg=0, random_crop=True, p_el_per_sample=0.2,
p_rot_per_sample=0.2, p_scale_per_sample=0.2))
if self.Config.DAUG_RESAMPLE:
tfs.append(SimulateLowResolutionTransform(zoom_range=(0.5, 1), p_per_sample=0.2))
if self.Config.DAUG_NOISE:
tfs.append(GaussianNoiseTransform(noise_variance=(0, 0.05), p_per_sample=0.2))
if self.Config.DAUG_MIRROR:
tfs.append(MirrorTransform())
if self.Config.DAUG_FLIP_PEAKS:
tfs.append(FlipVectorAxisTransform())
tfs.append(NumpyToTensor(keys=["data", "seg"], cast_to="float"))
# num_cached_per_queue 1 or 2 does not really make a difference
batch_gen = MultiThreadedAugmenter(batch_generator, Compose(tfs), num_processes=num_processes,
num_cached_per_queue=1, seeds=None, pin_memory=True)
return batch_gen # data: (batch_size, channels, x, y), seg: (batch_size, channels, x, y)
def create_data_gen_pipeline(cf, patient_data, do_aug=True, **kwargs):
"""
Set up the batch generator and the list of transforms for the multi-threaded train/val/test
batch generation and augmentation pipeline.
:param cf: configs object. The visible code reads cf.da_kwargs, cf.patch_size, cf.dim, cf.roi_items and
cf.class_specific_seg.
:param patient_data: dictionary containing one dictionary per patient in the train/test subset.
:param do_aug: (optional) whether to perform data augmentation (training) or not (validation/testing).
:param kwargs: additional keyword arguments, forwarded unchanged to BatchGenerator.
:return: multithreaded_generator (the return statement is not part of the visible excerpt).
"""
# NOTE(review): leading indentation was lost in this excerpt; confirm the nesting of the statements below
# against the original file before relying on it.
# create instance of batch generator as first element in pipeline.
data_gen = BatchGenerator(cf, patient_data, **kwargs)
# transforms are collected in application order.
my_transforms = []
if do_aug:
# mirroring is switched on by cf.da_kwargs["mirror"]; the axes come from cf.da_kwargs['mirror_axes'].
if cf.da_kwargs["mirror"]:
mirror_transform = Mirror(axes=cf.da_kwargs['mirror_axes'])
my_transforms.append(mirror_transform)
# spatial augmentation is configured entirely from cf.da_kwargs; the patch size is limited to the first
# cf.dim entries of cf.patch_size.
spatial_transform = SpatialTransform(patch_size=cf.patch_size[:cf.dim],
patch_center_dist_from_border=cf.da_kwargs['rand_crop_dist'],
do_elastic_deform=cf.da_kwargs['do_elastic_deform'],
alpha=cf.da_kwargs['alpha'], sigma=cf.da_kwargs['sigma'],
do_rotation=cf.da_kwargs['do_rotation'], angle_x=cf.da_kwargs['angle_x'],
angle_y=cf.da_kwargs['angle_y'], angle_z=cf.da_kwargs['angle_z'],
do_scale=cf.da_kwargs['do_scale'], scale=cf.da_kwargs['scale'],
random_crop=cf.da_kwargs['random_crop'])
my_transforms.append(spatial_transform)
else:
# without augmentation, only a center crop to the first cf.dim entries of cf.patch_size is applied.
my_transforms.append(CenterCropTransform(crop_size=cf.patch_size[:cf.dim]))
# presumably derives bounding-box targets from the segmentation (judging by the transform's name) - confirm.
my_transforms.append(ConvertSegToBoundingBoxCoordinates(cf.dim, cf.roi_items, False, cf.class_specific_seg))
def create_data_gen_pipeline(patient_data, cf, do_aug=True):
"""
Set up the batch generator and the composed transforms for the multi-threaded train/val/test
batch generation and augmentation pipeline.
:param patient_data: dictionary containing one dictionary per patient in the train/test subset.
:param cf: configs object. The visible code reads cf.batch_size, cf.dim, cf.patch_size, cf.da_kwargs and
cf.class_specific_seg_flag.
:param do_aug: (optional) whether to perform data augmentation (training) or not (validation/testing).
:return: multithreaded_generator (the return statement is not part of the visible excerpt).
"""
# NOTE(review): leading indentation was lost in this excerpt; confirm the nesting of the statements below
# against the original file before relying on it.
# create instance of batch generator as first element in pipeline.
data_gen = BatchGenerator(patient_data, batch_size=cf.batch_size, cf=cf)
# add transformations to pipeline.
my_transforms = []
if do_aug:
# mirror along axes 2 .. cf.dim + 1; presumably these are the spatial axes after batch and channel - confirm.
mirror_transform = Mirror(axes=np.arange(2, cf.dim+2, 1))
my_transforms.append(mirror_transform)
# spatial augmentation is configured entirely from cf.da_kwargs; the patch size is limited to the first
# cf.dim entries of cf.patch_size.
spatial_transform = SpatialTransform(patch_size=cf.patch_size[:cf.dim],
patch_center_dist_from_border=cf.da_kwargs['rand_crop_dist'],
do_elastic_deform=cf.da_kwargs['do_elastic_deform'],
alpha=cf.da_kwargs['alpha'], sigma=cf.da_kwargs['sigma'],
do_rotation=cf.da_kwargs['do_rotation'], angle_x=cf.da_kwargs['angle_x'],
angle_y=cf.da_kwargs['angle_y'], angle_z=cf.da_kwargs['angle_z'],
do_scale=cf.da_kwargs['do_scale'], scale=cf.da_kwargs['scale'],
random_crop=cf.da_kwargs['random_crop'])
my_transforms.append(spatial_transform)
else:
# without augmentation, only a center crop to the first cf.dim entries of cf.patch_size is applied.
my_transforms.append(CenterCropTransform(crop_size=cf.patch_size[:cf.dim]))
# presumably derives bounding-box targets from the segmentation (judging by the transform's name) - confirm.
my_transforms.append(ConvertSegToBoundingBoxCoordinates(cf.dim, get_rois_from_seg_flag=False, class_specific_seg_flag=cf.class_specific_seg_flag))
# chain the collected transforms into a single callable.
all_transforms = Compose(my_transforms)
def create_data_gen_pipeline(cf, patient_data, do_aug=True, sample_pids_w_replace=True):
"""
Set up the batch generator and the list of transforms for the multi-threaded train/val/test
batch generation and augmentation pipeline.
:param cf: configs object. The visible code reads cf.da_kwargs, cf.patch_size and cf.dim.
:param patient_data: dictionary containing one dictionary per patient in the train/test subset
:param do_aug: (optional) whether to perform data augmentation (training) or not (validation/testing)
:param sample_pids_w_replace: (optional) forwarded unchanged to BatchGenerator; presumably toggles sampling
of patient ids with replacement - confirm against BatchGenerator.
:return: multithreaded_generator (the return statement is not part of the visible excerpt).
"""
# NOTE(review): leading indentation was lost in this excerpt; confirm the nesting of the statements below
# against the original file before relying on it.
data_gen = BatchGenerator(cf, patient_data, sample_pids_w_replace=sample_pids_w_replace)
# transforms are collected in application order.
my_transforms = []
if do_aug:
# mirroring is switched on by cf.da_kwargs["mirror"]; the axes come from cf.da_kwargs['mirror_axes'].
if cf.da_kwargs["mirror"]:
mirror_transform = Mirror(axes=cf.da_kwargs['mirror_axes'])
my_transforms.append(mirror_transform)
# NOTE(review): only the first two entries of 'rand_crop_dist' are passed, while patch_size uses the first
# cf.dim entries - confirm this is intended when cf.dim == 3.
spatial_transform = SpatialTransform(patch_size=cf.patch_size[:cf.dim],
patch_center_dist_from_border=cf.da_kwargs['rand_crop_dist'][:2],
do_elastic_deform=cf.da_kwargs['do_elastic_deform'],
alpha=cf.da_kwargs['alpha'], sigma=cf.da_kwargs['sigma'],
do_rotation=cf.da_kwargs['do_rotation'], angle_x=cf.da_kwargs['angle_x'],
angle_y=cf.da_kwargs['angle_y'], angle_z=cf.da_kwargs['angle_z'],
do_scale=cf.da_kwargs['do_scale'], scale=cf.da_kwargs['scale'],
random_crop=cf.da_kwargs['random_crop'],
border_mode_data=cf.da_kwargs['border_mode_data'])
my_transforms.append(spatial_transform)
# gamma augmentation over cf.da_kwargs["gamma_range"], applied jointly to all channels (per_channel=False)
# and with retain_stats=False.
gamma_transform = GammaTransform(gamma_range=cf.da_kwargs["gamma_range"], invert_image=False,
per_channel=False, retain_stats=False)
my_transforms.append(gamma_transform)
# NOTE(review): the body of this else-branch is not part of the visible excerpt.
else:
def create_data_gen_pipeline(cf, patient_data, do_aug=True, sample_pids_w_replace=True):
"""
Set up the batch generator and the list of transforms for the multi-threaded train/val/test
batch generation and augmentation pipeline.
:param cf: configs object. The visible code reads cf.da_kwargs, cf.patch_size and cf.dim.
:param patient_data: dictionary containing one dictionary per patient in the train/test subset
:param do_aug: (optional) whether to perform data augmentation (training) or not (validation/testing)
:param sample_pids_w_replace: (optional) forwarded unchanged to BatchGenerator; presumably toggles sampling
of patient ids with replacement - confirm against BatchGenerator.
:return: multithreaded_generator (the return statement is not part of the visible excerpt).
"""
# NOTE(review): leading indentation was lost in this excerpt; confirm the nesting of the statements below
# against the original file before relying on it.
data_gen = BatchGenerator(cf, patient_data, sample_pids_w_replace=sample_pids_w_replace)
# transforms are collected in application order.
my_transforms = []
if do_aug:
# mirroring is switched on by cf.da_kwargs["mirror"]; the axes come from cf.da_kwargs['mirror_axes'].
if cf.da_kwargs["mirror"]:
mirror_transform = Mirror(axes=cf.da_kwargs['mirror_axes'])
my_transforms.append(mirror_transform)
# gamma augmentation is switched on by cf.da_kwargs["gamma_transform"]; it is applied jointly to all
# channels (per_channel=False) and with retain_stats=True.
if cf.da_kwargs["gamma_transform"]:
gamma_transform = GammaTransform(gamma_range=cf.da_kwargs["gamma_range"], invert_image=False,
per_channel=False, retain_stats=True)
my_transforms.append(gamma_transform)
if cf.dim == 3:
# augmentations with desired effect on z-dimension
# the full cf.patch_size is used here and elastic deformation is always disabled.
spatial_transform = SpatialTransform(patch_size=cf.patch_size,
patch_center_dist_from_border=cf.da_kwargs['rand_crop_dist'],
do_elastic_deform=False,
do_rotation=cf.da_kwargs['do_rotation'], angle_x=cf.da_kwargs['angle_x'],
angle_y=cf.da_kwargs['angle_y'], angle_z=cf.da_kwargs['angle_z'],
do_scale=cf.da_kwargs['do_scale'], scale=cf.da_kwargs['scale'],
random_crop=cf.da_kwargs['random_crop'],
border_mode_data=cf.da_kwargs['border_mode_data'])
my_transforms.append(spatial_transform)
def create_data_gen_pipeline(cf, patient_data, is_training=True):
""" create multi-threaded train/val/test batch generation and augmentation pipeline.
The batch generator class is chosen from cf.training_gts: BatchGenerator_merged when it equals 'merged',
BatchGenerator_sa otherwise. The generator is named "train" or "val" depending on is_training.
:param cf: configs object. The visible code reads cf.training_gts, cf.da_kwargs, cf.patch_size, cf.dim,
cf.create_bounding_box_targets, cf.roi_items and cf.class_specific_seg.
:param patient_data: dictionary containing one dictionary per patient in the train/test subset.
:param is_training: (optional) whether to perform data augmentation (training) or not (validation/testing)
:return: multithreaded_generator (the return statement is not part of the visible excerpt).
"""
# NOTE(review): leading indentation was lost in this excerpt; confirm the nesting of the statements below
# against the original file before relying on it.
# generator name passed to the batch generator constructor.
BG_name = "train" if is_training else "val"
data_gen = BatchGenerator_merged(cf, patient_data, name=BG_name) if cf.training_gts=='merged' else \
BatchGenerator_sa(cf, patient_data, name=BG_name)
# add transformations to pipeline.
my_transforms = []
if is_training:
# mirroring is switched on by cf.da_kwargs["mirror"]; the axes come from cf.da_kwargs['mirror_axes'].
if cf.da_kwargs["mirror"]:
mirror_transform = Mirror(axes=cf.da_kwargs['mirror_axes'])
my_transforms.append(mirror_transform)
# spatial augmentation is configured entirely from cf.da_kwargs; the patch size is limited to the first
# cf.dim entries of cf.patch_size.
spatial_transform = SpatialTransform(patch_size=cf.patch_size[:cf.dim],
patch_center_dist_from_border=cf.da_kwargs['rand_crop_dist'],
do_elastic_deform=cf.da_kwargs['do_elastic_deform'],
alpha=cf.da_kwargs['alpha'], sigma=cf.da_kwargs['sigma'],
do_rotation=cf.da_kwargs['do_rotation'], angle_x=cf.da_kwargs['angle_x'],
angle_y=cf.da_kwargs['angle_y'], angle_z=cf.da_kwargs['angle_z'],
do_scale=cf.da_kwargs['do_scale'], scale=cf.da_kwargs['scale'],
random_crop=cf.da_kwargs['random_crop'])
my_transforms.append(spatial_transform)
else:
# without augmentation, only a center crop to the first cf.dim entries of cf.patch_size is applied.
my_transforms.append(CenterCropTransform(crop_size=cf.patch_size[:cf.dim]))
# the bounding-box conversion is added only when cf.create_bounding_box_targets is set.
if cf.create_bounding_box_targets:
my_transforms.append(ConvertSegToBoundingBoxCoordinates(cf.dim, cf.roi_items, False, cf.class_specific_seg))