Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from __future__ import print_function
from __future__ import absolute_import
import os
import sys
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
import sumolib # noqa
import traci # noqa
sumoBinary = sumolib.checkBinary("sumo")
traci.start([sumoBinary,
'-n', 'input_net.net.xml',
'-r', 'input_routes.rou.xml',
'--no-step-log',
# '-S', '-Q'
])
vehID = 'v0'
traci.simulationStep()
while traci.simulation.getTime() < 75:
pos = traci.vehicle.getPosition3D(vehID)
pos2 = (pos[0] + 20, pos[1])
traci.vehicle.moveToXY('v0', '', 0, pos2[0], pos2[1], keepRoute=2)
traci.simulationStep()
pos3 = traci.vehicle.getPosition3D(vehID)
fdo = open("results.csv", "w")
for departPos in "random free random_free base pwagSimple pwagGeneric maxSpeedGap".split():
print(">>> Building the routes (for departPos %s)" % departPos)
fd = open("input_routes.rou.xml", "w")
print("""
""" % (3 * (departPos, DEPARTSPEED, PERIOD)), file=fd)
fd.close()
print(">>> Simulating ((for departPos %s)" % departPos)
call([sumoBinary, "-c", "sumo.sumocfg", "-v"])
dump = sumolib.output.dump.readDump("aggregated.xml", ["entered"])
print("%s;%s" % (departPos, dump.get("entered")[-1]["1si"]), file=fdo)
if os.path.exists(departPos + "_aggregated.xml"):
os.remove(departPos + "_aggregated.xml")
os.rename("aggregated.xml", departPos + "_aggregated.xml")
fdo.close()
# @author Jakob Erdmann
# @date 2017-01-23
from __future__ import print_function
from __future__ import absolute_import
import os
import subprocess
import sys
sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
import traci # noqa
import sumolib # noqa
import traci.constants as tc # noqa
sumoBinary = os.environ["SUMO_BINARY"]
PORT = sumolib.miscutils.getFreeSocketPort()
sumoProcess = subprocess.Popen([sumoBinary,
'-n', 'input_net.net.xml',
'-r', 'input_routes.rou.xml',
'--no-step-log',
'--begin', '5',
# '-S', '-Q',
'--remote-port', str(PORT)], stdout=sys.stdout)
traci.init(PORT)
vehID = "v0"
traci.vehicle.add(vehID, "r0")
traci.vehicle.subscribeContext(vehID, tc.CMD_GET_VEHICLE_VARIABLE,
dist=20, begin=0, end=99999999)
traci.simulationStep(10.)
traci.close()
# @date 2010-02-20
from __future__ import absolute_import
from __future__ import print_function
import os
import subprocess
import sys
import time
from multiprocessing import Process, freeze_support
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
import sumolib # noqa
import traci # noqa
PORT = sumolib.miscutils.getFreeSocketPort()
DELTA_T = 1000
sumoBinary = sumolib.checkBinary(sys.argv[1])
def traciLoop(port, traciEndTime, index, orderOdd):
orderTime = 0.25
time.sleep(orderTime * index) # assure ordering of outputs
print("Starting process %s" % (index))
sys.stdout.flush()
time.sleep(orderTime * index) # assure ordering of outputs
step = 1
try:
traci.init(port)
if orderOdd and index % 2 == 1:
traci.setOrder(index)
sumoStop = False
# @author Michael Behrisch
# @author Jakob Erdmann
# @date 2008-05-12
from __future__ import absolute_import
from __future__ import print_function
import os
import subprocess
import sys
import time
THIS_DIR = os.path.abspath(os.path.dirname(__file__))
sys.path.append(os.path.join(THIS_DIR, '..', "tools"))
import sumolib # noqa
PORT = str(sumolib.miscutils.getFreeSocketPort())
server_args = sys.argv[1:] + ["--remote-port", PORT]
binaryDir, server = os.path.split(server_args[0])
client = "TraCITestClient"
if server.endswith("D") or server.endswith("D.exe"):
client += "D"
client_args = [os.path.join(binaryDir, client), "-def",
"testclient.prog", "-o", "testclient_out.txt", "-p", PORT]
# start sumo as server
serverProcess = subprocess.Popen(
server_args, stdout=sys.stdout, stderr=sys.stderr)
success = False
for retry in range(7):
time.sleep(retry)
clientProcess = subprocess.Popen(
import subprocess
import sys
import shutil
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
import sumolib # noqa
import traci # noqa
sumoBinary = sumolib.checkBinary(sys.argv[1])
if sys.argv[1] == "sumo":
addOption = []
secondConfig = "sumo.sumocfg"
else:
addOption = ["-S", "-Q"]
secondConfig = "sumo_log.sumocfg"
PORT = sumolib.miscutils.getFreeSocketPort()
sumoProc = subprocess.Popen(
[sumoBinary, "-c", "sumo.sumocfg", "--remote-port", str(PORT)] + addOption)
traci.init(PORT)
subprocess.call([sumoBinary, "-c", secondConfig,
"--remote-port", str(PORT)] + addOption)
step = 0
while not step > 100:
traci.simulationStep()
vehs = traci.vehicle.getIDList()
if vehs.index("horiz") < 0 or len(vehs) > 1:
print("Something is false")
step += 1
traci.close()
sumoProc.wait()
sys.stdout.flush()
from __future__ import absolute_import
import os
import subprocess
import sys
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
import sumolib # noqa
import traci # noqa
sumoBinary = sumolib.checkBinary(sys.argv[1])
if sys.argv[1] == "sumo":
addOption = ""
else:
addOption = "-S -Q"
PORT = sumolib.miscutils.getFreeSocketPort()
def run():
"""execute the TraCI control loop"""
traci.init(PORT)
step = 0
while traci.simulation.getMinExpectedNumber() > 0 and step < 100:
traci.simulationStep()
step += 1
if step == 4:
traci.trafficlight.setProgram("center", "0")
traci.close()
sys.stdout.flush()
sumoProcess = subprocess.Popen([sumoBinary,
from __future__ import absolute_import
from __future__ import print_function
import os
import subprocess
import sys
import time
import math
from multiprocessing import Process, freeze_support
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
import sumolib # noqa
import traci # noqa
PORT = sumolib.miscutils.getFreeSocketPort()
DELTA_T = 1000
sumoBinary = sumolib.checkBinary(sys.argv[1])
def traciLoop(port, traciEndTime, index, SUMOsteplength, steplength=0):
orderTime = 0.25
time.sleep(orderTime * index) # assure ordering of outputs
if steplength == 0:
steplength = DELTA_T / 1000.
print("Starting process %s with steplength %s" % (index, steplength))
sys.stdout.flush()
traci.init(port)
traci.setOrder(index)
step = 1
nrEnteredVehicles = 0
sumoStop = False
from __future__ import absolute_import
from __future__ import print_function
import os
import subprocess
import sys
import time
import math
from multiprocessing import Process, freeze_support
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
import sumolib # noqa
import traci # noqa
import traci.constants as tc # noqa
PORT = sumolib.miscutils.getFreeSocketPort()
DELTA_T = 1000
sumoBinary = sumolib.checkBinary(sys.argv[1])
def traciLoop(port, traciEndTime, i, runNr, steplength=0):
orderTime = 0.25
time.sleep(orderTime * i) # assure ordering of outputs
if steplength == 0:
steplength = DELTA_T / 1000.
# order index dependent on runNr
index = i if (runNr % 2 == 0) else 10 - i
sys.stdout.flush()
traci.init(port)
traci.setOrder(index)
message = ("Starting process %s (order: %s) with steplength %s\n" % (i, index, steplength))
step = 1
# @file runner.py
# @author Laura Bieker
# @date 2017-05-23
from __future__ import print_function
from __future__ import absolute_import
import os
import subprocess
import sys
sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
import traci # noqa
import sumolib # noqa
sumoBinary = os.environ["SUMO_BINARY"]
PORT = sumolib.miscutils.getFreeSocketPort()
sumoProcess = subprocess.Popen([sumoBinary,
'-c', 'sumo.sumocfg',
'--fcd-output', 'fcd.xml',
'-S', '-Q',
'--remote-port', str(PORT)], stdout=sys.stdout)
traci.init(PORT)
traci.simulationStep()
for i in range(45):
traci.simulationStep()
traci.vehicle.setSpeedMode("rescue", 7)
traci.vehicle.setSpeed("rescue", 13.9)
traci.trafficlight.setRedYellowGreenState("C", "rrrrrrrrrrrrrrrrrr")
for i in range(21):
traci.simulationStep()
traci.close()