mirror of
				https://github.com/Ahp06/SUMO_Emissions.git
				synced 2025-10-25 07:59:19 +00:00 
			
		
		
		
	Reformat code 🚀
This commit is contained in:
		| @@ -11,27 +11,27 @@ import sys | ||||
| from model import Emission | ||||
|  | ||||
|  | ||||
| class Config:  | ||||
|      | ||||
| class Config: | ||||
|     # Total of emissions of all pollutants in mg for n steps of simulation without acting on areas | ||||
|     # These constants are simulation dependant, you must change them according to your simulation  | ||||
|     ref200 = Emission(co2=42816869.05436445,co=1128465.0343051048,nox=18389.648337283958,hc=6154.330914019103,pmx=885.0829265236318) | ||||
|      | ||||
|     ref200 = Emission(co2=42816869.05436445, co=1128465.0343051048, nox=18389.648337283958, hc=6154.330914019103, | ||||
|                       pmx=885.0829265236318) | ||||
|  | ||||
|     def __init__(self): | ||||
|         '''Default constructor''' | ||||
|          | ||||
|     def import_config_file(self,config_file): | ||||
|         with open(config_file,'r') as f: | ||||
|         """Default constructor""" | ||||
|  | ||||
|     def import_config_file(self, config_file): | ||||
|         with open(config_file, 'r') as f: | ||||
|             data = json.load(f) | ||||
|              | ||||
|  | ||||
|         self._SUMOCMD = data["_SUMOCMD"] | ||||
|         self._SUMOCFG = data["_SUMOCFG"] | ||||
|          | ||||
|  | ||||
|         self.areas_number = data["areas_number"] | ||||
|         self.emissions_threshold = data["emissions_threshold"] | ||||
|         self.n_steps = data["n_steps"] | ||||
|         self.window_size = data["window_size"] | ||||
|          | ||||
|  | ||||
|         self.without_actions_mode = data["without_actions_mode"] | ||||
|         self.limit_speed_mode = data["limit_speed_mode"] | ||||
|         self.speed_rf = data["speed_rf"] | ||||
| @@ -39,58 +39,59 @@ class Config: | ||||
|         self.trafficLights_duration_rf = data["trafficLights_duration_rf"] | ||||
|         self.weight_routing_mode = data["weight_routing_mode"] | ||||
|         self.lock_area_mode = data["lock_area_mode"] | ||||
|              | ||||
|  | ||||
|         self.check_config() | ||||
|                  | ||||
|  | ||||
|     def check_config(self): | ||||
|         #Weight routing mode cannot be combinated with other actions  | ||||
|         # Weight routing mode cannot be combinated with other actions | ||||
|         if self.weight_routing_mode: | ||||
|             self.limit_speed_mode = False | ||||
|             self.adjust_traffic_light_mode = False | ||||
|             self.lock_area_mode = False | ||||
|                  | ||||
|         #If without_actions_mode is choosen  | ||||
|  | ||||
|         # If without_actions_mode is choosen | ||||
|         if self.without_actions_mode: | ||||
|             self.limit_speed_mode = False | ||||
|             self.adjust_traffic_light_mode = False | ||||
|             self.weight_routing_mode = False | ||||
|             self.lock_area_mode = False | ||||
|          | ||||
|                  | ||||
|  | ||||
|     def __repr__(self) -> str: | ||||
|         return (str(f'grid : {self.areas_number}x{self.areas_number}\n') | ||||
|         + str(f'step number = {self.n_steps}\n') | ||||
|         + str(f'window size = {self.window_size}\n') | ||||
|         + str(f'weight routing mode = {self.weight_routing_mode}\n') | ||||
|         + str(f'lock area mode = {self.lock_area_mode}\n') | ||||
|         + str(f'limit speed mode = {self.limit_speed_mode}, RF = {self.speed_rf*100}%\n') | ||||
|         + str(f'adjust traffic light mode = {self.adjust_traffic_light_mode} , RF = {self.trafficLights_duration_rf*100}%\n')) | ||||
|                  | ||||
|      | ||||
|         return ( | ||||
|             f'grid : {self.areas_number}x{self.areas_number}\n' | ||||
|             f'step number = {self.n_steps}\n' | ||||
|             f'window size = {self.window_size}\n' | ||||
|             f'weight routing mode = {self.weight_routing_mode}\n' | ||||
|             f'lock area mode = {self.lock_area_mode}\n' | ||||
|             f'limit speed mode = {self.limit_speed_mode}, RF = {self.speed_rf * 100}%\n' | ||||
|             f'adjust traffic light mode = {self.adjust_traffic_light_mode},' | ||||
|             'RF = {self.trafficLights_duration_rf * 100}%\n' | ||||
|         ) | ||||
|  | ||||
|     def init_traci(self): | ||||
|         if 'SUMO_HOME' in os.environ: | ||||
|             tools = os.path.join(os.environ['SUMO_HOME'], 'tools') | ||||
|             sys.path.append(tools) | ||||
|         else: | ||||
|             sys.exit("please declare environment variable 'SUMO_HOME'") | ||||
|          | ||||
|  | ||||
|         sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', self._SUMOCMD) | ||||
|         self.sumo_cmd = [sumo_binary, "-c", self._SUMOCFG] | ||||
|      | ||||
|     def init_logger(self, save_logs = False): | ||||
|  | ||||
|     def init_logger(self, save_logs=False): | ||||
|         now = datetime.datetime.now() | ||||
|         current_date = now.strftime("%Y_%m_%d_%H_%M_%S") | ||||
|          | ||||
|  | ||||
|         if not os.path.exists('logs'): | ||||
|             os.makedirs('logs') | ||||
|      | ||||
|  | ||||
|         log_filename = f'logs/sumo_logs_{current_date}.log' | ||||
|          | ||||
|  | ||||
|         logger = logging.getLogger("sumo_logger") | ||||
|         logger.setLevel(logging.INFO) | ||||
|         formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") | ||||
|          | ||||
|         if save_logs : | ||||
|  | ||||
|         if save_logs: | ||||
|             file_handler = logging.FileHandler(log_filename) | ||||
|             file_handler.setFormatter(formatter) | ||||
|             logger.addHandler(file_handler) | ||||
| @@ -98,11 +99,9 @@ class Config: | ||||
|         handler = logging.StreamHandler() | ||||
|         handler.setFormatter(formatter) | ||||
|         logger.addHandler(handler) | ||||
|          | ||||
|  | ||||
|         return logger | ||||
|  | ||||
|     def get_ref_emissions(self): | ||||
|         if self.n_steps == 200: | ||||
|             return self.ref200 | ||||
|      | ||||
|      | ||||
|   | ||||
| @@ -1,26 +1,25 @@ | ||||
| import argparse | ||||
| import csv | ||||
| import datetime | ||||
| import itertools | ||||
| import os | ||||
| import sys | ||||
| import time | ||||
| from traci import trafficlight | ||||
| import traci | ||||
| from typing import List | ||||
| import datetime | ||||
| import os | ||||
|  | ||||
| import traci | ||||
| from parse import search | ||||
| from shapely.geometry import LineString | ||||
|  | ||||
| import actions | ||||
| from config import Config | ||||
| from model import Area, Vehicle, Lane , TrafficLight , Phase , Logic, Emission | ||||
| from model import Area, Vehicle, Lane, TrafficLight, Phase, Logic, Emission | ||||
|  | ||||
| # Absolute path of the directory the script is in | ||||
| SCRIPTDIR = os.path.dirname(__file__) | ||||
|  | ||||
|  | ||||
| def init_grid(simulation_bounds, areas_number,window_size): | ||||
| def init_grid(simulation_bounds, areas_number, window_size): | ||||
|     grid = list() | ||||
|     width = simulation_bounds[1][0] / areas_number | ||||
|     height = simulation_bounds[1][1] / areas_number | ||||
| @@ -47,14 +46,16 @@ def get_all_lanes() -> List[Lane]: | ||||
|  | ||||
| def parse_phase(phase_repr): | ||||
|     duration = search('duration: {:f}', phase_repr) | ||||
|     minDuration = search('minDuration: {:f}', phase_repr) | ||||
|     maxDuration = search('maxDuration: {:f}', phase_repr) | ||||
|     phaseDef = search('phaseDef: {}\n', phase_repr) | ||||
|     min_duration = search('min_duration: {:f}', phase_repr) | ||||
|     max_duration = search('max_duration: {:f}', phase_repr) | ||||
|     phase_def = search('phase_def: {}\n', phase_repr) | ||||
|  | ||||
|     if phaseDef is None: phaseDef = '' | ||||
|     else : phaseDef = phaseDef[0] | ||||
|     if phase_def is None: | ||||
|         phase_def = '' | ||||
|     else: | ||||
|         phase_def = phase_def[0] | ||||
|  | ||||
|     return Phase(duration[0], minDuration[0], maxDuration[0], phaseDef) | ||||
|     return Phase(duration[0], min_duration[0], max_duration[0], phase_def) | ||||
|  | ||||
|  | ||||
| def add_data_to_areas(areas: List[Area]): | ||||
| @@ -70,19 +71,19 @@ def add_data_to_areas(areas: List[Area]): | ||||
|                             phases = [] | ||||
|                             for phase in traci.trafficlight.Logic.getPhases(l):  # add phases to logics | ||||
|                                 phases.append(parse_phase(phase.__repr__())) | ||||
|                             logics.append(Logic(l, phases))  | ||||
|                             logics.append(Logic(l, phases)) | ||||
|                         area.add_tl(TrafficLight(tl_id, logics)) | ||||
|  | ||||
|  | ||||
| def compute_vehicle_emissions(veh_id): | ||||
|      | ||||
|     co2 = traci.vehicle.getCO2Emission(veh_id) | ||||
|     co = traci.vehicle.getCOEmission(veh_id) | ||||
|     nox = traci.vehicle.getNOxEmission(veh_id) | ||||
|     hc = traci.vehicle.getHCEmission(veh_id) | ||||
|     pmx = traci.vehicle.getPMxEmission(veh_id) | ||||
|      | ||||
|     return Emission(co2,co,nox,hc,pmx) | ||||
|  | ||||
|     return Emission(co2, co, nox, hc, pmx) | ||||
|  | ||||
|  | ||||
| def get_all_vehicles() -> List[Vehicle]: | ||||
|     vehicles = list() | ||||
| @@ -100,33 +101,35 @@ def get_emissions(grid: List[Area], vehicles: List[Vehicle], current_step, confi | ||||
|         for vehicle in vehicles: | ||||
|             if vehicle.pos in area: | ||||
|                 total_emissions += vehicle.emissions | ||||
|          | ||||
|  | ||||
|         area.emissions_by_step.append(total_emissions) | ||||
|          | ||||
|         if area.sum_emissions_into_window(current_step, config.window_size) >= config.emissions_threshold:  | ||||
|                  | ||||
|  | ||||
|         if area.sum_emissions_into_window(current_step, config.window_size) >= config.emissions_threshold: | ||||
|  | ||||
|             if config.limit_speed_mode and not area.limited_speed: | ||||
|                 logger.info(f'Action - Decreased max speed into {area.name} by {config.speed_rf*100}%') | ||||
|                 logger.info(f'Action - Decreased max speed into {area.name} by {config.speed_rf * 100}%') | ||||
|                 actions.limit_speed_into_area(area, vehicles, config.speed_rf) | ||||
|                 if config.adjust_traffic_light_mode and not area.tls_adjusted: | ||||
|                     logger.info(f'Action - Decreased traffic lights duration by {config.trafficLights_duration_rf*100}%') | ||||
|                     logger.info( | ||||
|                         f'Action - Decreased traffic lights duration by {config.trafficLights_duration_rf * 100}%') | ||||
|                     actions.adjust_traffic_light_phase_duration(area, config.trafficLights_duration_rf) | ||||
|                  | ||||
|  | ||||
|             if config.lock_area_mode and not area.locked: | ||||
|                 if actions.count_vehicles_in_area(area): | ||||
|                     logger.info(f'Action - {area.name} blocked') | ||||
|                     actions.lock_area(area) | ||||
|                      | ||||
|  | ||||
|             if config.weight_routing_mode and not area.weight_adjusted: | ||||
|                 actions.adjust_edges_weights(area) | ||||
|              | ||||
|  | ||||
|             traci.polygon.setFilled(area.name, True) | ||||
|          | ||||
|  | ||||
|         else: | ||||
|             actions.reverse_actions(area) | ||||
|             traci.polygon.setFilled(area.name, False) | ||||
|  | ||||
| def get_reduction_percentage(ref,total): | ||||
|  | ||||
| def get_reduction_percentage(ref, total): | ||||
|     return (ref - total) / ref * 100 | ||||
|  | ||||
|  | ||||
| @@ -154,89 +157,90 @@ def run(config, logger): | ||||
|         logger.info(f'Loaded simulation file : {config._SUMOCFG}') | ||||
|         logger.info('Loading data for the simulation') | ||||
|         start = time.perf_counter() | ||||
|         | ||||
|  | ||||
|         grid = init_grid(traci.simulation.getNetBoundary(), config.areas_number, config.window_size) | ||||
|         add_data_to_areas(grid) | ||||
|          | ||||
|  | ||||
|         loading_time = round(time.perf_counter() - start, 2) | ||||
|         logger.info(f'Data loaded ({loading_time}s)') | ||||
|          | ||||
|  | ||||
|         logger.info('Simulation started...') | ||||
|         step = 0  | ||||
|         while step < config.n_steps :  # traci.simulation.getMinExpectedNumber() > 0: | ||||
|         step = 0 | ||||
|         while step < config.n_steps:  # traci.simulation.getMinExpectedNumber() > 0: | ||||
|             traci.simulationStep() | ||||
|              | ||||
|  | ||||
|             vehicles = get_all_vehicles() | ||||
|             get_emissions(grid, vehicles, step, config, logger) | ||||
|             step += 1 | ||||
|              | ||||
|  | ||||
|             print(f'step = {step}/{config.n_steps}', end='\r') | ||||
|              | ||||
|  | ||||
|     finally: | ||||
|         traci.close(False) | ||||
|         export_data_to_csv(config,grid) | ||||
|          | ||||
|         export_data_to_csv(config, grid) | ||||
|  | ||||
|         simulation_time = round(time.perf_counter() - start, 2) | ||||
|         logger.info(f'End of the simulation ({simulation_time}s)') | ||||
|         logger.info(f'Real-time factor : {config.n_steps/simulation_time}') | ||||
|          | ||||
|         logger.info(f'Real-time factor : {config.n_steps / simulation_time}') | ||||
|  | ||||
|         total_emissions = Emission() | ||||
|         for area in grid: | ||||
|             total_emissions += area.sum_all_emissions() | ||||
|              | ||||
|  | ||||
|         logger.info(f'Total emissions = {total_emissions.value()} mg') | ||||
|          | ||||
|         if not config.without_actions_mode : | ||||
|  | ||||
|         if not config.without_actions_mode: | ||||
|             ref = config.get_ref_emissions() | ||||
|             if not (ref is None): | ||||
|                 global_diff = (ref.value() - total_emissions.value()) / ref.value()     | ||||
|                  | ||||
|                 logger.info(f'Global reduction percentage of emissions = {global_diff*100} %') | ||||
|                 global_diff = (ref.value() - total_emissions.value()) / ref.value() | ||||
|  | ||||
|                 logger.info(f'Global reduction percentage of emissions = {global_diff * 100} %') | ||||
|                 logger.info(f'-> CO2 emissions = {get_reduction_percentage(ref.co2, total_emissions.co2)} %') | ||||
|                 logger.info(f'-> CO emissions = {get_reduction_percentage(ref.co, total_emissions.co)} %') | ||||
|                 logger.info(f'-> Nox emissions = {get_reduction_percentage(ref.nox, total_emissions.nox)} %')                                 | ||||
|                 logger.info(f'-> HC emissions = {get_reduction_percentage(ref.hc, total_emissions.hc)} %')                                 | ||||
|                 logger.info(f'-> PMx emissions = {get_reduction_percentage(ref.pmx, total_emissions.pmx)} %')                                 | ||||
|                 logger.info(f'-> Nox emissions = {get_reduction_percentage(ref.nox, total_emissions.nox)} %') | ||||
|                 logger.info(f'-> HC emissions = {get_reduction_percentage(ref.hc, total_emissions.hc)} %') | ||||
|                 logger.info(f'-> PMx emissions = {get_reduction_percentage(ref.pmx, total_emissions.pmx)} %') | ||||
|  | ||||
|  | ||||
| def add_options(parser): | ||||
|     parser.add_argument("-f", "--configfile", type=str, default='configs/default_config.json', required=False, | ||||
|                         help='Choose your configuration file from your working directory') | ||||
|     parser.add_argument("-save", "--save", action="store_true",  | ||||
|                         help = 'Save the logs into the logs folder') | ||||
|     parser.add_argument("-steps", "--steps", type= int, default= 200, required = False,  | ||||
|                         help='Choose the simulated time (in seconds)')  | ||||
|     parser.add_argument("-ref", "--ref", action="store_true",  | ||||
|     parser.add_argument("-save", "--save", action="store_true", | ||||
|                         help='Save the logs into the logs folder') | ||||
|     parser.add_argument("-steps", "--steps", type=int, default=200, required=False, | ||||
|                         help='Choose the simulated time (in seconds)') | ||||
|     parser.add_argument("-ref", "--ref", action="store_true", | ||||
|                         help='Launch a reference simulation (without acting on areas)') | ||||
|     parser.add_argument("-gui", "--gui", action ="store_true", | ||||
|                         help= "Set GUI mode") | ||||
|      | ||||
|     parser.add_argument("-gui", "--gui", action="store_true", | ||||
|                         help="Set GUI mode") | ||||
|  | ||||
|  | ||||
| def main(args): | ||||
|      | ||||
|     parser = argparse.ArgumentParser(description="") | ||||
|     add_options(parser) | ||||
|     args = parser.parse_args(args) | ||||
|      | ||||
|  | ||||
|     config = Config() | ||||
|     config.import_config_file(args.configfile) | ||||
|     config.init_traci() | ||||
|     logger = config.init_logger(save_logs=args.save) | ||||
|      | ||||
|     if args.ref:  | ||||
|  | ||||
|     if args.ref: | ||||
|         config.without_actions_mode = True | ||||
|         logger.info(f'Reference simulation') | ||||
|      | ||||
|     if args.steps:  | ||||
|         config.n_steps = args.steps  | ||||
|          | ||||
|  | ||||
|     if args.steps: | ||||
|         config.n_steps = args.steps | ||||
|  | ||||
|     if args.gui: | ||||
|         config._SUMOCMD = "sumo-gui" | ||||
|          | ||||
|  | ||||
|     config.check_config() | ||||
|      | ||||
|  | ||||
|     logger.info(f'Loaded configuration file : {args.configfile}') | ||||
|     logger.info(f'Simulated time : {args.steps}s') | ||||
|     run(config, logger) | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     main(sys.argv[1:]) | ||||
|   | ||||
| @@ -1,10 +1,10 @@ | ||||
| import collections  | ||||
| from traci._trafficlight import Logic as SUMO_Logic | ||||
| import collections | ||||
| from typing import Tuple, Set | ||||
|  | ||||
| from shapely.geometry import Point, LineString | ||||
| from shapely.geometry import Polygon | ||||
| from shapely.geometry.base import BaseGeometry | ||||
| from traci._trafficlight import Logic as SUMO_Logic | ||||
|  | ||||
|  | ||||
| class Lane: | ||||
| @@ -18,51 +18,56 @@ class Lane: | ||||
|         """Overrides the default implementation""" | ||||
|         return hash(self.lane_id) | ||||
|  | ||||
|  | ||||
| class Phase: | ||||
|     def __init__(self, duration: float, minDuration: float, maxDuration : float, phaseDef: str): | ||||
|         self.duration = duration  | ||||
|     def __init__(self, duration: float, minDuration: float, maxDuration: float, phaseDef: str): | ||||
|         self.duration = duration | ||||
|         self.minDuration = minDuration | ||||
|         self.maxDuration = maxDuration | ||||
|         self.phaseDef = phaseDef  | ||||
|          | ||||
|         self.phaseDef = phaseDef | ||||
|  | ||||
|     def __repr__(self) -> str: | ||||
|         repr = f'Phase(duration:{self.duration},minDuration:{self.minDuration},maxDuration:{self.maxDuration},phaseDef:{self.phaseDef})' | ||||
|         return str(repr) | ||||
|  | ||||
|  | ||||
| class Logic: | ||||
|     def __init__(self, logic: SUMO_Logic, phases: Set[Phase]): | ||||
|         self._logic = logic | ||||
|         self._phases: Set[Phase] = phases | ||||
|  | ||||
| class TrafficLight:  | ||||
|      | ||||
|  | ||||
| class TrafficLight: | ||||
|  | ||||
|     def __init__(self, tl_id: str, logics: Set[Logic]): | ||||
|         self.tl_id = tl_id  | ||||
|         self.tl_id = tl_id | ||||
|         self._logics: Set[Logic] = logics | ||||
|          | ||||
|  | ||||
|     def __hash__(self): | ||||
|         """Overrides the default implementation""" | ||||
|         return hash(self.tl_id) | ||||
|  | ||||
|  | ||||
| class Emission: | ||||
|     def __init__(self, co2 = 0, co = 0 , nox = 0, hc = 0, pmx = 0): | ||||
|     def __init__(self, co2=0, co=0, nox=0, hc=0, pmx=0): | ||||
|         self.co2 = co2 | ||||
|         self.co = co | ||||
|         self.nox = nox | ||||
|         self.hc = hc | ||||
|         self.pmx = pmx | ||||
|      | ||||
|     def __add__(self,other): | ||||
|         return Emission(self.co2 + other.co2, self.co + other.co, self.nox + other.nox, self.hc + other.hc, self.pmx + other.pmx) | ||||
|          | ||||
|      | ||||
|  | ||||
|     def __add__(self, other): | ||||
|         return Emission(self.co2 + other.co2, self.co + other.co, self.nox + other.nox, self.hc + other.hc, | ||||
|                         self.pmx + other.pmx) | ||||
|  | ||||
|     def value(self): | ||||
|         return self.co2 + self.co + self.nox + self.hc + self.pmx | ||||
|      | ||||
|  | ||||
|     def __repr__(self) -> str: | ||||
|         repr = f'Emission(co2={self.co2},co={self.co},nox={self.nox},hc={self.hc},pmx={self.pmx})' | ||||
|         return str(repr) | ||||
|  | ||||
|  | ||||
| class Area: | ||||
|  | ||||
|     def __init__(self, coords, name, window_size): | ||||
| @@ -73,9 +78,9 @@ class Area: | ||||
|         self.rectangle = Polygon(coords) | ||||
|         self.name = name | ||||
|         self.emissions_by_step = [] | ||||
|         self.window = collections.deque(maxlen = window_size) | ||||
|         self.window = collections.deque(maxlen=window_size) | ||||
|         self._lanes: Set[Lane] = set() | ||||
|         self._tls: Set[TrafficLight] = set()  | ||||
|         self._tls: Set[TrafficLight] = set() | ||||
|  | ||||
|     def __eq__(self, other): | ||||
|         return self.rectangle.__eq__(other) | ||||
| @@ -92,23 +97,23 @@ class Area: | ||||
|  | ||||
|     def add_lane(self, lane: Lane): | ||||
|         self._lanes.add(lane) | ||||
|          | ||||
|  | ||||
|     def add_tl(self, tl: TrafficLight): | ||||
|         self._tls.add(tl) | ||||
|  | ||||
|     def remove_lane(self, lane: Lane): | ||||
|         self._lanes.remove(lane) | ||||
|          | ||||
|  | ||||
|     def sum_all_emissions(self): | ||||
|         sum = Emission() | ||||
|         for emission in self.emissions_by_step: | ||||
|             sum += emission | ||||
|         return sum  | ||||
|      | ||||
|     def sum_emissions_into_window(self, current_step, window_size):  | ||||
|         return sum | ||||
|  | ||||
|     def sum_emissions_into_window(self, current_step, window_size): | ||||
|  | ||||
|         self.window.appendleft(self.emissions_by_step[current_step].value()) | ||||
|          | ||||
|  | ||||
|         sum = 0 | ||||
|         for i in range(self.window.__len__()): | ||||
|             sum += self.window[i] | ||||
| @@ -122,6 +127,7 @@ class Area: | ||||
|             (xmax, ymax), | ||||
|             (xmax, ymin))) | ||||
|  | ||||
|  | ||||
| class Vehicle: | ||||
|  | ||||
|     def __init__(self, veh_id: int, pos: Tuple[float, float]): | ||||
|   | ||||
		Reference in New Issue
	
	Block a user