Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
load_max: The max number of worked minutes for a valid selection.
commute_time: The commute time between two appointments in minutes.
Returns:
A matrix where each line is a valid combination of appointments.
"""
model = cp_model.CpModel()
variables = [
model.NewIntVar(0, load_max // (duration + commute_time), '')
for duration in durations
]
terms = sum(variables[i] * (duration + commute_time)
for i, duration in enumerate(durations))
model.AddLinearConstraint(terms, load_min, load_max)
solver = cp_model.CpSolver()
solution_collector = AllSolutionCollector(variables)
solver.SearchForAllSolutions(model, solution_collector)
return solution_collector.combinations()
In order to make this process easier, we present a mathematical
formulation that models the seating chart problem. This model can
be solved to find the optimal arrangement of guests at tables.
At the very least, it can provide a starting point and hopefully
minimize stress and arguments.
Adapted from
https://github.com/google/or-tools/blob/master/examples/csharp/wedding_optimal_chart.cs
"""
from __future__ import print_function
import time
from ortools.sat.python import cp_model
class WeddingChartPrinter(cp_model.CpSolverSolutionCallback):
"""Print intermediate solutions."""
def __init__(self, seats, names, num_tables, num_guests):
cp_model.CpSolverSolutionCallback.__init__(self)
self.__solution_count = 0
self.__start_time = time.time()
self.__seats = seats
self.__names = names
self.__num_tables = num_tables
self.__num_guests = num_guests
def on_solution_callback(self):
current_time = time.time()
objective = self.ObjectiveValue()
print("Solution %i, time = %f s, objective = %i" %
(self.__solution_count, current_time - self.__start_time,
problem_type = ('Resource Investment Problem'
if problem.is_resource_investment else 'RCPSP')
if problem.is_rcpsp_max:
problem_type += '/Max delay'
# We report 2 fewer tasks because the sentinel tasks are not counted in
# the description of the RCPSP models.
if problem.is_consumer_producer:
print('Solving %s with %i reservoir resources and %i tasks' %
(problem_type, len(problem.resources), len(problem.tasks) - 2))
else:
print('Solving %s with %i resources and %i tasks' %
(problem_type, len(problem.resources), len(problem.tasks) - 2))
# Create the model.
model = cp_model.CpModel()
num_tasks = len(problem.tasks)
num_resources = len(problem.resources)
all_active_tasks = range(1, num_tasks - 1)
all_resources = range(num_resources)
horizon = problem.deadline if problem.deadline != -1 else problem.horizon
if horizon == -1: # Naive computation.
horizon = sum(max(r.duration for r in t.recipes) for t in problem.tasks)
if problem.is_rcpsp_max:
for t in problem.tasks:
for sd in t.successor_delays:
for rd in sd.recipe_delays:
for d in rd.min_delays:
horizon += abs(d)
model.AddMapDomain(positions[p[1]], assign[p[1]])
# Finally add the symmetry breaking constraint.
model.Add(positions[p[0]] <= positions[p[1]])
# Objective.
obj = model.NewIntVar(0, num_slabs * max_loss, 'obj')
losses = [model.NewIntVar(0, max_loss, 'loss_%i' % s) for s in all_slabs]
for s in all_slabs:
model.AddElement(loads[s], loss_array, losses[s])
model.Add(obj == sum(losses))
model.Minimize(obj)
### Solve model.
solver = cp_model.CpSolver()
solver.parameters.num_search_workers = 8
objective_printer = cp_model.ObjectiveSolutionPrinter()
status = solver.SolveWithSolutionCallback(model, objective_printer)
### Output the solution.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
print('Loss = %i, time = %f s, %i conflicts' % (
solver.ObjectiveValue(), solver.WallTime(), solver.NumConflicts()))
else:
print('No solution')
model.Add(expr == 2).OnlyEnforceIf(b2)
# expr == 3 when x == 7
b3 = model.NewBoolVar('b3')
model.Add(x == 7).OnlyEnforceIf(b3)
model.Add(expr == 3).OnlyEnforceIf(b3)
# At least one of the b_i is true (we could instead require their sum == 1).
model.AddBoolOr([b0, b2, b3])
# Search for x values in increasing order.
model.AddDecisionStrategy([x], cp_model.CHOOSE_FIRST,
cp_model.SELECT_MIN_VALUE)
# Create a solver and solve with a fixed search.
solver = cp_model.CpSolver()
# Force the solver to follow the decision strategy exactly.
solver.parameters.search_branching = cp_model.FIXED_SEARCH
# Search and print out all solutions.
solution_printer = VarArraySolutionPrinter([x, expr])
solver.SearchForAllSolutions(model, solution_printer)
def SchedulingWithCalendarSampleSat():
"""Interval spanning across a lunch break."""
model = cp_model.CpModel()
# The data is the following:
# Work starts at 8h, ends at 18h, with a lunch break between 13h and 14h.
# We need to schedule a task that needs 3 hours of processing time.
# Total duration can be 3 or 4 (if it spans the lunch break).
#
# Because the duration is at least 3 hours, work cannot start after 15h.
# Because of the break, work cannot start at 13h.
start = model.NewIntVarFromDomain(
cp_model.Domain.FromIntervals([(8, 12), (14, 15)]), 'start')
duration = model.NewIntVar(3, 4, 'duration')
end = model.NewIntVar(8, 18, 'end')
unused_interval = model.NewIntervalVar(start, duration, end, 'interval')
# We have 2 states (spanning across lunch or not)
self.num_sections = len(problem.sections)
self.courses = [
x * self.num_levels + y
for x in problem.levels
for y in problem.sections
]
self.num_courses = self.num_levels * self.num_sections
all_courses = range(self.num_courses)
all_teachers = range(self.num_teachers)
all_slots = range(self.num_slots)
all_sections = range(self.num_sections)
all_subjects = range(self.num_subjects)
all_levels = range(self.num_levels)
self.model = cp_model.CpModel()
self.assignment = {}
for c in all_courses:
for s in all_subjects:
for t in all_teachers:
for slot in all_slots:
if t in self.problem.specialties[s]:
name = 'C:{%i} S:{%i} T:{%i} Slot:{%i}' % (c, s, t, slot)
self.assignment[c, s, t, slot] = self.model.NewBoolVar(name)
else:
name = 'NO DISP C:{%i} S:{%i} T:{%i} Slot:{%i}' % (c, s, t, slot)
self.assignment[c, s, t, slot] = self.model.NewIntVar(0, 0, name)
# Constraints
# Each course must have the quantity of classes specified in the curriculum
for c in range(max_capacity + 1)
]
### Model problem.
# Generate all valid slabs (columns)
unsorted_valid_slabs = collect_valid_slabs_dp(capacities, colors, widths,
loss_array)
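# Sort slabs in ascending order of 1000 * c[-1] + c[-2] (c[-1] is the load and
# c[-2] presumably the loss, judging by the constraint and objective below).
# NOTE(review): sorted() does not remove duplicates; confirm they are handled
# by collect_valid_slabs_dp.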
valid_slabs = sorted(
unsorted_valid_slabs, key=lambda c: 1000 * c[-1] + c[-2])
all_valid_slabs = range(len(valid_slabs))
# create model and decision variables.
model = cp_model.CpModel()
selected = [model.NewBoolVar('selected_%i' % i) for i in all_valid_slabs]
for order_id in all_orders:
model.Add(
sum(selected[i] for i, slab in enumerate(valid_slabs)
if slab[order_id]) == 1)
# Redundant constraint (sum of loads == sum of widths).
model.Add(
sum(selected[i] * valid_slabs[i][-1]
for i in all_valid_slabs) == sum(widths))
# Objective.
model.Minimize(
sum(selected[i] * valid_slabs[i][-2] for i in all_valid_slabs))
def main():
"""Create the shift scheduling model and solve it."""
# Create the model.
model = cp_model.CpModel()
#
# data
#
num_vendors = 9
num_hours = 10
num_work_types = 1
traffic = [100, 500, 100, 200, 320, 300, 200, 220, 300, 120]
max_traffic_per_vendor = 100
# The last columns are:
# index_of_the_schedule, sum of worked hours (per work type).
# The index is useful for branching.
possible_schedules = [[1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0,
8], [1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1,
model.Add(
all_tasks[(job, index + 1)].start >= all_tasks[(job,
index)].end)
# Makespan objective.
obj_var = model.NewIntVar(0, horizon, 'makespan')
model.AddMaxEquality(
obj_var,
[all_tasks[(job, len(machines[job]) - 1)].end for job in all_jobs])
model.Minimize(obj_var)
# Solve model.
solver = cp_model.CpSolver()
status = solver.Solve(model)
if status == cp_model.OPTIMAL:
# Print out makespan.
print('Optimal Schedule Length: %i' % solver.ObjectiveValue())
print()
# Create one list of assigned tasks per machine.
assigned_jobs = [[] for _ in range(machines_count)]
for job in all_jobs:
for index in range(len(machines[job])):
machine = machines[job][index]
assigned_jobs[machine].append(
assigned_task_type(
start=solver.Value(all_tasks[(job, index)].start),
job=job,
index=index))
disp_col_width = 10