Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import os
import sys
from collections import defaultdict
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sumolib.output import parse, parse_fast
route_file, keep_file = sys.argv[1:]
edges = set()
for route in parse_fast(route_file, 'route', ['edges']):
edges.update(route.edges.split())
for walk in parse_fast(route_file, 'walk', ['edges']):
edges.update(walk.edges.split())
# warn about potentially missing edges
for trip in parse_fast(route_file, 'trip', ['id', 'from', 'to']):
edges.update([trip.attr_from, trip.to])
print(
"Warning: Trip %s is not guaranteed to be connected within the extacted edges." % trip.id)
for walk in parse_fast(route_file, 'walk', ['from', 'to']):
edges.update([walk.attr_from, walk.to])
print("Warning: Walk from %s to %s is not guaranteed to be connected within the extacted edges." % (
walk.attr_from, walk.to))
with open(keep_file, 'w') as outf:
outf.write(','.join(edges) + '\n')
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
import os
import sys
from collections import defaultdict
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sumolib.output import parse, parse_fast
route_file, keep_file = sys.argv[1:]
edges = set()
for route in parse_fast(route_file, 'route', ['edges']):
edges.update(route.edges.split())
for walk in parse_fast(route_file, 'walk', ['edges']):
edges.update(walk.edges.split())
# warn about potentially missing edges
for trip in parse_fast(route_file, 'trip', ['id', 'from', 'to']):
edges.update([trip.attr_from, trip.to])
print(
"Warning: Trip %s is not guaranteed to be connected within the extacted edges." % trip.id)
for walk in parse_fast(route_file, 'walk', ['from', 'to']):
edges.update([walk.attr_from, walk.to])
print("Warning: Walk from %s to %s is not guaranteed to be connected within the extacted edges." % (
walk.attr_from, walk.to))
with open(keep_file, 'w') as outf:
outf.write(','.join(edges) + '\n')
def getTLPairs(net, routeFile, speedFactor):
# pairs of traffic lights
TLPairs = {} # PairKey -> PairData
for route in parse_fast(routeFile, 'route', ['edges']):
rTLSList = getTLSInRoute(net, route.edges.split())
for oldTL, TLelement in zip(rTLSList[:-1], rTLSList[1:]):
key = PairKey(oldTL.edgeID, TLelement.edgeID, oldTL.dist)
numVehicles = 0 if key not in TLPairs else TLPairs[key].numVehicles
tl = net.getEdge(TLelement.edgeID).getTLS()
otl = net.getEdge(oldTL.edgeID).getTLS()
edge = net.getEdge(TLelement.edgeID)
connection = TLelement.connection
oconnection = oldTL.connection
ogreen = getFirstGreenOffset(otl, oconnection)
green = getFirstGreenOffset(tl, connection)
travelTime = TLelement.time / speedFactor
def main():
options = parse_args()
edges = set()
for route in parse_fast(options.routefile, 'route', ['edges']):
edges.update(route.edges.split())
for walk in parse_fast(options.routefile, 'walk', ['edges']):
edges.update(walk.edges.split())
# warn about potentially missing edges
for trip in parse_fast(options.routefile, 'trip', ['id', 'from', 'to']):
edges.update([trip.attr_from, trip.to])
print(
"Warning: Trip %s is not guaranteed to be connected within the extracted edges." % trip.id)
for walk in parse_fast(options.routefile, 'walk', ['from', 'to']):
edges.update([walk.attr_from, walk.to])
print("Warning: Walk from %s to %s is not guaranteed to be connected within the extracted edges." % (
walk.attr_from, walk.to))
with open(options.outfile, 'w') as outf:
for e in sorted(list(edges)):
outf.write('edge:%s\n' % e)
def main():
options = parse_args()
departCounts = defaultdict(lambda: 0)
arrivalCounts = defaultdict(lambda: 0)
for route in parse_fast(options.routefile, 'route', ['edges']):
edges = route.edges.split()
if options.subpart is not None and not hasSubpart(edges, options.subpart):
continue
departCounts[edges[0]] += 1
arrivalCounts[edges[-1]] += 1
for walk in parse_fast(options.routefile, 'walk', ['edges']):
edges = walk.edges.split()
if options.subpart is not None and not hasSubpart(edges, options.subpart):
continue
departCounts[edges[0]] += 1
arrivalCounts[edges[-1]] += 1
# warn about potentially missing edges
for trip in parse_fast(options.routefile, 'trip', ['id', 'fromTaz', 'toTaz']):
if options.subpart is not None:
sys.stderr.print("Warning: Ignoring trips when using --subpart")
def __init__(self, fname):
self.weights = defaultdict(lambda: 0)
for edge in sumolib.output.parse_fast(fname, 'edge', ['id', 'value']):
self.weights[edge.id] = float(edge.value)
def createRoutes(options, trpMap, stopNames):
print("creating routes...")
stopsUntil = collections.defaultdict(list)
for stop in sumolib.output.parse_fast(options.stopinfos, 'stopinfo', ['id', 'ended', 'busStop']):
stopsUntil[(stop.id, stop.busStop)].append(float(stop.ended))
ft = formatTime if options.hrtime else lambda x: x
with codecs.open(options.outfile, 'w', encoding="UTF8") as foutflows:
flows = []
actualDepart = {} # departure may be delayed when the edge is not yet empty
sumolib.writeXMLHeader(
foutflows, "$Id: ptlines2flows.py v1_3_1+0313-ccb31df3eb jakob.erdmann@dlr.de 2019-09-02 13:26:32 +0200 $",
"routes")
if not options.novtypes:
writeTypes(foutflows, options.vtypeprefix)
collections.defaultdict(int)
for vehicle in sumolib.output.parse(options.routes, 'vehicle'):
id = vehicle.id
lineRef, name, completeness, period = trpMap[id]
print("generating trips...")
stopsLanes = {}
stopNames = {}
for stop in sumolib.output.parse(options.ptstops, 'busStop'):
stopsLanes[stop.id] = stop.lane
if stop.name:
stopNames[stop.id] = stop.attr_name
trpMap = {}
with codecs.open(options.trips, 'w', encoding="UTF8") as fouttrips:
sumolib.writeXMLHeader(
fouttrips, "$Id: ptlines2flows.py v1_3_1+0313-ccb31df3eb jakob.erdmann@dlr.de 2019-09-02 13:26:32 +0200 $",
"routes")
writeTypes(fouttrips, options.vtypeprefix)
departTimes = [options.begin for line in sumolib.output.parse_fast(options.ptlines, 'ptLine', ['id'])]
if options.randomBegin:
departTimes = sorted([options.begin
+ int(random.random() * options.period) for t in departTimes])
lineCount = collections.defaultdict(int)
typeCount = collections.defaultdict(int)
numLines = 0
numStops = 0
numSkipped = 0
for trp_nr, line in enumerate(sumolib.output.parse(options.ptlines, 'ptLine', heterogeneous=True)):
stop_ids = []
if not line.hasAttribute("period"):
line.setAttribute("period", options.period)
if line.busStop is not None:
for stop in line.busStop:
if stop.id not in stopsLanes:
def parse_trip_durations():
result = []
for file in sorted(glob.glob("tripinfo_*.xml")):
result.append([float(t.duration)
for t in parse_fast(file, 'tripinfo', ['duration'])])
return result
if detId in sources:
countIn += total
if detId in sinks:
countOut += total
print("detIn: %s detOut: %s" % (countIn, countOut))
totalSim = collections.defaultdict(int)
if options.validation:
c.clear()
v.clear()
countIn = 0
countOut = 0
start = 0
end = options.interval
#
for interval in sumolib.output.parse_fast(options.validation, "interval", ["begin", "id", "speed", "nVehEntered"]):
detId = interval.id[11:]
time = int(float(interval.begin) / 60)
if time >= end:
start = end
end += options.interval
for det, vals in sims.iteritems():
if c[det] > 0:
vals.append((time, c[det], v[det] / c[det]))
c.clear()
v.clear()
c[detId] += int(interval.nVehEntered)
totalSim[detId] += int(interval.nVehEntered)
v[detId] += 3.6 * int(interval.nVehEntered) * float(interval.speed)
if detId in sources:
countIn += int(interval.nVehEntered)
if detId in sinks: