1
0
mirror of https://github.com/Ahp06/SUMO_Emissions.git synced 2024-11-22 03:26:30 +00:00

Added TrafficLight model and actions on traffic lights

This commit is contained in:
Ahp06 2018-11-23 14:40:47 +01:00
parent 7972f4d794
commit 0df0897c72
3 changed files with 203 additions and 182 deletions

View File

@ -36,7 +36,9 @@ def limit_speed_into_area(area: Area, vehicles: Iterable[Vehicle], max_speed):
traci.lane.setMaxSpeed(lane.lane_id, max_speed/3.6) traci.lane.setMaxSpeed(lane.lane_id, max_speed/3.6)
def adjust_traffic_light_phase_duration(): def adjust_traffic_light_phase_duration(area,reduction_factor):
'''for tl_id in traci.trafficlight.getIDList(): for tl in area._tls:
print(traci.trafficlight.getCompleteRedYellowGreenDefinition(tl_id))''' phaseDuration = traci.trafficlight.getPhaseDuration(tl.tl_id)
traci.trafficlight.setPhaseDuration(tl.tl_id, phaseDuration*reduction_factor)

View File

@ -1,115 +1,121 @@
from typing import List from typing import List
import traci import traci
from shapely.geometry import LineString from shapely.geometry import LineString
import actions import actions
import config import config
import sys import sys
from model import Area, Vehicle, Lane from model import Area, Vehicle, Lane , TrafficLight
from traci import trafficlight
def init_grid(simulation_bounds, cells_number):
grid = list() def init_grid(simulation_bounds, cells_number):
width = simulation_bounds[1][0] / cells_number grid = list()
height = simulation_bounds[1][1] / cells_number width = simulation_bounds[1][0] / cells_number
for i in range(cells_number): height = simulation_bounds[1][1] / cells_number
for j in range(cells_number): for i in range(cells_number):
# bounds coordinates for the area : (xmin, ymin, xmax, ymax) for j in range(cells_number):
ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height), # bounds coordinates for the area : (xmin, ymin, xmax, ymax)
((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height)) ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height),
area = Area(ar_bounds) ((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height))
area.name = 'area ({},{})'.format(i, j) area = Area(ar_bounds)
grid.append(area) area.name = 'area ({},{})'.format(i, j)
traci.polygon.add(area.name, ar_bounds, (0, 255, 0)) grid.append(area)
return grid traci.polygon.add(area.name, ar_bounds, (0, 255, 0))
return grid
def compute_vehicle_emissions(veh_id):
return (traci.vehicle.getCOEmission(veh_id) def compute_vehicle_emissions(veh_id):
+ traci.vehicle.getNOxEmission(veh_id) return (traci.vehicle.getCOEmission(veh_id)
+ traci.vehicle.getHCEmission(veh_id) + traci.vehicle.getNOxEmission(veh_id)
+ traci.vehicle.getPMxEmission(veh_id) + traci.vehicle.getHCEmission(veh_id)
+ traci.vehicle.getCO2Emission(veh_id)) + traci.vehicle.getPMxEmission(veh_id)
+ traci.vehicle.getCO2Emission(veh_id))
def get_all_vehicles() -> List[Vehicle]:
vehicles = list() def get_all_vehicles() -> List[Vehicle]:
for veh_id in traci.vehicle.getIDList(): vehicles = list()
veh_pos = traci.vehicle.getPosition(veh_id) for veh_id in traci.vehicle.getIDList():
vehicle = Vehicle(veh_id, veh_pos) veh_pos = traci.vehicle.getPosition(veh_id)
vehicle.emissions = compute_vehicle_emissions(veh_id) vehicle = Vehicle(veh_id, veh_pos)
vehicles.append(vehicle) vehicle.emissions = compute_vehicle_emissions(veh_id)
return vehicles vehicles.append(vehicle)
return vehicles
def get_all_lanes() -> List[Lane]:
lanes = [] def get_all_lanes() -> List[Lane]:
for lane_id in traci.lane.getIDList(): lanes = []
polygon_lane = LineString(traci.lane.getShape(lane_id)) for lane_id in traci.lane.getIDList():
lanes.append(Lane(lane_id, polygon_lane)) polygon_lane = LineString(traci.lane.getShape(lane_id))
return lanes lanes.append(Lane(lane_id, polygon_lane))
return lanes
def get_emissions(grid: List[Area], vehicles: List[Vehicle]):
for area in grid: def get_emissions(grid: List[Area], vehicles: List[Vehicle]):
for vehicle in vehicles: for area in grid:
if vehicle.pos in area: for vehicle in vehicles:
area.emissions += vehicle.emissions if vehicle.pos in area:
if config.lock_mode and area.emissions > config.EMISSIONS_THRESHOLD and not area.locked: area.emissions += vehicle.emissions
actions.limit_speed_into_area(area, vehicles,30) if config.lock_mode and area.emissions > config.EMISSIONS_THRESHOLD and not area.locked:
traci.polygon.setColor(area.name, (255, 0, 0))
traci.polygon.setFilled(area.name, True) actions.limit_speed_into_area(area, vehicles,30)
actions.adjust_traffic_light_phase_duration(area, 0.5)
def add_lanes_to_areas(areas: List[Area]): traci.polygon.setColor(area.name, (255, 0, 0))
lanes = get_all_lanes() traci.polygon.setFilled(area.name, True)
for area in areas:
for lane in lanes:
if area.rectangle.intersects(lane.polygon): def add_data_to_areas(areas: List[Area]):
area.add_lane(lane) lanes = get_all_lanes()
for area in areas:
for lane in lanes:
def main(): if area.rectangle.intersects(lane.polygon):
grid = list() area.add_lane(lane)
try: for tl_id in traci.trafficlight.getIDList():
traci.start(config.sumo_cmd) if lane.lane_id in traci.trafficlight.getControlledLanes(tl_id):
grid = init_grid(traci.simulation.getNetBoundary(), config.CELLS_NUMBER) area.add_tl(TrafficLight(tl_id))
add_lanes_to_areas(grid)
actions.adjust_traffic_light_phase_duration()
def main():
step = 0 grid = list()
while step < config.n_steps : #traci.simulation.getMinExpectedNumber() > 0: try:
traci.simulationStep() traci.start(config.sumo_cmd)
grid = init_grid(traci.simulation.getNetBoundary(), config.CELLS_NUMBER)
vehicles = get_all_vehicles() add_data_to_areas(grid)
get_emissions(grid, vehicles)
step = 0
if config.routing_mode: while step < config.n_steps : #traci.simulation.getMinExpectedNumber() > 0:
actions.adjust_edges_weights() traci.simulationStep()
# actions.rerouteAllVehicles()
vehicles = get_all_vehicles()
step += 1 get_emissions(grid, vehicles)
progress = round(step/config.n_steps*100,2)
sys.stdout.write(f'Progress : {progress}%'+'\r') if config.routing_mode:
sys.stdout.flush() actions.adjust_edges_weights()
# actions.rerouteAllVehicles()
finally:
traci.close(False) step += 1
progress = round(step/config.n_steps*100,2)
total_emissions = 0 sys.stdout.write(f'Progress : {progress}%'+'\r')
for area in grid: sys.stdout.flush()
total_emissions += area.emissions
finally:
#Total of emissions of all pollutants in mg for 200 steps of simulation without locking areas traci.close(False)
total_emissions200 = 43970763.15084749
total_emissions = 0
print("\n**** RESULTS ****") for area in grid:
print(f'Total emissions = {total_emissions} mg') total_emissions += area.emissions
diff_with_lock = (total_emissions200 - total_emissions)/total_emissions200
print(f'Reduction percentage of emissions = {diff_with_lock*100} %') #Total of emissions of all pollutants in mg for 200 steps of simulation without locking areas
print("With the configuration :\n" + str(config.showConfig())) total_emissions200 = 43970763.15084749
if __name__ == '__main__': print("\n**** RESULTS ****")
main() print(f'Total emissions = {total_emissions} mg')
diff_with_lock = (total_emissions200 - total_emissions)/total_emissions200
print(f'Reduction percentage of emissions = {diff_with_lock*100} %')
print("With the configuration :\n" + str(config.showConfig()))
if __name__ == '__main__':
main()

View File

@ -1,64 +1,77 @@
from typing import Tuple, Set from typing import Tuple, Set
from shapely.geometry import Point, LineString from shapely.geometry import Point, LineString
from shapely.geometry import Polygon from shapely.geometry import Polygon
from shapely.geometry.base import BaseGeometry from shapely.geometry.base import BaseGeometry
class Lane: class Lane:
def __init__(self, lane_id: str, polygon: LineString): def __init__(self, lane_id: str, polygon: LineString):
self.polygon = polygon self.polygon = polygon
self.lane_id = lane_id self.lane_id = lane_id
def __hash__(self): def __hash__(self):
"""Overrides the default implementation""" """Overrides the default implementation"""
return hash(self.lane_id) return hash(self.lane_id)
class TrafficLight:
class Area:
def __init__(self, tl_id: str):
def __init__(self, coords, name=''): self.tl_id = tl_id
self.locked = False
self.rectangle = Polygon(coords) def __hash__(self):
self.name = name """Overrides the default implementation"""
self.emissions = 0.0 return hash(self.tl_id)
self._lanes: Set[Lane] = set()
def __eq__(self, other): class Area:
return self.rectangle.__eq__(other)
def __init__(self, coords, name=''):
def __contains__(self, item): self.locked = False
return self.rectangle.contains(item) self.rectangle = Polygon(coords)
self.name = name
@property self.emissions = 0.0
def bounds(self): self._lanes: Set[Lane] = set()
return self.rectangle.bounds self._tls: Set[TrafficLight] = set()
def intersects(self, other: BaseGeometry) -> bool: def __eq__(self, other):
return self.rectangle.intersects(other) return self.rectangle.__eq__(other)
def add_lane(self, lane: Lane): def __contains__(self, item):
self._lanes.add(lane) return self.rectangle.contains(item)
def remove_lane(self, lane: Lane): @property
self._lanes.remove(lane) def bounds(self):
return self.rectangle.bounds
@classmethod
def from_bounds(cls, xmin, ymin, xmax, ymax): def intersects(self, other: BaseGeometry) -> bool:
return cls(( return self.rectangle.intersects(other)
(xmin, ymin),
(xmin, ymax), def add_lane(self, lane: Lane):
(xmax, ymax), self._lanes.add(lane)
(xmax, ymin)))
def add_tl(self, tl: TrafficLight):
self._tls.add(tl)
class Vehicle:
def remove_lane(self, lane: Lane):
def __init__(self, veh_id: int, pos: Tuple[float, float]): self._lanes.remove(lane)
self.emissions: float = 0.0
self.veh_id = veh_id @classmethod
self.pos = Point(pos) def from_bounds(cls, xmin, ymin, xmax, ymax):
return cls((
def __repr__(self) -> str: (xmin, ymin),
return str(self.__dict__) (xmin, ymax),
(xmax, ymax),
(xmax, ymin)))
class Vehicle:
def __init__(self, veh_id: int, pos: Tuple[float, float]):
self.emissions: float = 0.0
self.veh_id = veh_id
self.pos = Point(pos)
def __repr__(self) -> str:
return str(self.__dict__)