Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
print('\nCompute image %s' % basename(self.cfg.params[self.cfg.lfp_path]))
# decode light field image
obj = LfpReader(self.cfg, self.sta)
ret = obj.main()
# use third of original image size (to prevent Travis from stopping due to memory error)
crop_h, crop_w = obj.lfp_img.shape[0] // 3, obj.lfp_img.shape[1] // 3
crop_h, crop_w = crop_h + crop_h % 2, crop_w + crop_w % 2 # use even number for correct Bayer arrangement
lfp_img = obj.lfp_img[crop_h:-crop_h, crop_w:-crop_w]
del obj
self.assertEqual(True, ret)
# create output data folder
mkdir_p(self.cfg.exp_path, self.cfg.params[self.cfg.opt_prnt])
if not self.cfg.cond_meta_file():
# automatic calibration data selection
obj = CaliFinder(self.cfg, self.sta)
ret = obj.main()
wht_img = obj.wht_bay[crop_h:-crop_h, crop_w:-crop_w] if obj.wht_bay is not None else obj.wht_bay
del obj
self.assertEqual(True, ret)
meta_cond = not (exists(self.cfg.params[self.cfg.cal_meta]) and self.cfg.params[self.cfg.cal_meta].lower().endswith('json'))
if meta_cond or self.cfg.params[self.cfg.opt_cali]:
# perform centroid calibration
obj = LfpCalibrator(wht_img, self.cfg, self.sta)
ret = obj.main()
self.cfg = obj.cfg
def __init__(self, *args, **kwargs):
super(PlenoptiCamTester, self).__init__(*args, **kwargs)
# refer to folder where data will be stored
self.fp = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
mkdir_p(self.fp) if not os.path.exists(self.fp) else None
def crop_imgs(folder, coords_lists):
img_tiles, files = list(), list()
exts = ('tif', 'tiff', 'png')
misc.mkdir_p(os.path.join(folder, 'auto-crop'))
files = [f for f in os.listdir(folder) if f.endswith(exts) and not f.startswith('.')]
files.sort()
for i, file in enumerate(files):
coords_nested = coords_lists[i]
for j, coords in enumerate(coords_nested):
if coords[0] != 0 and coords[1] != 0:
cy, cx, h, w = coords
img = misc.load_img_file(os.path.join(folder, file))
tile = img[cy-h//2:cy+h//2, cx-w//2:cx+w//2, ...]
img_tiles.append(tile)
fn, ext = os.path.splitext(file)
misc.save_img_file(tile, os.path.join(folder, 'auto-crop', fn+'_crop'+str(j)), 'png', tag=True)
return img_tiles, files
def save_refo_slice(self, a, refo_img, folder_path=None, file_type=None, string=None):
string = 'subpixel_' if self.cfg.params[self.cfg.opt_refi] and string is None else string
if folder_path is None:
folder_path = os.path.join(self.cfg.exp_path, 'refo_' + string + str(self._M) + 'px')
misc.mkdir_p(folder_path)
# write image file
misc.save_img_file(refo_img, os.path.join(folder_path, str(a)), file_type=file_type)
return True
def export_viewpoints(self, type='tiff'):
# print status
self.sta.status_msg('Write viewpoint images', self.cfg.params[self.cfg.opt_prnt])
self.sta.progress(None, self.cfg.params[self.cfg.opt_prnt])
ptc_leng = self.cfg.params[self.cfg.ptc_leng]
# create folder
folderpath = os.path.join(self.cfg.exp_path, 'viewpoints_'+str(ptc_leng)+'px')
misc.mkdir_p(folderpath)
# normalize image array to 16-bit unsigned integer
vp_img_arr = misc.Normalizer(self.vp_img_arr).uint16_norm()
# export viewpoint images as image files
for j in range(ptc_leng):
for i in range(ptc_leng):
misc.save_img_file(vp_img_arr[j, i], os.path.join(folderpath, str(j) + '_' + str(i)),
file_type=type, tag=self.cfg.params[self.cfg.opt_dbug])
# print status
percentage = (((j*self._M+i+1)/self._M**2)*100)
self.sta.progress(percentage, self.cfg.params[self.cfg.opt_prnt])
if self.sta.interrupt:
def download_data(self, url, fp=None):
""" download data form provided url string """
# path handling
self.fp = fp if fp is not None else self.fp
mkdir_p(self.fp) if not os.path.exists(self.fp) else None
# skip download if file exists
if os.path.exists(os.path.join(self.fp, os.path.basename(url))):
print('Download skipped as %s already exists' % os.path.basename(url))
return None
print('Downloading file %s to %s' % (os.path.basename(url), self.fp))
with open(os.path.join(self.fp, os.path.basename(url)), 'wb') as f:
# establish internet connection for data download
try:
r = requests.get(url, stream=True)
total_length = r.headers.get('content-length')
except requests.exceptions.ConnectionError:
raise (Exception('Check your internet connection, which is required for downloading test data.'))
def main(self):
# check interrupt status
if self.sta.interrupt:
return False
# print status
self.sta.status_msg('Scheimpflug extraction \n', self.cfg.params[self.cfg.opt_prnt])
self.sta.progress(None, self.cfg.params[self.cfg.opt_prnt])
# create output folder
misc.mkdir_p(self.fp)
# iterate through directions
for direction in list(c.PFLU_VALS):
self.cfg.params[self.cfg.opt_pflu] = direction
self.sta.status_msg(direction+':')
if self.refo_stack is not None:
self.scheimpflug_from_stack()
elif self.lfp_img is not None:
self.scheimpflug_from_scratch()
return True
self.cmd_wid.btn_list[3].config(text='Stop')
# read light field photo and calibration source paths
self.fetch_paths()
# remove output folder if option is set
misc.rmdir_p(self.cfg.exp_path) if self.cfg.params[self.cfg.dir_remo] else None
# remove calibrated light-field if calibration or devignetting option is set
if self.cfg.params[self.cfg.opt_cali] or self.cfg.params[self.cfg.opt_vign]:
misc.rm_file(join(self.cfg.exp_path, 'lfp_img_align.pkl'))
if self.cfg.params[self.cfg.opt_cali]:
misc.rm_file(self.cfg.params[self.cfg.cal_meta])
# create output data folder (prevent override)
misc.mkdir_p(self.cfg.exp_path, self.cfg.params[self.cfg.opt_prnt])
# put tasks in the job queue to be run
for task_info in (
(self.load_lfp, self.cfg.cond_load_limg, self.cfg.params[self.cfg.lfp_path]),
(self.auto_find, self.cfg.cond_auto_find),
(self.load_lfp, self.cfg.cond_load_wimg, self.cfg.params[self.cfg.cal_path], True),
(self.cal, self.cfg.cond_perf_cali),
(self.cfg.load_cal_data, self.cfg.cond_lfp_align),
(self.lfp_align, self.cfg.cond_lfp_align),
(self.lfp_extract, True),
(self.lfp_refo, self.cfg.params[self.cfg.opt_refo]),
(self.finish, True)
):
self.job_queue.put(task_info)
# start polling