Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self) -> None:
    """
    Run method of the module. Smooths the images from the input port with a Gaussian kernel
    and appends the result to the output port. The kernel width is zero along the first axis,
    so only the last two axes of each chunk of images are filtered.

    Returns
    -------
    NoneType
        None
    """

    chunk_size = self._m_config_port.get_attribute('MEMORY')
    pixel_scale = self._m_config_port.get_attribute('PIXSCALE')

    n_frames = self.m_image_in_port.get_shape()[0]
    edges = memory_frames(chunk_size, n_frames)

    # Convert the FWHM to the standard deviation of the Gaussian, in pixels.
    fwhm_to_sigma = 2.*math.sqrt(2.*math.log(2.))
    sigma = (self.m_fwhm/pixel_scale) / fwhm_to_sigma

    n_chunks = len(edges[:-1])
    t_start = time.time()

    for idx in range(n_chunks):
        progress(idx, n_chunks, 'Applying Gaussian filter...', t_start)

        lower, upper = edges[idx], edges[idx+1]
        chunk = self.m_image_in_port[lower:upper, ]

        # A width of 0 on the first axis leaves that axis unfiltered.
        smoothed = gaussian_filter(chunk, (0, sigma, sigma))
        self.m_image_out_port.append(smoothed, data_dim=3)

    self.m_image_out_port.add_history('GaussianFilterModule',
                                      f'fwhm [arcsec] = {self.m_fwhm}')
    self.m_image_out_port.copy_attributes(self.m_image_in_port)
# Helper that allocates the arrays used for stacking subsets of images.
# NOTE(review): `self` is not a parameter, so this is presumably a closure defined inside a
# method -- confirm against the enclosing definition.
# NOTE(review): only the beginning of this function is visible here; the loop body is cut off
# after the last `if`, and the indentation of the snippet has been lost.
# NOTE(review): `parang` is annotated as np.ndarray but is compared against None below, so
# None appears to be an accepted value -- TODO confirm whether Optional[np.ndarray] is meant.
def _stack_subsets(nimages: int,
im_shape: Tuple[int, ...],
parang: np.ndarray) -> Tuple[Tuple[int, ...], np.ndarray, np.ndarray]:
# Defaults that remain in place when no stacking is requested.
im_new = None
parang_new = None
if self.m_stacking is not None:
# The frame boundaries come from stack_angles when a maximum rotation is set,
# otherwise from memory_frames with self.m_stacking as its first argument.
if self.m_max_rotation is not None:
frames = stack_angles(self.m_stacking, parang, self.m_max_rotation)
else:
frames = memory_frames(self.m_stacking, nimages)
# Presumably one output image per pair of consecutive boundaries -- TODO confirm.
nimages_new = np.size(frames)-1
if parang is None:
parang_new = None
else:
parang_new = np.zeros(nimages_new)
# Output stack keeps the last two dimensions of im_shape.
im_new = np.zeros((nimages_new, im_shape[1], im_shape[2]))
start_time = time.time()
for i in range(nimages_new):
progress(i, nimages_new, 'Stacking subsets of images...', start_time)
if parang is not None:
def _initialize(ndim: int,
                npix: int) -> Tuple[int, np.ndarray, Optional[np.ndarray]]:
    """
    Set up the number of images, the frame boundaries, and the accumulator image.

    The names ``self`` and ``memory`` are not parameters; they are read from the scope in
    which this helper is defined.

    Parameters
    ----------
    ndim : int
        Number of dimensions of the input data. Only 2 and 3 are supported.
    npix : int
        Side length of the (square) accumulator image that is created for mean stacking.

    Returns
    -------
    int
        Number of images: 1 for 2D data, otherwise the first element of the input shape.
    np.ndarray
        Frame boundaries. For median stacking this is ``[0, nimages]``; otherwise it is the
        result of ``memory_frames(memory, nimages)``.
    np.ndarray, None
        Array of zeros with shape ``(npix, npix)`` for mean stacking, otherwise None.

    Raises
    ------
    ValueError
        If ``ndim`` is neither 2 nor 3.
    """

    if ndim == 2:
        nimages = 1

    elif ndim == 3:
        nimages = self.m_image_in_port.get_shape()[0]

    else:
        # Without this branch, ``nimages`` would be unbound and the code below would fail
        # with an UnboundLocalError that hides the actual cause.
        raise ValueError(f'The input data should be 2D or 3D but ndim={ndim} was provided.')

    if self.m_stack == 'median':
        # A single pair of boundaries that spans all images.
        frames = np.array([0, nimages])

    else:
        frames = memory_frames(memory, nimages)

    if self.m_stack == 'mean':
        im_tot = np.zeros((npix, npix))

    else:
        im_tot = None

    return nimages, frames, im_tot
# NOTE(review): only the beginning of this method is visible here; the loop is cut off after
# the chunk of input images has been read, and the indentation of the snippet has been lost.
def run(self) -> None:
"""
Run method of the module. Adds lines of zero-value pixels to increase the size of an image.
Returns
-------
NoneType
None
"""
# Remove the attributes and data that are currently stored under the output port.
self.m_image_out_port.del_all_attributes()
self.m_image_out_port.del_all_data()
memory = self._m_config_port.get_attribute('MEMORY')
nimages = self.m_image_in_port.get_shape()[0]
frames = memory_frames(memory, nimages)
shape_in = self.m_image_in_port.get_shape()
# Output size: m_lines[2] and m_lines[3] are added to the second-to-last axis,
# m_lines[0] and m_lines[1] to the last axis.
shape_out = (shape_in[-2]+int(self.m_lines[2])+int(self.m_lines[3]),
shape_in[-1]+int(self.m_lines[0])+int(self.m_lines[1]))
# NOTE(review): the next two lines overwrite elements 1 and 3 of self.m_lines in place,
# replacing each with the output size minus its previous value. The instance therefore
# no longer holds the original values after this point, and a second call of run() would
# start from the modified values -- confirm that this is intended.
self.m_lines[1] = shape_out[1] - self.m_lines[1] # right side of image
self.m_lines[3] = shape_out[0] - self.m_lines[3] # top side of image
start_time = time.time()
for i in range(len(frames[:-1])):
progress(i, len(frames[:-1]), 'Running AddLinesModule...', start_time)
# Read the images between two consecutive frame boundaries.
image_in = self.m_image_in_port[frames[i]:frames[i+1], ]
# NOTE(review): fragment -- the enclosing definition and the origin of `out_name` and
# `header` are not visible here, and the indentation of the snippet has been lost.
# Determine the boundaries of the slices of the dataset that are written to disk.
if self.m_subset_size is None:
# No subset size: a single [start, stop] pair, either the full dataset or self.m_range.
if self.m_range is None:
frames = [0, self.m_data_port.get_shape()[0]]
else:
frames = [self.m_range[0], self.m_range[1]]
else:
# Subset size given: the boundaries come from memory_frames.
if self.m_range is None:
nimages = self.m_data_port.get_shape()[0]
frames = memory_frames(self.m_subset_size, nimages)
else:
nimages = self.m_range[1] - self.m_range[0]
frames = memory_frames(self.m_subset_size, nimages)
# Shift the boundaries by the first index of the range.
frames = np.asarray(frames) + self.m_range[0]
for i, _ in enumerate(frames[:-1]):
data_select = self.m_data_port[frames[i]:frames[i+1], ]
# Two boundaries means a single slice, which is written to out_name directly.
if len(frames) == 2:
fits.writeto(out_name, data_select, header, overwrite=self.m_overwrite)
else:
# Drop the last 5 characters of out_name (presumably the '.fits' extension -- TODO
# confirm) and insert a zero-padded, three-digit slice index before '.fits'.
filename = f'{out_name[:-5]}{i:03d}.fits'
fits.writeto(filename, data_select, header, overwrite=self.m_overwrite)
print(' [DONE]')
self.m_data_port.close_port()
-------
NoneType
None
"""
# NOTE(review): fragment -- the enclosing definition is not visible here, the snippet is cut
# off before the threads are started, and the indentation of the snippet has been lost.
cpu = self._m_config_port.get_attribute('CPU')
# list all files ending with .fits.Z in the input location
files = []
for item in os.listdir(self.m_input_location):
if item.endswith('.fits.Z'):
files.append(os.path.join(self.m_input_location, item))
# The remaining steps are guarded by a check that at least one file was found.
if files:
# subdivide the file indices by number of CPU
indices = memory_frames(cpu, len(files))
start_time = time.time()
for i, _ in enumerate(indices[:-1]):
progress(i, len(indices[:-1]), 'Uncompressing NEAR data...', start_time)
# select subset of compressed files
subset = files[indices[i]:indices[i+1]]
# create a list of threads to uncompress CPU number of files
# each file is processed by a different thread
threads = []
for filename in subset:
# The thread is only created and collected here; it is not started in this loop.
thread = threading.Thread(target=self._uncompress_file, args=(filename, ))
threads.append(thread)
# start the threads
"""
Run method of the module. Add the images from the two database tags on a frame-by-frame
basis.
Returns
-------
NoneType
None
"""
# The two datasets are combined frame by frame, so their shapes have to be identical.
shape_first = self.m_image_in1_port.get_shape()
shape_second = self.m_image_in2_port.get_shape()

if shape_first != shape_second:
    raise ValueError('The shape of the two input tags has to be the same.')

chunk_size = self._m_config_port.get_attribute('MEMORY')
n_frames = self.m_image_in1_port.get_shape()[0]
edges = memory_frames(chunk_size, n_frames)

n_chunks = len(edges[:-1])
t_start = time.time()

for idx in range(n_chunks):
    progress(idx, n_chunks, 'Adding images...', t_start)

    # Read the same range of frames from both input ports.
    lower, upper = edges[idx], edges[idx+1]
    first = self.m_image_in1_port[lower:upper, ]
    second = self.m_image_in2_port[lower:upper, ]

    # Sum the two chunks and apply the scaling factor before storing the result.
    self.m_image_out_port.append(self.m_scaling*(first+second), data_dim=3)

self.m_image_out_port.add_history('AddImagesModule', f'scaling = {self.m_scaling}')
self.m_image_out_port.copy_attributes(self.m_image_in1_port)
self.m_image_out_port.close_port()
def run(self) -> None:
"""
Run method of the module. Creates a master dark with the same shape as the science
data and subtracts the dark frame from the science data.
Returns
-------
NoneType
None
"""
memory = self._m_config_port.get_attribute('MEMORY')
nimages = self.m_image_in_port.get_shape()[0]
frames = memory_frames(memory, nimages)
dark = self.m_dark_in_port.get_all()
master = _master_frame(data=np.mean(dark, axis=0),
im_shape=self.m_image_in_port.get_shape())
start_time = time.time()
for i in range(len(frames[:-1])):
progress(i, len(frames[:-1]), 'Subtracting the dark current...', start_time)
images = self.m_image_in_port[frames[i]:frames[i+1], ]
self.m_image_out_port.append(images - master, data_dim=3)
history = f'dark_in_tag = {self.m_dark_in_port.tag}'