From 0df0897c7206d14c96cba9767a7b4016be8ddf9c Mon Sep 17 00:00:00 2001 From: Ahp06 Date: Fri, 23 Nov 2018 14:40:47 +0100 Subject: [PATCH] Added TrafficLight model and actions on traffic lights --- sumo_project/actions.py | 8 +- sumo_project/emissions.py | 236 +++++++++++++++++++------------------- sumo_project/model.py | 141 ++++++++++++----------- 3 files changed, 203 insertions(+), 182 deletions(-) diff --git a/sumo_project/actions.py b/sumo_project/actions.py index 298ebe2..a0f6df9 100644 --- a/sumo_project/actions.py +++ b/sumo_project/actions.py @@ -36,7 +36,9 @@ def limit_speed_into_area(area: Area, vehicles: Iterable[Vehicle], max_speed): traci.lane.setMaxSpeed(lane.lane_id, max_speed/3.6) -def adjust_traffic_light_phase_duration(): - '''for tl_id in traci.trafficlight.getIDList(): - print(traci.trafficlight.getCompleteRedYellowGreenDefinition(tl_id))''' +def adjust_traffic_light_phase_duration(area,reduction_factor): + for tl in area._tls: + phaseDuration = traci.trafficlight.getPhaseDuration(tl.tl_id) + traci.trafficlight.setPhaseDuration(tl.tl_id, phaseDuration*reduction_factor) + \ No newline at end of file diff --git a/sumo_project/emissions.py b/sumo_project/emissions.py index f199357..c4f7742 100644 --- a/sumo_project/emissions.py +++ b/sumo_project/emissions.py @@ -1,115 +1,121 @@ -from typing import List - -import traci -from shapely.geometry import LineString - -import actions -import config -import sys -from model import Area, Vehicle, Lane - - -def init_grid(simulation_bounds, cells_number): - grid = list() - width = simulation_bounds[1][0] / cells_number - height = simulation_bounds[1][1] / cells_number - for i in range(cells_number): - for j in range(cells_number): - # bounds coordinates for the area : (xmin, ymin, xmax, ymax) - ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height), - ((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height)) - area = Area(ar_bounds) - area.name = 'area ({},{})'.format(i, j) - grid.append(area) - traci.polygon.add(area.name, ar_bounds, (0, 255, 0)) - return grid - - -def compute_vehicle_emissions(veh_id): - return (traci.vehicle.getCOEmission(veh_id) - + traci.vehicle.getNOxEmission(veh_id) - + traci.vehicle.getHCEmission(veh_id) - + traci.vehicle.getPMxEmission(veh_id) - + traci.vehicle.getCO2Emission(veh_id)) - - -def get_all_vehicles() -> List[Vehicle]: - vehicles = list() - for veh_id in traci.vehicle.getIDList(): - veh_pos = traci.vehicle.getPosition(veh_id) - vehicle = Vehicle(veh_id, veh_pos) - vehicle.emissions = compute_vehicle_emissions(veh_id) - vehicles.append(vehicle) - return vehicles - - -def get_all_lanes() -> List[Lane]: - lanes = [] - for lane_id in traci.lane.getIDList(): - polygon_lane = LineString(traci.lane.getShape(lane_id)) - lanes.append(Lane(lane_id, polygon_lane)) - return lanes - - -def get_emissions(grid: List[Area], vehicles: List[Vehicle]): - for area in grid: - for vehicle in vehicles: - if vehicle.pos in area: - area.emissions += vehicle.emissions - if config.lock_mode and area.emissions > config.EMISSIONS_THRESHOLD and not area.locked: - actions.limit_speed_into_area(area, vehicles,30) - traci.polygon.setColor(area.name, (255, 0, 0)) - traci.polygon.setFilled(area.name, True) - - -def add_lanes_to_areas(areas: List[Area]): - lanes = get_all_lanes() - for area in areas: - for lane in lanes: - if area.rectangle.intersects(lane.polygon): - area.add_lane(lane) - - -def main(): - grid = list() - try: - traci.start(config.sumo_cmd) - grid = init_grid(traci.simulation.getNetBoundary(), config.CELLS_NUMBER) - add_lanes_to_areas(grid) - - actions.adjust_traffic_light_phase_duration() - - step = 0 - while step < config.n_steps : #traci.simulation.getMinExpectedNumber() > 0: - traci.simulationStep() - - vehicles = get_all_vehicles() - get_emissions(grid, vehicles) - - if config.routing_mode: - actions.adjust_edges_weights() - # actions.rerouteAllVehicles() - - step += 1 - progress = round(step/config.n_steps*100,2) - sys.stdout.write(f'Progress : {progress}%'+'\r') - sys.stdout.flush() - - finally: - traci.close(False) - - total_emissions = 0 - for area in grid: - total_emissions += area.emissions - - #Total of emissions of all pollutants in mg for 200 steps of simulation without locking areas - total_emissions200 = 43970763.15084749 - - print("\n**** RESULTS ****") - print(f'Total emissions = {total_emissions} mg') - diff_with_lock = (total_emissions200 - total_emissions)/total_emissions200 - print(f'Reduction percentage of emissions = {diff_with_lock*100} %') - print("With the configuration :\n" + str(config.showConfig())) - -if __name__ == '__main__': - main() +from typing import List + +import traci +from shapely.geometry import LineString + +import actions +import config +import sys +from model import Area, Vehicle, Lane , TrafficLight +from traci import trafficlight + + +def init_grid(simulation_bounds, cells_number): + grid = list() + width = simulation_bounds[1][0] / cells_number + height = simulation_bounds[1][1] / cells_number + for i in range(cells_number): + for j in range(cells_number): + # bounds coordinates for the area : (xmin, ymin, xmax, ymax) + ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height), + ((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height)) + area = Area(ar_bounds) + area.name = 'area ({},{})'.format(i, j) + grid.append(area) + traci.polygon.add(area.name, ar_bounds, (0, 255, 0)) + return grid + + +def compute_vehicle_emissions(veh_id): + return (traci.vehicle.getCOEmission(veh_id) + + traci.vehicle.getNOxEmission(veh_id) + + traci.vehicle.getHCEmission(veh_id) + + traci.vehicle.getPMxEmission(veh_id) + + traci.vehicle.getCO2Emission(veh_id)) + + +def get_all_vehicles() -> List[Vehicle]: + vehicles = list() + for veh_id in traci.vehicle.getIDList(): + veh_pos = traci.vehicle.getPosition(veh_id) + vehicle = Vehicle(veh_id, veh_pos) + vehicle.emissions = compute_vehicle_emissions(veh_id) + vehicles.append(vehicle) + return vehicles + + +def get_all_lanes() -> List[Lane]: + lanes = [] + for lane_id in traci.lane.getIDList(): + polygon_lane = LineString(traci.lane.getShape(lane_id)) + lanes.append(Lane(lane_id, polygon_lane)) + return lanes + + +def get_emissions(grid: List[Area], vehicles: List[Vehicle]): + for area in grid: + for vehicle in vehicles: + if vehicle.pos in area: + area.emissions += vehicle.emissions + if config.lock_mode and area.emissions > config.EMISSIONS_THRESHOLD and not area.locked: + + actions.limit_speed_into_area(area, vehicles,30) + actions.adjust_traffic_light_phase_duration(area, 0.5) + + traci.polygon.setColor(area.name, (255, 0, 0)) + traci.polygon.setFilled(area.name, True) + + +def add_data_to_areas(areas: List[Area]): + lanes = get_all_lanes() + for area in areas: + for lane in lanes: + if area.rectangle.intersects(lane.polygon): + area.add_lane(lane) + for tl_id in traci.trafficlight.getIDList(): + if lane.lane_id in traci.trafficlight.getControlledLanes(tl_id): + area.add_tl(TrafficLight(tl_id)) + + + +def main(): + grid = list() + try: + traci.start(config.sumo_cmd) + grid = init_grid(traci.simulation.getNetBoundary(), config.CELLS_NUMBER) + add_data_to_areas(grid) + + step = 0 + while step < config.n_steps : #traci.simulation.getMinExpectedNumber() > 0: + traci.simulationStep() + + vehicles = get_all_vehicles() + get_emissions(grid, vehicles) + + if config.routing_mode: + actions.adjust_edges_weights() + # actions.rerouteAllVehicles() + + step += 1 + progress = round(step/config.n_steps*100,2) + sys.stdout.write(f'Progress : {progress}%'+'\r') + sys.stdout.flush() + + finally: + traci.close(False) + + total_emissions = 0 + for area in grid: + total_emissions += area.emissions + + #Total of emissions of all pollutants in mg for 200 steps of simulation without locking areas + total_emissions200 = 43970763.15084749 + + print("\n**** RESULTS ****") + print(f'Total emissions = {total_emissions} mg') + diff_with_lock = (total_emissions200 - total_emissions)/total_emissions200 + print(f'Reduction percentage of emissions = {diff_with_lock*100} %') + print("With the configuration :\n" + str(config.showConfig())) + +if __name__ == '__main__': + main() diff --git a/sumo_project/model.py b/sumo_project/model.py index 3fe8afc..9c3e9eb 100644 --- a/sumo_project/model.py +++ b/sumo_project/model.py @@ -1,64 +1,77 @@ -from typing import Tuple, Set - -from shapely.geometry import Point, LineString -from shapely.geometry import Polygon -from shapely.geometry.base import BaseGeometry - - -class Lane: - - def __init__(self, lane_id: str, polygon: LineString): - self.polygon = polygon - self.lane_id = lane_id - - def __hash__(self): - """Overrides the default implementation""" - return hash(self.lane_id) - - -class Area: - - def __init__(self, coords, name=''): - self.locked = False - self.rectangle = Polygon(coords) - self.name = name - self.emissions = 0.0 - self._lanes: Set[Lane] = set() - - def __eq__(self, other): - return self.rectangle.__eq__(other) - - def __contains__(self, item): - return self.rectangle.contains(item) - - @property - def bounds(self): - return self.rectangle.bounds - - def intersects(self, other: BaseGeometry) -> bool: - return self.rectangle.intersects(other) - - def add_lane(self, lane: Lane): - self._lanes.add(lane) - - def remove_lane(self, lane: Lane): - self._lanes.remove(lane) - - @classmethod - def from_bounds(cls, xmin, ymin, xmax, ymax): - return cls(( - (xmin, ymin), - (xmin, ymax), - (xmax, ymax), - (xmax, ymin))) - - -class Vehicle: - - def __init__(self, veh_id: int, pos: Tuple[float, float]): - self.emissions: float = 0.0 - self.veh_id = veh_id - self.pos = Point(pos) - - def __repr__(self) -> str: - return str(self.__dict__) +from typing import Tuple, Set + +from shapely.geometry import Point, LineString +from shapely.geometry import Polygon +from shapely.geometry.base import BaseGeometry + + +class Lane: + + def __init__(self, lane_id: str, polygon: LineString): + self.polygon = polygon + self.lane_id = lane_id + + def __hash__(self): + """Overrides the default implementation""" + return hash(self.lane_id) + +class TrafficLight: + + def __init__(self, tl_id: str): + self.tl_id = tl_id + + def __hash__(self): + """Overrides the default implementation""" + return hash(self.tl_id) + + +class Area: + + def __init__(self, coords, name=''): + self.locked = False + self.rectangle = Polygon(coords) + self.name = name + self.emissions = 0.0 + self._lanes: Set[Lane] = set() + self._tls: Set[TrafficLight] = set() + + def __eq__(self, other): + return self.rectangle.__eq__(other) + + def __contains__(self, item): + return self.rectangle.contains(item) + + @property + def bounds(self): + return self.rectangle.bounds + + def intersects(self, other: BaseGeometry) -> bool: + return self.rectangle.intersects(other) + + def add_lane(self, lane: Lane): + self._lanes.add(lane) + + def add_tl(self, tl: TrafficLight): + self._tls.add(tl) + + def remove_lane(self, lane: Lane): + self._lanes.remove(lane) + + @classmethod + def from_bounds(cls, xmin, ymin, xmax, ymax): + return cls(( + (xmin, ymin), + (xmin, ymax), + (xmax, ymax), + (xmax, ymin))) + + +class Vehicle: + + def __init__(self, veh_id: int, pos: Tuple[float, float]): + self.emissions: float = 0.0 + self.veh_id = veh_id + self.pos = Point(pos) + + def __repr__(self) -> str: + return str(self.__dict__)