Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
align the PSF patterns.
Returns
-------
NoneType
None
"""
wvl_factor = self.m_line_wvl/self.m_cnt_wvl
width_factor = self.m_line_width/self.m_cnt_width
nimages = self.m_image_in_port.get_shape()[0]
start_time = time.time()
for i in range(nimages):
progress(i, nimages, 'Preparing images for SDI...', start_time)
image = self.m_image_in_port[i, ]
im_scale = width_factor * scale_image(image, wvl_factor, wvl_factor)
if i == 0:
npix_del = im_scale.shape[-1] - image.shape[-1]
if npix_del % 2 == 0:
npix_del_a = int(npix_del/2)
npix_del_b = int(npix_del/2)
else:
npix_del_a = int((npix_del-1)/2)
npix_del_b = int((npix_del+1)/2)
memory = self._m_config_port.get_attribute('MEMORY')
image_in_port = []
im_shape = []
for i, item in enumerate(self.m_image_in_tags):
image_in_port.append(self.add_input_port(item))
im_shape.append(image_in_port[i].get_shape()[-2:])
if len(set(im_shape)) > 1:
raise ValueError('The size of the images should be the same for all datasets.')
count = 0
start_time = time.time()
for i, item in enumerate(self.m_image_in_tags):
progress(i, len(self.m_image_in_tags), 'Combining datasets...', start_time)
nimages = image_in_port[i].get_shape()[0]
frames = memory_frames(memory, nimages)
for j, _ in enumerate(frames[:-1]):
im_tmp = image_in_port[i][frames[j]:frames[j+1], ]
self.m_image_out_port.append(im_tmp)
if self.m_index_init:
index = np.arange(frames[j], frames[j+1], 1) + count
if i == 0 and j == 0:
self.m_image_out_port.add_attribute('INDEX', index, static=False)
else:
for ind in index:
self.m_image_out_port.append_attribute_data('INDEX', ind)
NoneType
None
"""
memory = self._m_config_port.get_attribute('MEMORY')
pixscale = self._m_config_port.get_attribute('PIXSCALE')
nimages = self.m_image_in_port.get_shape()[0]
frames = memory_frames(memory, nimages)
sigma = (self.m_fwhm/pixscale) / (2.*math.sqrt(2.*math.log(2.))) # [pix]
start_time = time.time()
for i, _ in enumerate(frames[:-1]):
progress(i, len(frames[:-1]), 'Applying Gaussian filter...', start_time)
images = self.m_image_in_port[frames[i]:frames[i+1], ]
im_filter = gaussian_filter(images, (0, sigma, sigma))
self.m_image_out_port.append(im_filter, data_dim=3)
history = f'fwhm [arcsec] = {self.m_fwhm}'
self.m_image_out_port.add_history('GaussianFilterModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.close_port()
None
"""
memory = self._m_config_port.get_attribute('MEMORY')
nimages = self.m_image_in_port.get_shape()[0]
frames = memory_frames(memory, nimages)
dark = self.m_dark_in_port.get_all()
master = _master_frame(data=np.mean(dark, axis=0),
im_shape=self.m_image_in_port.get_shape())
start_time = time.time()
for i in range(len(frames[:-1])):
progress(i, len(frames[:-1]), 'Subtracting the dark current...', start_time)
images = self.m_image_in_port[frames[i]:frames[i+1], ]
self.m_image_out_port.append(images - master, data_dim=3)
history = f'dark_in_tag = {self.m_dark_in_port.tag}'
self.m_image_out_port.add_history('DarkCalibrationModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.close_port()
def run(self) -> None:
"""
Run method of the module. Create list of time stamps, get sky and science images, and
subtract the sky images from the science images.
Returns
-------
NoneType
None
"""
self._create_time_stamp_list()
start_time = time.time()
for i, time_entry in enumerate(self.m_time_stamps):
progress(i, len(self.m_time_stamps), 'Subtracting background...', start_time)
if time_entry.m_im_type == 'SKY':
continue
sky = self.calc_sky_frame(i)
science = self.m_science_in_port[time_entry.m_index, ]
self.m_image_out_port.append(science - sky[None, ], data_dim=3)
history = f'mode = {self.m_mode}'
self.m_image_out_port.copy_attributes(self.m_science_in_port)
self.m_image_out_port.add_history('NoddingBackgroundModule', history)
self.m_image_out_port.close_port()
# Convert size parameter from arcseconds to pixels
pixscale = self.m_image_in_port.get_attribute('PIXSCALE')
print(f'New image size (arcsec) = {self.m_size}')
self.m_size = int(math.ceil(self.m_size / pixscale))
print(f'New image size (pixels) = {self.m_size}')
if self.m_center is not None:
print(f'New image center (x, y) = {self.m_center}')
# Crop images chunk by chunk
start_time = time.time()
for i in range(len(frames[:-1])):
# Update progress bar
progress(i, len(frames[:-1]), 'Cropping images...', start_time)
# Select and crop images in the current chunk
images = self.m_image_in_port[frames[i]:frames[i+1], ]
images = crop_image(images, self.m_center, self.m_size, copy=False)
# Write cropped images to output port
self.m_image_out_port.append(images, data_dim=3)
# Save history and copy attributes
history = f'image size (pix) = {self.m_size}'
self.m_image_out_port.add_history('CropImagesModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.close_port()
-------
NoneType
None
"""
if self.m_image_in1_port.get_shape() != self.m_image_in2_port.get_shape():
raise ValueError('The shape of the two input tags has to be the same.')
memory = self._m_config_port.get_attribute('MEMORY')
nimages = self.m_image_in1_port.get_shape()[0]
frames = memory_frames(memory, nimages)
start_time = time.time()
for i, _ in enumerate(frames[:-1]):
progress(i, len(frames[:-1]), 'Adding images...', start_time)
images1 = self.m_image_in1_port[frames[i]:frames[i+1], ]
images2 = self.m_image_in2_port[frames[i]:frames[i+1], ]
self.m_image_out_port.append(self.m_scaling*(images1+images2), data_dim=3)
history = f'scaling = {self.m_scaling}'
self.m_image_out_port.add_history('AddImagesModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in1_port)
self.m_image_out_port.close_port()
y_center[w] = x_center[w]*(y_pos[1]-y_pos[3])/(x_pos[1]-float(x_pos[3])) + \
(y_pos[1]-x_pos[1]*(y_pos[1]-y_pos[3])/(x_pos[1]-float(x_pos[3])))
# Adjust science images
nimages = self.m_image_in_port.get_shape()[-3]
npix = self.m_image_in_port.get_shape()[-2]
nwavelengths = len(wavelength)
start_time = time.time()
for i in range(nimages):
im_storage = []
for j in range(nwavelengths):
im_index = i*nwavelengths + j
progress(im_index, nimages*nwavelengths, 'Centering the images...', start_time)
if ndim == 3:
image = self.m_image_in_port[i, ]
elif ndim == 4:
image = self.m_image_in_port[j, i, ]
shift_yx = np.array([(float(im_shape[-2])-1.)/2. - y_center[j],
(float(im_shape[-1])-1.)/2. - x_center[j]])
if self.m_dither:
index = np.digitize(i, nframes, right=False) - 1
shift_yx[0] -= dither_y[index]
shift_yx[1] -= dither_x[index]
if npix % 2 == 0 and self.m_size is not None:
self.m_threshold,
self.m_aperture,
self.m_residuals,
self.m_snr_inject,
pos)))
pool.close()
start_time = time.time()
# wait for all processes to finish
while mp.active_children():
# number of finished processes
nfinished = sum([i.ready() for i in async_results])
progress(nfinished, len(positions), 'Calculating detection limits...', start_time)
# check if new processes have finished every 5 seconds
time.sleep(5)
if nfinished != len(positions):
sys.stdout.write('\r ')
sys.stdout.write('\rCalculating detection limits... [DONE]\n')
sys.stdout.flush()
# get the results for every async_result object
for item in async_results:
result.append(item.get())
pool.terminate()
os.remove(tmp_im_str)
raise ValueError('Input and output port should have a different tag.')
non_static = self.m_image_in_port.get_all_non_static_attributes()
nframes = self.m_image_in_port.get_attribute('NFRAMES')
if 'PARANG' in non_static:
parang = self.m_image_in_port.get_attribute('PARANG')
else:
parang = None
current = 0
parang_new = []
start_time = time.time()
for i, frames in enumerate(nframes):
progress(i, len(nframes), 'Stacking images per FITS cube...', start_time)
if self.m_combine == 'mean':
im_stack = np.mean(self.m_image_in_port[current:current+frames, ], axis=0)
elif self.m_combine == 'median':
im_stack = np.median(self.m_image_in_port[current:current+frames, ], axis=0)
self.m_image_out_port.append(im_stack, data_dim=3)
if parang is not None:
parang_new.append(np.mean(parang[current:current+frames]))
current += frames
nimages = np.size(nframes)
self.m_image_out_port.copy_attributes(self.m_image_in_port)