Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
DUAROUTER = sumolib.checkBinary('duarouter')
options = get_options()
net = readNet(options.network)
routeInfos = {} # id-> RouteInfo
if options.standalone:
for route in parse(options.routeFile, 'route'):
ri = RouteInfo()
ri.edges = route.edges.split()
routeInfos[route.id] = ri
else:
for vehicle in parse(options.routeFile, 'vehicle'):
ri = RouteInfo()
ri.edges = vehicle.route[0].edges.split()
routeInfos[vehicle.id] = ri
for rInfo in routeInfos.values():
rInfo.airDist = euclidean(
net.getEdge(rInfo.edges[0]).getShape()[0],
net.getEdge(rInfo.edges[-1]).getShape()[-1])
rInfo.length = getRouteLength(net, rInfo.edges)
rInfo.airDistRatio = rInfo.length / rInfo.airDist
def main(options):
# cache stand-alone routes
routesDepart = {} # first edge for each route
routesArrival = {} # last edge for each route
with codecs.open(options.outfile, 'w', encoding='utf8') as out:
out.write("\n")
for route in parse(options.infile, "route"):
if route.hasAttribute('id') and route.id is not None:
routesDepart[route.id] = route.edges.split()[0]
routesArrival[route.id] = route.edges.split()[-1]
out.write(route.toXML(' '))
for obj in parse(options.infile, ['vehicle', 'trip', 'flow', 'vType'],
heterogeneous=options.heterogeneous, warn=False):
if obj.name == 'vType':
# copy
pass
else:
if options.modify_ids:
obj.id += options.name_suffix
# compute depart-edge filter
departEdge = None
if options.depart_edges is not None:
# determine the departEdge of the current vehicle
if obj.name == 'trip':
departEdge = obj.attr_from
elif obj.name == 'vehicle':
if obj.hasAttribute('route') and obj.route is not None:
def getFlows(routeFiles, verbose):
# get flows for each edge pair
for file in routeFiles.split(','):
edgePairFlowsMap = {}
if verbose:
print("route file:%s" % file)
for veh in sumolib.output.parse(file, 'vehicle'):
edgesList = veh.route[0].edges.split()
for i, e in enumerate(edgesList):
if i < len(edgesList)-1:
next = edgesList[i+1]
if e not in edgePairFlowsMap:
edgePairFlowsMap[e] = {}
if next not in edgePairFlowsMap[e]:
edgePairFlowsMap[e][next] = 0
edgePairFlowsMap[e][next] += 1
return edgePairFlowsMap
def main(options):
# cache stand-alone routes
routesDepart = {} # first edge for each route
routesArrival = {} # last edge for each route
with codecs.open(options.outfile, 'w', encoding='utf8') as out:
out.write("\n")
for route in parse(options.infile, "route"):
if route.hasAttribute('id') and route.id is not None:
routesDepart[route.id] = route.edges.split()[0]
routesArrival[route.id] = route.edges.split()[-1]
out.write(route.toXML(' '))
for obj in parse(options.infile, ['vehicle', 'trip', 'flow', 'vType'],
heterogeneous=options.heterogeneous, warn=False):
if obj.name == 'vType':
# copy
pass
else:
if options.modify_ids:
obj.id += options.name_suffix
# compute depart-edge filter
departEdge = None
def trackLanes(netstate, out):
# veh_id -> values
laneTimes = defaultdict(list)
laneChanges = defaultdict(lambda: 0)
lastEdge = defaultdict(lambda: None)
arrivals = {}
running = set()
with open(out, 'w') as f:
f.write("\n")
for timestep in parse(netstate, 'timestep'):
seen = set()
if timestep.edge is not None:
for edge in timestep.edge:
if edge.lane is not None:
for lane in edge.lane:
if lane.vehicle is not None:
for vehicle in lane.vehicle:
seen.add(vehicle.id)
if vehicle.id not in running or laneTimes[vehicle.id][-1][1] != lane.id:
laneTimes[vehicle.id].append(
(timestep.time, lane.id))
running.add(vehicle.id)
if lastEdge[vehicle.id] == edge.id:
laneChanges[vehicle.id] += 1
lastEdge[vehicle.id] = edge.id
for veh_id in running:
"-a", options.ptstops,
"--vehroute-output", options.routes,
"--stop-output", options.stopinfos, ])
print("done.")
print("creating routes...")
stopsUntil = {}
for stop in sumolib.output.parse_fast(options.stopinfos, 'stopinfo', ['id', 'ended', 'busStop']):
stopsUntil[stop.busStop] = stop.ended
with open(options.outfile, 'w') as foutflows:
flows = []
sumolib.writeXMLHeader(
foutflows, "$Id$", "routes")
foutflows.write('\t\n')
for vehicle in sumolib.output.parse(options.routes, 'vehicle'):
id = vehicle.id
flows.append(id)
edges = vehicle.routeDistribution[0]._child_dict['route'][1].edges
stops = vehicle.stop
foutflows.write(
'\t\n' % (id, edges))
for stop in stops:
foutflows.write(
'\t\t\n')
for flow in flows:
lineRef = trpIDLineMap[flow]
foutflows.write('\t\n' %
(flow, flow, options.begin, options.end, options.period, lineRef))
def write_diff(orig, new, out):
diffStats = defaultdict(Statistics)
with open(out, 'w') as f:
f.write("\n")
for interval_old, interval_new in zip(parse(orig, 'interval'), parse(new, 'interval')):
f.write(' \n' %
(interval_old.begin, interval_old.end))
for edge_old, edge_new in zip(interval_old.edge, interval_new.edge):
assert(edge_old.id == edge_new.id)
f.write(' \n")
def cut_trips(aEdges, options, validTaz):
areaEdges = set(aEdges)
num_trips = 0
num_returned = 0
for routeFile in options.routeFiles:
print("Parsing trips from %s" % routeFile)
for trip in parse(routeFile, 'trip'):
num_trips += 1
if trip.attr_from is not None and trip.attr_from not in areaEdges:
continue
if trip.to is not None and trip.to not in areaEdges:
continue
if trip.fromTaz is not None and trip.fromTaz not in validTaz:
continue
if trip.toTaz is not None and trip.toTaz not in validTaz:
continue
yield float(trip.depart), trip
num_returned += 1
print("Parsing persontrips from %s" % routeFile)
ignored_planitems = defaultdict(lambda: 0)
num_persons = 0
num_persontrips = 0
def buildEvaSite(inputFile, siteFile):
dots = 10
with open(siteFile, 'w') as out:
out.write("\n")
for poi in sumolib.output.parse(inputFile, "poi"):
lon = float(poi.lon)
lat = float(poi.lat)
radius = float(poi.radius) * 360 / 4008 / 1000
shape = ""
if poi.color is None:
poi.color = "0,0,255"
for i in range(dots):
angle = 2 * math.pi * i / dots
templon = lon + radius * math.cos(angle)
templat = lat + radius * \
math.sin(angle) * math.cos(math.radians(lat))
shape += "%s,%s " % (templon, templat)
out.write(' \n' %
(poi.id, poi.color, shape[:-1]))
out.write("\n")