Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Write cropped images to output port
self.m_image_out_port.append(images, data_dim=3)
# Update progress bar (cropping of images is finished)
sys.stdout.write('Running CropImagesModule... [DONE]\n')
sys.stdout.flush()
# Save history and copy attributes
history = f'image size [pix] = {self.m_size}'
self.m_image_out_port.add_history('CropImagesModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.close_port()
class ScaleImagesModule(ProcessingModule):
"""
Pipeline module for rescaling of an image.
"""
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
scaling: Union[Tuple[float, float, float],
Tuple[None, None, float],
Tuple[float, float, None]],
pixscale: bool = False) -> None:
"""
Parameters
----------
# Select and crop images in the current chunk
images = self.m_image_in_port[frames[i]:frames[i+1], ]
images = crop_image(images, self.m_center, self.m_size, copy=False)
# Write cropped images to output port
self.m_image_out_port.append(images, data_dim=3)
# Save history and copy attributes
history = f'image size (pix) = {self.m_size}'
self.m_image_out_port.add_history('CropImagesModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.close_port()
class ScaleImagesModule(ProcessingModule):
"""
Pipeline module for rescaling of an image.
"""
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
scaling: Union[Tuple[float, float, float],
Tuple[None, None, float],
Tuple[float, float, None]],
pixscale: bool = False) -> None:
"""
Parameters
----------
try:
autocorr = emcee.autocorr.integrated_time(sampler.get_chain())
print(f'Integrated autocorrelation time = {autocorr}')
except emcee.autocorr.AutocorrError:
autocorr = [np.nan, np.nan, np.nan]
print('The chain is too short to reliably estimate the autocorrelation time. [WARNING]')
self.m_chain_out_port.add_attribute('AUTOCORR_0', autocorr[0], static=True)
self.m_chain_out_port.add_attribute('AUTOCORR_1', autocorr[1], static=True)
self.m_chain_out_port.add_attribute('AUTOCORR_2', autocorr[2], static=True)
self.m_chain_out_port.close_port()
class AperturePhotometryModule(ProcessingModule):
"""
Pipeline module for calculating the counts within a circular area.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
phot_out_tag: str,
radius: float = 0.1,
position: Tuple[float, float] = None) -> None:
"""
Parameters
----------
from typing import Optional, Tuple, Union
import numpy as np
from typeguard import typechecked
from skimage.metrics import structural_similarity, mean_squared_error
from pynpoint.core.processing import ProcessingModule
from pynpoint.util.image import crop_image, pixel_distance, center_pixel
from pynpoint.util.module import progress
from pynpoint.util.remove import write_selected_data, write_selected_attributes
from pynpoint.util.star import star_positions
class RemoveFramesModule(ProcessingModule):
"""
Pipeline module for removing images by their index number.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
selected_out_tag: str,
removed_out_tag: str,
frames: Union[str, range, list, np.ndarray]) -> None:
"""
Parameters
----------
if steps[i] == 1:
new_angles = np.append(new_angles,
[(parang_start[i] + parang_end[i])/2.])
elif steps[i] != 1:
new_angles = np.append(new_angles,
np.linspace(parang_start[i],
parang_end[i],
num=steps[i]))
self.m_data_out_port.add_attribute('PARANG',
new_angles,
static=False)
class SortParangModule(ProcessingModule):
"""
Module to sort the images and attributes with increasing ``INDEX``.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
self.m_scaling_flux))
history = f'scaling = ({self.m_scaling_x:.2f}, {self.m_scaling_y:.2f}, ' \
f'{self.m_scaling_flux:.2f})'
self.m_image_out_port.add_history('ScaleImagesModule', history)
self.m_image_out_port.copy_attributes(self.m_image_in_port)
if self.m_pixscale:
mean_scaling = (self.m_scaling_x+self.m_scaling_y)/2.
self.m_image_out_port.add_attribute('PIXSCALE', pixscale/mean_scaling)
self.m_image_out_port.close_port()
class AddLinesModule(ProcessingModule):
"""
Module to add lines of pixels to increase the size of an image.
"""
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
lines: Tuple[int, int, int, int]) -> None:
"""
Parameters
----------
name_in : str
Unique name of the module instance.
image_in_tag : str
progress(i, nframes, 'Subtracting background...', start_time)
subtract = self.m_image_in_port[i] - self.m_image_in_port[(i + self.m_shift) % nframes]
if self.m_image_in_port.tag == self.m_image_out_port.tag:
self.m_image_out_port[i] = subtract
else:
self.m_image_out_port.append(subtract)
history = f'shift = {self.m_shift}'
self.m_image_out_port.copy_attributes(self.m_image_in_port)
self.m_image_out_port.add_history('SimpleBackgroundSubtractionModule', history)
self.m_image_out_port.close_port()
class MeanBackgroundSubtractionModule(ProcessingModule):
"""
Pipeline module for mean background subtraction. Only applicable on data obtained with
dithering.
"""
__author__ = 'Markus Bonse, Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
shift: Optional[int] = None,
cubes: int = 1) -> None:
"""
name_in : str
-------
bool
Module validation.
str
Module name.
"""
if isinstance(module, ReadingModule):
tags.extend(module.get_all_output_tags())
elif isinstance(module, WritingModule):
for tag in module.get_all_input_tags():
if tag not in tags:
return False, module.name
elif isinstance(module, ProcessingModule):
tags.extend(module.get_all_output_tags())
for tag in module.get_all_input_tags():
if tag not in tags:
return False, module.name
else:
return False, None
return True, None
x_pos, y_pos = self.m_position[0], self.m_position[1]
print(f'Image {j+1:03d}/{nimages} -> (x, y) = ({x_pos:.2f}, {y_pos:.2f}), S/N = {snr:.2f}, FPF = {fpf:.2e}')
sep_ang = cartesian_to_polar(center, y_pos, x_pos)
result = np.column_stack((x_pos, y_pos, sep_ang[0]*pixscale, sep_ang[1], snr, fpf))
self.m_snr_out_port.append(result, data_dim=2)
history = f'aperture (arcsec) = {self.m_aperture*pixscale:.2f}'
self.m_snr_out_port.copy_attributes(self.m_image_in_port)
self.m_snr_out_port.add_history('FalsePositiveModule', history)
self.m_snr_out_port.close_port()
class MCMCsamplingModule(ProcessingModule):
"""
Pipeline module to measure the separation, position angle, and contrast of a planet with
injection of negative artificial planets and sampling of the posterior distribution with
``emcee``, an affine invariant Markov chain Monte Carlo (MCMC) ensemble sampler.
"""
__author__ = 'Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
psf_in_tag: str,
chain_out_tag: str,
param: Tuple[float, float, float],
bounds: Tuple[Tuple[float, float], Tuple[float, float], Tuple[float, float]],
None
"""
# create list of supported wavelets
supported = []
for family in pywt.families():
supported += pywt.wavelist(family)
# check if wavelet is supported
if wavelet not in supported:
raise ValueError(f'DWT supports only {supported} as input wavelet.')
self.m_wavelet = wavelet
class WaveletTimeDenoisingModule(ProcessingModule):
"""
Pipeline module for speckle subtraction in the time domain by using CWT or DWT wavelet
shrinkage (see Bonse et al. 2018).
"""
__author__ = 'Markus Bonse, Tomas Stolker'
@typechecked
def __init__(self,
name_in: str,
image_in_tag: str,
image_out_tag: str,
wavelet_configuration: Union[CwtWaveletConfiguration, DwtWaveletConfiguration],
padding: str = 'zero',
median_filter: bool = False,
threshold_function: str = 'soft') -> None: