1
0
mirror of https://github.com/Ahp06/SUMO_Emissions.git synced 2024-11-22 03:26:30 +00:00

Fixed tls duration mode

This commit is contained in:
Ahp06 2018-12-03 21:05:01 +01:00
parent 27ecd5c780
commit 07155c639f
4 changed files with 75 additions and 31 deletions

View File

@ -6,11 +6,10 @@ Created on 17 oct. 2018
from typing import Iterable from typing import Iterable
import traci import traci
import inspect
from shapely.geometry.linestring import LineString from shapely.geometry.linestring import LineString
from model import Area, Vehicle from model import Area, Vehicle
from traci._trafficlight import Logic, Phase from traci._trafficlight import Logic
def remove_vehicle(veh_id): def remove_vehicle(veh_id):
@ -34,16 +33,19 @@ def limit_speed_into_area(area: Area, vehicles: Iterable[Vehicle], max_speed):
for lane in area._lanes: for lane in area._lanes:
traci.lane.setMaxSpeed(lane.lane_id, max_speed/3.6) traci.lane.setMaxSpeed(lane.lane_id, max_speed/3.6)
def modifyLogic(logic, rf): #rf for "reduction factor"
new_phases = []
for phase in logic._phases:
new_phase = traci.trafficlight.Phase(phase.duration*rf,phase.minDuration*rf,phase.maxDuration*rf,phase.phaseDef)
new_phases.append(new_phase)
return traci.trafficlight.Logic("new-program", 0 , 0 , 0 , new_phases)
def adjust_traffic_light_phase_duration(area, reduction_factor): def adjust_traffic_light_phase_duration(area, reduction_factor):
#attributes = inspect.getmembers(Phase, lambda a:not(inspect.isroutine(a))) print(f'Decrease of traffic lights duration by a factor of {reduction_factor}')
#print ([a[0] for a in attributes])
for tl in area._tls: for tl in area._tls:
for logic in tl._logics: for logic in tl._logics:
phases = traci.trafficlight.Logic.getPhases(logic) traci.trafficlights.setCompleteRedYellowGreenDefinition(tl.tl_id, modifyLogic(logic,reduction_factor))
for phase in phases:
print(phase)
#phaseDuration = traci.trafficlight.getPhaseDuration(tl.tl_id) #phaseDuration = traci.trafficlight.getPhaseDuration(tl.tl_id)
#traci.trafficlight.setPhaseDuration(tl.tl_id, phaseDuration*reduction_factor) #traci.trafficlight.setPhaseDuration(tl.tl_id, phaseDuration*reduction_factor)

View File

@ -17,10 +17,19 @@ CELLS_NUMBER = 10
EMISSIONS_THRESHOLD = 500000 EMISSIONS_THRESHOLD = 500000
n_steps = 200 n_steps = 200
#Limit the speed into areas when the threshold is exceeded
limit_speed_mode = True limit_speed_mode = True
#Vehicles are routed according to the less polluted route
weight_routing_mode = False weight_routing_mode = False
#Decrease all traffic lights duration into the area when the threshold is exceeded
rf_trafficLights_duration = 0.2
adjust_traffic_light_mode = True adjust_traffic_light_mode = True
#Weight routing mode cannot be combinated with other actions
if weight_routing_mode:
limit_speed_mode = False
adjust_traffic_light_mode = False
sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', _SUMOCMD) sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', _SUMOCMD)
sumo_cmd = [sumo_binary, "-c", _SUMOCFG] sumo_cmd = [sumo_binary, "-c", _SUMOCFG]
@ -29,5 +38,5 @@ def showConfig():
+ str(f'step number = {n_steps}\n') + str(f'step number = {n_steps}\n')
+ str(f'limit speed mode = {limit_speed_mode}\n') + str(f'limit speed mode = {limit_speed_mode}\n')
+ str(f'weight routing mode= {weight_routing_mode}\n') + str(f'weight routing mode= {weight_routing_mode}\n')
+ str(f'adjust traffic light mode = {adjust_traffic_light_mode}\n')) + str(f'adjust traffic light mode = {adjust_traffic_light_mode} , RF = {rf_trafficLights_duration}\n'))

View File

@ -2,11 +2,12 @@ from typing import List
import traci import traci
from shapely.geometry import LineString from shapely.geometry import LineString
from parse import *
import actions import actions
import config import config
import sys import sys
from model import Area, Vehicle, Lane , TrafficLight from model import Area, Vehicle, Lane , TrafficLight , Phase , Logic
from traci import trafficlight from traci import trafficlight
@ -63,7 +64,19 @@ def get_emissions(grid: List[Area], vehicles: List[Vehicle]):
traci.polygon.setColor(area.name, (255, 0, 0)) traci.polygon.setColor(area.name, (255, 0, 0))
traci.polygon.setFilled(area.name, True) traci.polygon.setFilled(area.name, True)
if config.adjust_traffic_light_mode: if config.adjust_traffic_light_mode:
actions.adjust_traffic_light_phase_duration(area, 0.75) actions.adjust_traffic_light_phase_duration(area, config.rf_trafficLights_duration)
def parsePhase(phase_repr):
duration = search('duration: {:f}', phase_repr)
minDuration = search('minDuration: {:f}', phase_repr)
maxDuration = search('maxDuration: {:f}', phase_repr)
phaseDef = search('phaseDef: {}\n', phase_repr)
if phaseDef is None: phaseDef = ''
else : phaseDef = phaseDef[0]
return Phase(duration[0], minDuration[0], maxDuration[0], phaseDef)
def add_data_to_areas(areas: List[Area]): def add_data_to_areas(areas: List[Area]):
@ -73,12 +86,16 @@ def add_data_to_areas(areas: List[Area]):
if area.rectangle.intersects(lane.polygon): if area.rectangle.intersects(lane.polygon):
area.add_lane(lane) area.add_lane(lane)
for tl_id in traci.trafficlight.getIDList(): # add traffic lights for tl_id in traci.trafficlight.getIDList(): # add traffic lights
logics = traci.trafficlight.getCompleteRedYellowGreenDefinition(tl_id)
if lane.lane_id in traci.trafficlight.getControlledLanes(tl_id): if lane.lane_id in traci.trafficlight.getControlledLanes(tl_id):
logics = []
for l in traci.trafficlight.getCompleteRedYellowGreenDefinition(tl_id): #add logics
phases = []
for phase in traci.trafficlight.Logic.getPhases(l): #add phases to logics
phases.append(parsePhase(phase.__repr__()))
logics.append(Logic(l,phases))
area.add_tl(TrafficLight(tl_id,logics)) area.add_tl(TrafficLight(tl_id,logics))
def main(): def main():
grid = list() grid = list()
try: try:
@ -108,8 +125,9 @@ def main():
for area in grid: for area in grid:
total_emissions += area.emissions total_emissions += area.emissions
#Total of emissions of all pollutants in mg for 200 steps of simulation without locking areas # Total of emissions of all pollutants in mg for n steps of simulation without locking areas
total_emissions200 = 43970763.15084749 total_emissions200 = 43970763.15084749
total_emissions300 = 87382632.08217141
print("\n**** RESULTS ****") print("\n**** RESULTS ****")
print(f'Total emissions = {total_emissions} mg') print(f'Total emissions = {total_emissions} mg')
@ -117,5 +135,6 @@ def main():
print(f'Reduction percentage of emissions = {diff_with_lock*100} %') print(f'Reduction percentage of emissions = {diff_with_lock*100} %')
print("With the configuration :\n" + str(config.showConfig())) print("With the configuration :\n" + str(config.showConfig()))
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -3,7 +3,7 @@ from typing import Tuple, Set
from shapely.geometry import Point, LineString from shapely.geometry import Point, LineString
from shapely.geometry import Polygon from shapely.geometry import Polygon
from shapely.geometry.base import BaseGeometry from shapely.geometry.base import BaseGeometry
from traci._trafficlight import Logic from traci._trafficlight import Logic as SUMO_Logic
class Lane: class Lane:
@ -17,6 +17,22 @@ class Lane:
"""Overrides the default implementation""" """Overrides the default implementation"""
return hash(self.lane_id) return hash(self.lane_id)
class Phase:
def __init__(self, duration: float, minDuration: float, maxDuration : float, phaseDef: str):
self.duration = duration
self.minDuration = minDuration
self.maxDuration = maxDuration
self.phaseDef = phaseDef
def __repr__(self) -> str:
repr = f'Phase(duration:{self.duration},minDuration:{self.minDuration},maxDuration:{self.maxDuration},phaseDef:{self.phaseDef})'
return str(repr)
class Logic:
def __init__(self, logic: SUMO_Logic, phases: Set[Phase]):
self._logic = logic
self._phases: Set[Phase] = phases
class TrafficLight: class TrafficLight:
def __init__(self, tl_id: str, logics: Set[Logic]): def __init__(self, tl_id: str, logics: Set[Logic]):
@ -27,7 +43,6 @@ class TrafficLight:
"""Overrides the default implementation""" """Overrides the default implementation"""
return hash(self.tl_id) return hash(self.tl_id)
class Area: class Area:
def __init__(self, coords, name=''): def __init__(self, coords, name=''):
@ -68,7 +83,6 @@ class Area:
(xmax, ymax), (xmax, ymax),
(xmax, ymin))) (xmax, ymin)))
class Vehicle: class Vehicle:
def __init__(self, veh_id: int, pos: Tuple[float, float]): def __init__(self, veh_id: int, pos: Tuple[float, float]):