Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
reference = np.median(reference, axis=0)
self.apply_function_to_images(_subtract_psf,
self.m_image_in_port,
self.m_res_out_port,
'Classical ADI',
func_args=(parang_thres, self.m_nreference, reference))
self.m_res_in_port = self.add_input_port(self.m_res_out_port._m_tag)
im_res = self.m_res_in_port.get_all()
res_rot = np.zeros(im_res.shape)
for i, item in enumerate(parang):
res_rot[i, ] = rotate(im_res[i, ], item, reshape=False)
stack = combine_residuals(self.m_residuals,
res_rot,
residuals=im_res,
angles=parang)
self.m_stack_out_port.set_all(stack)
if self.m_threshold:
history = f'threshold [deg] = {parang_thres:.2f}'
else:
history = 'threshold [deg] = None'
self.m_res_out_port.copy_attributes(self.m_image_in_port)
self.m_res_out_port.add_history('ClassicalADIModule', history)
self.m_stack_out_port.copy_attributes(self.m_image_in_port)
self.m_stack_out_port.add_history('ClassicalADIModule', history)
sep, ang, mag = param
fake = fake_planet(images=images,
psf=psf,
parang=parang-extra_rot,
position=(sep/pixscale, ang),
magnitude=mag,
psf_scaling=psf_scaling)
im_res_rot, im_res_derot = pca_psf_subtraction(images=fake*mask,
angles=-1.*parang+extra_rot,
pca_number=pca_number,
indices=indices)
res_stack = combine_residuals(method=residuals,
res_rot=im_res_derot,
residuals=im_res_rot,
angles=parang)
chi_square = merit_function(residuals=res_stack[0, ],
merit=merit,
aperture=aperture,
sigma=0.,
var_noise=var_noise)
return -0.5*chi_square
# Inject the fake planet
fake = fake_planet(images=images,
psf=psf,
parang=parang,
position=(position[0], position[1]),
magnitude=mag,
psf_scaling=psf_scaling)
# Run the PSF subtraction
_, im_res = pca_psf_subtraction(images=fake*mask,
angles=-1.*parang+extra_rot,
pca_number=pca_number)
# Stack the residuals
im_res = combine_residuals(method=residuals, res_rot=im_res)
# Measure the flux of the fake planet
flux_out, _, _, _ = false_alarm(image=im_res[0, ],
x_pos=xy_fake[0],
y_pos=xy_fake[1],
size=aperture,
ignore=False)
# Calculate the amount of self-subtraction
attenuation = flux_out/flux_in
# Calculate the detection limit
contrast = sigma*t_noise/(attenuation*star)
# The flux_out can be negative, for example if the aperture includes self-subtraction regions
if contrast > 0.:
# Inject the fake planet
fake = fake_planet(images=images,
psf=psf,
parang=parang,
position=(position[0], position[1]),
magnitude=mag,
psf_scaling=psf_scaling)
# Run the PSF subtraction
_, im_res = pca_psf_subtraction(images=fake*mask,
angles=-1.*parang+extra_rot,
pca_number=pca_number)
# Stack the residuals
im_res = combine_residuals(method=residuals, res_rot=im_res)
# Measure the flux of the fake planet
flux_out, _, _, _ = false_alarm(image=im_res[0, ],
x_pos=xy_fake[0],
y_pos=xy_fake[1],
size=aperture,
ignore=False)
# Calculate the amount of self-subtraction
attenuation = flux_out/flux_in
# Calculate the detection limit
contrast = sigma*t_noise/(attenuation*star)
# The flux_out can be negative, for example if the aperture includes self-subtraction regions
if contrast > 0.:
# 2.) mean residuals
if self.m_res_mean_out_port is not None:
if self.m_processing_type in ['SDI+ADI', 'ADI+SDI']:
out_array_mean[i, j] = combine_residuals(method='mean',
res_rot=res_rot,
angles=parang)
else:
out_array_mean[i] = combine_residuals(method='mean',
res_rot=res_rot,
angles=parang)
# 3.) median residuals
if self.m_res_median_out_port is not None:
if self.m_processing_type in ['SDI+ADI', 'ADI+SDI']:
out_array_medi[i, j] = combine_residuals(method='median',
res_rot=res_rot,
angles=parang)
else:
out_array_medi[i] = combine_residuals(method='median',
res_rot=res_rot,
angles=parang)
# 4.) noise-weighted residuals
if self.m_res_weighted_out_port is not None:
if self.m_processing_type in ['SDI+ADI', 'ADI+SDI']:
out_array_weig[i, j] = combine_residuals(method='weighted',
res_rot=res_rot,
residuals=residuals,
angles=parang)
if self.m_requirements[0]:
res_output[0, ] = combine_residuals(method='mean',
res_rot=res_rot)
if self.m_requirements[1]:
res_output[1, ] = combine_residuals(method='median',
res_rot=res_rot)
if self.m_requirements[2]:
res_output[2, ] = combine_residuals(method='weighted',
res_rot=res_rot,
residuals=residuals,
angles=self.m_angles)
if self.m_requirements[3]:
res_output[3, ] = combine_residuals(method='clipped',
res_rot=res_rot)
sys.stdout.write('.')
sys.stdout.flush()
return TaskResult(res_output, tmp_task.m_job_parameter[0])
async_results = []
# Create temporary files
tmp_im_str = os.path.join(working_place, 'tmp_images.npy')
tmp_psf_str = os.path.join(working_place, 'tmp_psf.npy')
np.save(tmp_im_str, images)
np.save(tmp_psf_str, psf)
mask = create_mask(images.shape[-2:], (self.m_cent_size, self.m_edge_size))
_, im_res = pca_psf_subtraction(images=images*mask,
angles=-1.*parang+self.m_extra_rot,
pca_number=self.m_pca_number)
noise = combine_residuals(method=self.m_residuals, res_rot=im_res)
pool = mp.Pool(cpu)
for pos in positions:
async_results.append(pool.apply_async(contrast_limit,
args=(tmp_im_str,
tmp_psf_str,
noise,
mask,
parang,
self.m_psf_scaling,
self.m_extra_rot,
self.m_pca_number,
self.m_threshold,
self.m_aperture,
self.m_residuals,
res_output = np.zeros((4, res_rot.shape[-2], res_rot.shape[-1]))
else:
res_output = np.zeros((4, len(self.m_star_reshape),
res_rot.shape[-2], res_rot.shape[-1]))
if self.m_requirements[0]:
res_output[0, ] = combine_residuals(method='mean',
res_rot=res_rot)
if self.m_requirements[1]:
res_output[1, ] = combine_residuals(method='median',
res_rot=res_rot)
if self.m_requirements[2]:
res_output[2, ] = combine_residuals(method='weighted',
res_rot=res_rot,
residuals=residuals,
angles=self.m_angles)
if self.m_requirements[3]:
res_output[3, ] = combine_residuals(method='clipped',
res_rot=res_rot)
sys.stdout.write('.')
sys.stdout.flush()
return TaskResult(res_output, tmp_task.m_job_parameter[0])
im_shape=None,
indices=None)
else:
# PSF subtraction with separate reference data (RDI)
im_reshape = np.reshape(fake*mask, (im_shape[0], im_shape[1]*im_shape[2]))
im_res_rot, im_res_derot = pca_psf_subtraction(images=im_reshape,
angles=-1.*parang+self.m_extra_rot,
pca_number=n_components,
pca_sklearn=sklearn_pca,
im_shape=im_shape,
indices=None)
# Collapse the residuals of the PSF subtraction
res_stack = combine_residuals(method=self.m_residuals,
res_rot=im_res_derot,
residuals=im_res_rot,
angles=parang)
# Appedn the collapsed residuals to the output port
self.m_res_out_port[count].append(res_stack, data_dim=3)
# Calculate the chi-square for the tested position and contrast
chi_square = merit_function(residuals=res_stack[0, ],
merit=self.m_merit,
aperture=aperture,
sigma=self.m_sigma,
var_noise=var_noise)
# Apply the extra rotation to the y and x position
# The returned position is given as (y, x)