Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@overrides(Optimization)
def addNodeCapacityConstraint(self, pptc, resource, nodecaps,
nodeCapFunction):
for node in nodecaps:
loadstr = 'Load_{}_{}'.format(resource, node)
self.nodeLoads[node][resource][loadstr] = 1
self.cplexprob.variables.add(names=[loadstr])
v = self.cplexprob.variables.get_names()
varindex = dict(izip(v, range(len(v))))
for node in nodecaps:
cap = nodecaps[node]
loadstr = 'Load_{}_{}'.format(resource, node)
var = [varindex[loadstr]]
mults = [-1]
for tc in pptc:
for pi, path in enumerate(pptc[tc]):
@overrides(Optimization)
def load(self, fname):
return self.cplexprob.start.read_basis(fname)
@overrides(Optimization)
def solve(self):
"""
Call the solver and solve the CPLEX problem
"""
return self.cplexprob.solve()
@overrides(Optimization)
def setPredefinedObjective(self, objective, resource=None, routingCostFunc=len, pptc=None):
if objective.lower() == 'maxallflow':
self.defineVar('TotalFlow', {name: 1 for name in
self.allocationVars},
lowerBound=0)
self.setObjectiveCoeff({'TotalFlow': 1}, 'max')
elif objective.lower() == 'maxminflow':
self.defineVar('MinFlow')
varindex = self.getVarIndex()
self.setObjectiveCoeff({varindex['MinFlow']: 1}, 'max')
for allocation in self.allocationVars:
self.cplexprob.linear_constraints.add(
[cplex.SparsePair([varindex['MinFlow'], allocation],
[1, -1])],
senses=['L'], rhs=[0])
elif objective.lower() == 'minroutingcost':
@overrides(Optimization)
def addAllocateFlowConstraint(self, pptc, allocation=None):
v = self.cplexprob.variables.get_names()
for tc in pptc:
name = self.al(tc)
if name not in v:
self.cplexprob.variables.add(names=[name], lb=[0], ub=[1])
self.allocationVars.append(name)
v = self.cplexprob.variables.get_names()
varindex = dict(izip(v, range(len(v))))
if allocation is None:
for tc in pptc:
var = []
mults = []
for pi in range(len(pptc[tc])):
var.append(varindex[self.xp(tc, pi)])
mults.append(1/tc.priority)
@overrides(Optimization)
def defineVar(self, name, coeffs=None, const=0, lowerBound=None,
upperBound=None):
self.cplexprob.variables.add(names=[name])
if lowerBound is not None:
self.cplexprob.variables.set_lower_bounds(name, lowerBound)
if upperBound is not None:
self.cplexprob.variables.set_upper_bounds(name, upperBound)
if coeffs is None:
return
v = self.cplexprob.variables.get_names()
varindex = dict(izip(v, range(len(v))))
var = coeffs.keys()
mults = coeffs.values()
var.append(varindex[name])
mults.append(-1.0)
self.cplexprob.linear_constraints.add(
@overrides(Optimization)
def addRequireAllNodesConstraint(self, pptc, trafficClasses=None):
v = self.cplexprob.variables.get_names()
varindex = dict(izip(v, range(len(v))))
if trafficClasses is None:
trafficClasses = pptc.keys()
for tc in trafficClasses:
for pi, path in enumerate(pptc[tc]):
for n in path:
self.cplexprob.linear_constraints.add(
[cplex.SparsePair([varindex[self.bp(tc, pi)],
varindex[self.bn(n)]],
[1, -1])],
rhs=[0], senses='L')
@overrides(Optimization)
def defineVarSymbolic(self, name, symbolicEq):
raise NotImplementedError
@overrides(Optimization)
def getPathFractions(self, pptc, flowCarryingOnly=True):
result = {}
for tc, paths in pptc.iteritems():
result[tc] = []
for index, path in enumerate(paths):
newpath = copy.copy(path)
newpath.setNumFlows(self.cplexprob.solution.get_values(
self.xp(tc, index)))
if newpath.getNumFlows() > 0 and flowCarryingOnly:
result[tc].append(newpath)
elif not flowCarryingOnly:
result[tc].append(newpath)
return result
@overrides(Optimization)
def addEnforceSinglePath(self, pptc, trafficClasses=None):
v = self.cplexprob.variables.get_names()
varindex = dict(izip(v, range(len(v))))
if trafficClasses is None:
trafficClasses = pptc.keys()
for tc in trafficClasses:
var = []
for pi, path in enumerate(pptc[tc]):
var.append(varindex[self.bp(tc, pi)])
self.cplexprob.linear_constraints.add(
[cplex.SparsePair(var, [1] * len(var))], senses=['E'], rhs=[1],
names=['singlepath_{}'.format(tc.ID)])