# (scraper artifact, commented out to keep the file parseable)
# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# * the expression for the fitness function has been rewritten to
# avoid multiple log computations, and to avoid power computations
# * the use of scipy.weave and numexpr has been evaluated. The latter
# gives a big gain (~40%) if used for the fitness function. No other
# gain is obtained by using it anywhere else
# Set numexpr precision to low (more than enough for us), which is
# faster than high
# NOTE(review): the previous accuracy mode is kept in `oldaccuracy`,
# presumably so a caller can restore it afterwards — confirm it is
# actually restored somewhere outside this view.
oldaccuracy = numexpr.set_vml_accuracy_mode("low")
# Run numexpr single-threaded (both its own pool and VML's).
numexpr.set_num_threads(1)
numexpr.set_vml_num_threads(1)
# Speed tricks: resolve once for all the functions which will be used
# in the loop
numexpr_evaluate = numexpr.evaluate
numexpr_re_evaluate = numexpr.re_evaluate
# Pre-compute this
# Countdown N+1, N, ..., 1; consumed somewhere outside this view —
# TODO confirm its role in the fitness expression.
aranges = np.arange(N + 1, 0, -1)
# Fragment (incomplete in this view): main loop over candidate change
# points; the rest of the loop body is not visible here.
for R in range(N):
# Edge value of the current right-most block.
br = block_length[R + 1]
T_k = (
block_length[: R + 1] - br
) # this looks like it is not used, but it actually is,
# inside the numexpr expression
# N_k: number of elements in each block
# This expression has been simplified for the case of
# unbinned events (i.e., one element in each block)
# It was:
# Power-tower fractal fragment: iterate z <- initial_zs ** z over a
# complex grid and record per-point periodicities.
# NOTE(review): re_min / re_max are defined outside this view.
im_min, im_max = (-4.5, 4.5)
h_resolution = 1000
iterations = 500
sub_pixel_sample = 2
record_history_length = 500
# Supersample horizontally; vertical resolution preserves the aspect
# ratio of the complex-plane window.
h_resolution *= sub_pixel_sample
v_resolution = int(h_resolution * (im_max - im_min) / (re_max - re_min))
# Complex grid (real along axis 1, imaginary along axis 0), flattened
# to 1-D for numexpr.
initial_zs = (np.expand_dims(np.linspace(re_min, re_max, h_resolution), 0) + 1j * np.expand_dims(np.linspace(im_min, im_max, v_resolution), 1)).flatten()
# Iterate raising to the power
zs = initial_zs
# First evaluate() compiles the expression; re_evaluate() below re-runs
# the cached expression with the rebound `zs`, skipping re-parsing.
zs = ne.evaluate('initial_zs**zs')
for i in tqdm(range(iterations)):
zs = ne.re_evaluate()
# Snapshot after the settling iterations; used as the reference value
# when looking for cycles below.
final_zs = np.copy(zs)
# Record periodicities
# period == 0 means "no repeat found yet" for that grid point.
period = np.zeros_like(final_zs, dtype=np.int32)
for i in tqdm(range(record_history_length)):
zs = ne.evaluate('initial_zs**zs')
close = is_close(final_zs, zs, atol=1e-6)
# If we've found a repeat for a point we've not previously
# found a repeat for, record the period
period[np.logical_and(period==0, close)] = i + 1
# Image array, initially white
rgb = np.ones((h_resolution * v_resolution, 3)) * 255
# Diverging points are black
# Fragment (starts mid-conditional; the matching `if` branch is not
# visible here). Band case: cache rest-frame wavelengths per band index.
rest_wavs = rest_wavs_dict.setdefault(
bi, self._sample_wavelengths[bi] * Azp1)
else:
# Single-frequency case: convert the observation's frequency to a
# wavelength.
rest_wavs = np.array( # noqa: F841
[czp1 / self._frequencies[li]])
# The locals below are read by name inside the numexpr expression
# string — hence the noqa markers (they look unused to flake8).
radius_phot = self._radius_phot[li] # noqa: F841
temperature_phot = self._temperature_phot[li] # noqa: F841
if not evaled:
# First pass: compile and evaluate the blackbody expression once.
seds.append(ne.evaluate(
'fc * radius_phot**2 / rest_wavs**5 / '
'expm1(xc / rest_wavs / temperature_phot)'))
evaled = True
else:
# Later passes reuse the compiled expression with the rebound
# locals above (much faster than re-parsing).
seds.append(ne.re_evaluate())
# Replace NaNs (e.g. from expm1 overflow) with zero flux.
seds[-1][np.isnan(seds[-1])] = 0.0
seds = self.add_to_existing_seds(seds, **kwargs)
# Units of `seds` is ergs / s.
return {'sample_wavelengths': self._sample_wavelengths, 'seds': seds}
# Fragment (method tail): per-observation blackbody SED in frequency
# form. Several locals are read by name inside the numexpr expression
# string, which is why they carry the noqa: F841 markers.
seds = []
for li, lum in enumerate(self._luminosities):
cur_band = self._bands[li] # noqa: F841
bi = self._band_indices[li]
# Observer-frame frequencies shifted to the rest frame.
rest_freqs = [x * zp1 # noqa: F841
for x in self._sample_frequencies[bi]]
wav_arr = np.array(self._sample_wavelengths[bi]) # noqa: F841
radius_phot = self._radius_phots[li] # noqa: F841
temperature_phot = self._temperature_phots[li] # noqa: F841
if li == 0:
# First iteration compiles and evaluates the expression.
sed = ne.evaluate(
'fc * radius_phot**2 * rest_freqs**3 / '
'(exp(xc * rest_freqs / temperature_phot) - 1.0)')
else:
# Later iterations re-run the cached expression with the
# rebound locals above.
sed = ne.re_evaluate()
# Replace NaN/inf (exp overflow) with finite values.
sed = np.nan_to_num(sed)
seds.append(list(sed))
seds = self.add_to_existing_seds(seds, **kwargs)
# Units of `seds` is ergs / s / Angstrom.
return {'sample_wavelengths': self._sample_wavelengths,
self.key('seds'): seds}
# Fragment (loop interior): absorbed blackbody SED. The locals below
# are read by name inside the numexpr expression (hence the noqa).
ab = rest_wavs < cwave_ac # noqa: F841
tpi = tp[li] # noqa: F841
rp2i = rp2[li] # noqa: F841
if not evaled:
# Absorbed blackbody: 0% transmission at 0 Angstroms 100% at
# >3000 Angstroms.
# Below the cutoff wavelength (`ab` true) the lambda^-5 Planck
# factor is replaced by lambda^-4 / cwave_ac, i.e. a linear
# suppression toward zero wavelength.
sed = ne.evaluate(
"where(ab, fc * (rp2i / cwave_ac / "
"rest_wavs ** 4) / expm1(xc / rest_wavs / tpi), "
"fc * (rp2i / rest_wavs ** 5) / "
"expm1(xc / rest_wavs / tpi))"
)
evaled = True
else:
# Reuse the compiled expression with the rebound locals.
sed = ne.re_evaluate()
# NaNs (e.g. expm1 overflow) become zero flux.
sed[np.isnan(sed)] = 0.0
seds[li] = sed
# Fragment: collapse repeated epochs to unique times and precompute
# per-epoch powers of the photospheric temperature.
uniq_times = np.unique(self._times)
tsort = np.argsort(self._times)
# NOTE(review): searchsorted with sorter= returns positions in the
# *sorted* ordering; those indices are then applied directly to the
# unsorted arrays below — verify this is intentional (it only picks
# out an occurrence of each unique time when the mapping lines up).
uniq_is = np.searchsorted(self._times, uniq_times, sorter=tsort)
lu = len(uniq_times)
norms = self._luminosities[
uniq_is] / (fc / ac * rp2[uniq_is] * tp[uniq_is])
# Column vectors so they broadcast against a wavelength axis later.
rp2 = rp2[uniq_is].reshape(lu, 1)
tp = tp[uniq_is].reshape(lu, 1)
tp2 = tp * tp
tp3 = tp2 * tp # noqa: F841
# Tail of a different loop in the original file: placeholder SED for
# this index, then skip to the next iteration.
seds.append([0.0])
continue
# Fragment (loop interior): Gaussian emission-line profile added onto
# an existing SED, then renormalised so luminosity is conserved.
if bi >= 0:
# Band case: observed wavelengths shifted to the rest frame (cgs).
rest_wavs = (self._sample_wavelengths[bi] *
u.Angstrom.cgs.scale / zp1)
else:
# Single-frequency case; read by name in the numexpr string below.
rest_wavs = [cc / (self._frequencies[li] * zp1)] # noqa: F841
amp = lum * amps[li]
if not evaled:
# First pass compiles the Gaussian profile expression.
sed = ne.evaluate(
'amp * exp(-0.5 * ((rest_wavs - lw) / ls) ** 2)')
evaled = True
else:
# Later passes reuse the compiled expression with rebound locals.
sed = ne.re_evaluate()
sed = np.nan_to_num(sed)
# Renormalisation factor from the analytic (erf) integral of the
# Gaussian. NOTE(review): divides by `lum` — assumes lum != 0;
# confirm upstream guarantees this.
norm = (lum + amp / zp1 * np.sqrt(np.pi / 2.0) * (
1.0 + erf(lw / (np.sqrt(2.0) * ls)))) / lum
seds[li] += sed
seds[li] /= norm
return {'sample_wavelengths': self._sample_wavelengths,
self.key('seds'): seds}
# Fragment (loop interior): diffusion-smeared luminosity at epoch `te`
# via trapezoidal integration over the dense luminosity history.
te2 = te ** 2
# Lower integration bound: kept real when te2 < tbarg and clipped at
# min_te.
tb = max(np.sqrt(max(te2 - tbarg, 0.0)), min_te)
int_times = np.linspace(tb, te, self.N_INT_TIMES)
# Uniform grid spacing from linspace (assumes N_INT_TIMES > 1).
dt = int_times[1] - int_times[0]
td = self._tau_diff # noqa: F841
# Interpolated luminosities; read by name inside the numexpr
# expression below (hence noqa).
int_lums = np.interp( # noqa: F841
int_times, self._dense_times_since_exp,
self._dense_luminosities)
if not evaled:
# First pass compiles the diffusion integrand; later passes reuse
# the compiled expression with the rebound locals above.
int_arg = ne.evaluate('int_lums * int_times / td**2 * '
'exp((int_times - te) / td)')
evaled = True
else:
int_arg = ne.re_evaluate()
# Zero out NaNs (e.g. exp overflow) before integrating.
int_arg[np.isnan(int_arg)] = 0.0
lum_val = np.trapz(int_arg, dx=dt)
# Cache per epoch so repeated times are not recomputed.
lum_cache[te] = lum_val
new_lum[ti] = lum_val
return {self.dense_key('luminosities'): new_lum}