Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.max = max
self.boolean_constraint = boolean_constraint
def compute_constraint(self, times, observer, targets):
    """
    Evaluate this constraint's altitude limits for the given inputs.

    The altitude is read from the ``'altaz'`` entry of the mapping
    returned by ``_get_altaz(times, observer, targets)``.

    Parameters
    ----------
    times, observer, targets
        Forwarded unchanged to ``_get_altaz``.

    Returns
    -------
    result
        If ``self.boolean_constraint`` is truthy, the combination (``&``)
        of ``self.min <= alt`` and ``alt <= self.max``. Otherwise, the
        value returned by ``max_best_rescale(alt, self.min, self.max)``.
    """
    alt = _get_altaz(times, observer, targets)['altaz'].alt

    # Non-boolean mode: hand the altitude and both limits to the rescaler.
    if not self.boolean_constraint:
        return max_best_rescale(alt, self.min, self.max)

    # Boolean mode: the altitude must satisfy both limits (inclusive).
    above_min = self.min <= alt
    below_max = alt <= self.max
    return above_min & below_max
class AirmassConstraint(AltitudeConstraint):
"""
Constrain the airmass of a target.
In the current implementation the airmass is approximated by the secant of
the zenith angle.
.. note::
The ``max`` and ``min`` arguments appear in the order (max, min)
in this initializer to support the common case for users who care
about the upper limit on the airmass (``max``) and not the lower
limit.
Parameters
----------
max : float or `None`
Maximum airmass of the target. `None` indicates no limit.
a - 1*u.minute, a - 1*u.minute])
pre_filled = filled_times.reshape((2, 2))
else:
filled_times = Time(pre_filled.flatten())
pre_filled = filled_times.reshape((int(len(filled_times)/2), 2))
for b in blocks:
if b.constraints is None:
b._all_constraints = self.constraints
else:
b._all_constraints = self.constraints + b.constraints
# to make sure the scheduler has some constraint to work off of
# and to prevent scheduling of targets below the horizon
# TODO : change default constraints to [] and switch to append
if b._all_constraints is None:
b._all_constraints = [AltitudeConstraint(min=0 * u.deg)]
b.constraints = [AltitudeConstraint(min=0 * u.deg)]
elif not any(isinstance(c, AltitudeConstraint) for c in b._all_constraints):
b._all_constraints.append(AltitudeConstraint(min=0 * u.deg))
if b.constraints is None:
b.constraints = [AltitudeConstraint(min=0 * u.deg)]
else:
b.constraints.append(AltitudeConstraint(min=0 * u.deg))
b._duration_offsets = u.Quantity([0*u.second, b.duration/2,
b.duration])
b.observer = self.observer
current_time = self.schedule.start_time
while (len(blocks) > 0) and (current_time < self.schedule.end_time):
# first compute the value of all the constraints for each block
# given the current starting time
block_transitions = []
block_constraint_results = []
for b in blocks:
def _make_schedule(self, blocks):
# Combine individual constraints with global constraints, and
# retrieve priorities from each block to define scheduling order
_all_times = []
_block_priorities = np.zeros(len(blocks))
# make sure we don't schedule below the horizon
if self.constraints is None:
self.constraints = [AltitudeConstraint(min=0 * u.deg)]
else:
self.constraints.append(AltitudeConstraint(min=0 * u.deg))
for i, b in enumerate(blocks):
b._duration_offsets = u.Quantity([0 * u.second, b.duration / 2, b.duration])
_block_priorities[i] = b.priority
_all_times.append(b.duration)
b.observer = self.observer
# Define a master schedule
# Generate grid of time slots, and a mask for previous observations
time_resolution = self.time_resolution
times = time_grid_from_range([self.schedule.start_time, self.schedule.end_time],
time_resolution=time_resolution)
if b.constraints is None:
b._all_constraints = self.constraints
else:
b._all_constraints = self.constraints + b.constraints
# to make sure the scheduler has some constraint to work off of
# and to prevent scheduling of targets below the horizon
# TODO : change default constraints to [] and switch to append
if b._all_constraints is None:
b._all_constraints = [AltitudeConstraint(min=0 * u.deg)]
b.constraints = [AltitudeConstraint(min=0 * u.deg)]
elif not any(isinstance(c, AltitudeConstraint) for c in b._all_constraints):
b._all_constraints.append(AltitudeConstraint(min=0 * u.deg))
if b.constraints is None:
b.constraints = [AltitudeConstraint(min=0 * u.deg)]
else:
b.constraints.append(AltitudeConstraint(min=0 * u.deg))
b._duration_offsets = u.Quantity([0*u.second, b.duration/2,
b.duration])
b.observer = self.observer
current_time = self.schedule.start_time
while (len(blocks) > 0) and (current_time < self.schedule.end_time):
# first compute the value of all the constraints for each block
# given the current starting time
block_transitions = []
block_constraint_results = []
for b in blocks:
# first figure out the transition
if len(self.schedule.observing_blocks) > 0:
trans = self.transitioner(
self.schedule.observing_blocks[-1], b, current_time, self.observer)
else:
trans = None
filled_times = Time([a - 1*u.hour, a - 1*u.hour,
a - 1*u.minute, a - 1*u.minute])
pre_filled = filled_times.reshape((2, 2))
else:
filled_times = Time(pre_filled.flatten())
pre_filled = filled_times.reshape((int(len(filled_times)/2), 2))
for b in blocks:
if b.constraints is None:
b._all_constraints = self.constraints
else:
b._all_constraints = self.constraints + b.constraints
# to make sure the scheduler has some constraint to work off of
# and to prevent scheduling of targets below the horizon
# TODO : change default constraints to [] and switch to append
if b._all_constraints is None:
b._all_constraints = [AltitudeConstraint(min=0 * u.deg)]
b.constraints = [AltitudeConstraint(min=0 * u.deg)]
elif not any(isinstance(c, AltitudeConstraint) for c in b._all_constraints):
b._all_constraints.append(AltitudeConstraint(min=0 * u.deg))
if b.constraints is None:
b.constraints = [AltitudeConstraint(min=0 * u.deg)]
else:
b.constraints.append(AltitudeConstraint(min=0 * u.deg))
b._duration_offsets = u.Quantity([0*u.second, b.duration/2,
b.duration])
b.observer = self.observer
current_time = self.schedule.start_time
while (len(blocks) > 0) and (current_time < self.schedule.end_time):
# first compute the value of all the constraints for each block
# given the current starting time
block_transitions = []
block_constraint_results = []