1
0
mirror of https://github.com/Ahp06/SUMO_Emissions.git synced 2024-11-22 11:36:29 +00:00

Merge pull request #5 from Ahp06/Axel

Axel
This commit is contained in:
Axel HUYNH-PHUC 2018-12-12 10:40:44 +01:00 committed by GitHub
commit 0ff926e8c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 229 additions and 161 deletions

View File

@ -3,13 +3,14 @@ Created on 17 oct. 2018
@author: Axel Huynh-Phuc, Thibaud Gasser @author: Axel Huynh-Phuc, Thibaud Gasser
""" """
import traci
from traci._trafficlight import Logic
from typing import Iterable from typing import Iterable
import traci
from shapely.geometry.linestring import LineString from shapely.geometry.linestring import LineString
from model import Area, Vehicle from model import Area, Vehicle
from traci._trafficlight import Logic
def compute_edge_weight(edge_id): def compute_edge_weight(edge_id):
return (traci.edge.getCOEmission(edge_id) return (traci.edge.getCOEmission(edge_id)
@ -58,19 +59,20 @@ def lock_area(area):
def reverse_actions(area): def reverse_actions(area):
#Reset max speed to original #Reset max speed to original
if not area.limited_speed: if area.limited_speed:
area.limited_speed = False area.limited_speed = False
for lane in area.lanes: for lane in area._lanes:
traci.lane.setMaxSpeed(lane.lane_id, lane.initial_max_speed / 3.6) traci.lane.setMaxSpeed(lane.lane_id, lane.initial_max_speed)
#Reset traffic lights initial duration #Reset traffic lights initial duration
if not area.tls_adjusted: if area.tls_adjusted:
area.tls_adjusted = False area.tls_adjusted = False
for tl in area._tls:
for initial_logic in tl._logics: for initial_logic in tl._logics:
traci.trafficlights.setCompleteRedYellowGreenDefinition(tl.tl_id, initial_logic) traci.trafficlights.setCompleteRedYellowGreenDefinition(tl.tl_id, initial_logic._logic)
#Unlock the area #Unlock the area
if not area.locked: if area.locked:
area.locked = False area.locked = False
for lane in area._lanes: for lane in area._lanes:
traci.lane.setAllowed(lane.lane_id, '') #empty means all classes are allowed traci.lane.setAllowed(lane.lane_id, '') #empty means all classes are allowed

View File

@ -2,116 +2,111 @@
Global configuration for the simulation Global configuration for the simulation
""" """
import datetime
import json
import logging
import os import os
import sys import sys
import datetime
import logging
###############################################################################
############################# SIMULATION FILE #################################
###############################################################################
class Config:
# Total of emissions of all pollutants in mg for n steps of simulation without acting on areas
# These constants are simulation dependant, you must change them according to your simulation
total_emissions100 = 13615949.148296086
total_emissions200 = 43970763.15084738
total_emissions300 = 87382632.0821697
def __init__(self):
'''Default constructor'''
def import_config_file(self,config_file):
with open(config_file,'r') as f:
data = json.load(f)
self._SUMOCMD = data["_SUMOCMD"]
self._SUMOCFG = data["_SUMOCFG"]
self.areas_number = data["areas_number"]
self.emissions_threshold = data["emissions_threshold"]
self.n_steps = data["n_steps"]
self.window_size = data["window_size"]
self.without_actions_mode = data["without_actions_mode"]
self.limit_speed_mode = data["limit_speed_mode"]
self.speed_rf = data["speed_rf"]
self.adjust_traffic_light_mode = data["adjust_traffic_light_mode"]
self.trafficLights_duration_rf = data["trafficLights_duration_rf"]
self.weight_routing_mode = data["weight_routing_mode"]
self.lock_area_mode = data["lock_area_mode"]
self.check_config()
def check_config(self):
#Weight routing mode cannot be combinated with other actions
if self.weight_routing_mode:
self.limit_speed_mode = False
self.adjust_traffic_light_mode = False
self.lock_area_mode = False
#If without_actions_mode is choosen
if self.without_actions_mode:
self.limit_speed_mode = False
self.adjust_traffic_light_mode = False
self.weight_routing_mode = False
self.lock_area_mode = False
def __repr__(self) -> str:
return (str(f'grid : {self.areas_number}x{self.areas_number}\n')
+ str(f'step number = {self.n_steps}\n')
+ str(f'window size = {self.window_size}\n')
+ str(f'weight routing mode = {self.weight_routing_mode}\n')
+ str(f'lock area mode = {self.lock_area_mode}\n')
+ str(f'limit speed mode = {self.limit_speed_mode}, RF = {self.speed_rf*100}%\n')
+ str(f'adjust traffic light mode = {self.adjust_traffic_light_mode} , RF = {self.trafficLights_duration_rf*100}%\n'))
def init_traci(self):
if 'SUMO_HOME' in os.environ: if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools') tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools) sys.path.append(tools)
else: else:
sys.exit("please declare environment variable 'SUMO_HOME'") sys.exit("please declare environment variable 'SUMO_HOME'")
_SUMOCMD = 'sumo' # use 'sumo-gui' cmd for UI sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', self._SUMOCMD)
_SUMOCFG = "mulhouse_simulation/osm.sumocfg" self.sumo_cmd = [sumo_binary, "-c", self._SUMOCFG]
sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', _SUMOCMD)
sumo_cmd = [sumo_binary, "-c", _SUMOCFG]
###############################################################################
################################## LOGS #######################################
###############################################################################
def init_logger(self, save_logs = False):
now = datetime.datetime.now() now = datetime.datetime.now()
current_date = now.strftime("%Y_%m_%d_%H_%M_%S") current_date = now.strftime("%Y_%m_%d_%H_%M_%S")
LOG_FILENAME = f'sumo_logs_{current_date}.log'
# create logger if not os.path.exists('logs'):
os.makedirs('logs')
log_filename = f'logs/sumo_logs_{current_date}.log'
logger = logging.getLogger("sumo_logger") logger = logging.getLogger("sumo_logger")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
# create console handler and set level to info
handler = logging.FileHandler(LOG_FILENAME)
# create formatter
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# add formatter to handler
if save_logs :
file_handler = logging.FileHandler(log_filename)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
handler = logging.StreamHandler()
handler.setFormatter(formatter) handler.setFormatter(formatter)
# add handler to logger
logger.addHandler(handler) logger.addHandler(handler)
############################################################################### return logger
########################## SIMULATION CONFIGURATION ###########################
###############################################################################
CELLS_NUMBER = 10 def get_basics_emissions(self):
EMISSIONS_THRESHOLD = 500000 if self.n_steps == 100:
n_steps = 200 return self.total_emissions100
if self.n_steps == 200:
return self.total_emissions200
if self.n_steps == 300:
return self.total_emissions300
###############################################################################
########################## ACTIONS CONFIGURATION ##############################
###############################################################################
#Set this mode to True if you want running a basic simulation without actions
without_actions_mode = False
#Limit the speed into areas when the threshold is exceeded
speed_rf = 0.1
limit_speed_mode = True
#Decrease all traffic lights duration into the area when the threshold is exceeded
trafficLights_duration_rf = 0.2
adjust_traffic_light_mode = True
#Vehicles are routed according to the less polluted route (HEAVY)
weight_routing_mode = False
#Lock the area when the threshold is exceeded (NOT FIXED)
lock_area_mode = False
#Weight routing mode cannot be combinated with other actions
if weight_routing_mode:
limit_speed_mode = False
adjust_traffic_light_mode = False
#If without_actions_mode is choosen
if without_actions_mode:
limit_speed_mode = False
adjust_traffic_light_mode = False
weight_routing_mode = False
lock_area_mode = False
###############################################################################
########################## SIMULATION REFERENCES ##############################
###############################################################################
# Total of emissions of all pollutants in mg for n steps of simulation without locking areas
total_emissions200 = 43970763.15084749
total_emissions300 = 87382632.08217141
total_emissions400 = 140757491.8489904
total_emissions500 = 202817535.43856794
###############################################################################
########################## CONFIGURATION METHODS ##############################
###############################################################################
def get_basics_emissions():
if n_steps == 200:
return total_emissions200
if n_steps == 300:
return total_emissions300
if n_steps == 400:
return total_emissions400
if n_steps == 500:
return total_emissions500
def show_config():
return (str(f'Grid : {CELLS_NUMBER}x{CELLS_NUMBER}\n')
+ str(f'step number = {n_steps}\n')
+ str(f'weight routing mode = {weight_routing_mode}\n')
+ str(f'lock area mode = {lock_area_mode}\n')
+ str(f'limit speed mode = {limit_speed_mode}, RF = {speed_rf*100}%\n')
+ str(f'adjust traffic light mode = {adjust_traffic_light_mode} , RF = {trafficLights_duration_rf*100}%\n'))

View File

@ -0,0 +1,22 @@
{
"_SUMOCMD": "sumo",
"_SUMOCFG": "simulations/mulhouse_simulation/osm.sumocfg",
"areas_number": 10,
"emissions_threshold": 500000,
"n_steps": 200,
"window_size":100,
"without_actions_mode": true,
"limit_speed_mode": false,
"speed_rf": 0.1,
"adjust_traffic_light_mode": false,
"trafficLights_duration_rf": 0.2,
"weight_routing_mode": false,
"lock_area_mode": false
}

View File

@ -1,25 +1,24 @@
import argparse
import sys
import time
from traci import trafficlight
import traci
from typing import List from typing import List
import traci
import time
from shapely.geometry import LineString
from parse import search from parse import search
from shapely.geometry import LineString
import actions import actions
import config from config import Config
import sys
from model import Area, Vehicle, Lane , TrafficLight , Phase , Logic from model import Area, Vehicle, Lane , TrafficLight , Phase , Logic
from traci import trafficlight
logger = config.logger
def init_grid(simulation_bounds, cells_number): def init_grid(simulation_bounds, areas_number):
grid = list() grid = list()
width = simulation_bounds[1][0] / cells_number width = simulation_bounds[1][0] / areas_number
height = simulation_bounds[1][1] / cells_number height = simulation_bounds[1][1] / areas_number
for i in range(cells_number): for i in range(areas_number):
for j in range(cells_number): for j in range(areas_number):
# bounds coordinates for the area : (xmin, ymin, xmax, ymax) # bounds coordinates for the area : (xmin, ymin, xmax, ymax)
ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height), ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height),
((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height)) ((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height))
@ -29,6 +28,7 @@ def init_grid(simulation_bounds, cells_number):
traci.polygon.add(area.name, ar_bounds, (0, 255, 0)) traci.polygon.add(area.name, ar_bounds, (0, 255, 0))
return grid return grid
def get_all_lanes() -> List[Lane]: def get_all_lanes() -> List[Lane]:
lanes = [] lanes = []
for lane_id in traci.lane.getIDList(): for lane_id in traci.lane.getIDList():
@ -37,6 +37,7 @@ def get_all_lanes() -> List[Lane]:
lanes.append(Lane(lane_id, polygon_lane, initial_max_speed)) lanes.append(Lane(lane_id, polygon_lane, initial_max_speed))
return lanes return lanes
def parse_phase(phase_repr): def parse_phase(phase_repr):
duration = search('duration: {:f}', phase_repr) duration = search('duration: {:f}', phase_repr)
minDuration = search('minDuration: {:f}', phase_repr) minDuration = search('minDuration: {:f}', phase_repr)
@ -48,6 +49,7 @@ def parse_phase(phase_repr):
return Phase(duration[0], minDuration[0], maxDuration[0], phaseDef) return Phase(duration[0], minDuration[0], maxDuration[0], phaseDef)
def add_data_to_areas(areas: List[Area]): def add_data_to_areas(areas: List[Area]):
lanes = get_all_lanes() lanes = get_all_lanes()
for area in areas: for area in areas:
@ -64,6 +66,7 @@ def add_data_to_areas(areas: List[Area]):
logics.append(Logic(l, phases)) logics.append(Logic(l, phases))
area.add_tl(TrafficLight(tl_id, logics)) area.add_tl(TrafficLight(tl_id, logics))
def compute_vehicle_emissions(veh_id): def compute_vehicle_emissions(veh_id):
return (traci.vehicle.getCOEmission(veh_id) return (traci.vehicle.getCOEmission(veh_id)
+traci.vehicle.getNOxEmission(veh_id) +traci.vehicle.getNOxEmission(veh_id)
@ -81,18 +84,21 @@ def get_all_vehicles() -> List[Vehicle]:
vehicles.append(vehicle) vehicles.append(vehicle)
return vehicles return vehicles
def get_emissions(grid: List[Area], vehicles: List[Vehicle]):
def get_emissions(grid: List[Area], vehicles: List[Vehicle], current_step, config, logger):
for area in grid: for area in grid:
vehicle_emissions = 0
for vehicle in vehicles: for vehicle in vehicles:
if vehicle.pos in area: if vehicle.pos in area:
area.emissions += vehicle.emissions vehicle_emissions += vehicle.emissions
if area.emissions > config.EMISSIONS_THRESHOLD:
area.emissions_by_step.append(vehicle_emissions)
if area.sum_emissions_into_window(current_step, config.window_size) >= config.emissions_threshold:
if config.limit_speed_mode and not area.limited_speed: if config.limit_speed_mode and not area.limited_speed:
logger.info(f'Action - Decreased max speed into {area.name} by {config.speed_rf*100}%') logger.info(f'Action - Decreased max speed into {area.name} by {config.speed_rf*100}%')
actions.limit_speed_into_area(area, vehicles, config.speed_rf) actions.limit_speed_into_area(area, vehicles, config.speed_rf)
traci.polygon.setColor(area.name, (255, 0, 0))
traci.polygon.setFilled(area.name, True)
if config.adjust_traffic_light_mode and not area.tls_adjusted: if config.adjust_traffic_light_mode and not area.tls_adjusted:
logger.info(f'Action - Decreased traffic lights duration by {config.trafficLights_duration_rf*100}%') logger.info(f'Action - Decreased traffic lights duration by {config.trafficLights_duration_rf*100}%')
actions.adjust_traffic_light_phase_duration(area, config.trafficLights_duration_rf) actions.adjust_traffic_light_phase_duration(area, config.trafficLights_duration_rf)
@ -102,15 +108,22 @@ def get_emissions(grid: List[Area], vehicles: List[Vehicle]):
logger.info(f'Action - {area.name} blocked') logger.info(f'Action - {area.name} blocked')
actions.lock_area(area) actions.lock_area(area)
def main(): traci.polygon.setColor(area.name, (255, 0, 0))
traci.polygon.setFilled(area.name, True)
else:
actions.reverse_actions(area)
def run(config, logger):
grid = list() grid = list()
try: try:
traci.start(config.sumo_cmd) traci.start(config.sumo_cmd)
logger.info(f'Loaded simulation file : {config._SUMOCFG}')
logger.info('Loading data for the simulation') logger.info('Loading data for the simulation')
start = time.perf_counter() start = time.perf_counter()
grid = init_grid(traci.simulation.getNetBoundary(), config.CELLS_NUMBER) grid = init_grid(traci.simulation.getNetBoundary(), config.areas_number)
add_data_to_areas(grid) add_data_to_areas(grid)
loading_time = round(time.perf_counter() - start, 2) loading_time = round(time.perf_counter() - start, 2)
@ -122,10 +135,9 @@ def main():
traci.simulationStep() traci.simulationStep()
vehicles = get_all_vehicles() vehicles = get_all_vehicles()
get_emissions(grid, vehicles) get_emissions(grid, vehicles, step, config, logger)
if config.weight_routing_mode: if config.weight_routing_mode:
logger.info('Action - Lane weights adjusted')
actions.adjust_edges_weights() actions.adjust_edges_weights()
step += 1 step += 1
@ -137,18 +149,42 @@ def main():
total_emissions = 0 total_emissions = 0
for area in grid: for area in grid:
total_emissions += area.emissions total_emissions += area.sum_all_emissions()
logger.info(f'Total emissions = {total_emissions} mg') logger.info(f'Total emissions = {total_emissions} mg')
if not config.without_actions_mode : if not config.without_actions_mode :
ref = config.get_basics_emissions() ref = config.get_basics_emissions()
if not (ref is None):
diff_with_actions = (ref - total_emissions) / ref diff_with_actions = (ref - total_emissions) / ref
logger.info(f'Reduction percentage of emissions = {diff_with_actions*100} %') logger.info(f'Reduction percentage of emissions = {diff_with_actions*100} %')
logger.info('With the configuration : \n' + str(config.show_config()))
logger.info('Logs END') def main(args):
parser = argparse.ArgumentParser(description="")
parser.add_argument("-f", "--configfile", type=str, default='configs/default_config.json', required=False)
parser.add_argument("-save", "--save", action="store_true")
parser.add_argument("-ref", "--ref", action="store_true")
args = parser.parse_args(args)
# > py ./emissions.py -f configs/config1.json -save
# will load the configuration file "config1.json" and save logs into the logs directory
# > py ./emissions.py -f configs/config1.json -save -ref & py ./emissions.py -f configs/config1.json -save
# same as above but also launches a reference simulation by using -ref option
config = Config()
config.import_config_file(args.configfile)
config.init_traci()
logger = config.init_logger(save_logs=args.save)
if args.ref:
config.without_actions_mode = True
config.check_config()
logger.info(f'Reference simulation')
logger.info(f'Loaded configuration file : {args.configfile}')
run(config, logger)
if __name__ == '__main__': if __name__ == '__main__':
main() main(sys.argv[1:])

Binary file not shown.

After

Width:  |  Height:  |  Size: 489 KiB

View File

@ -1,9 +1,9 @@
from traci._trafficlight import Logic as SUMO_Logic
from typing import Tuple, Set from typing import Tuple, Set
from shapely.geometry import Point, LineString from shapely.geometry import Point, LineString
from shapely.geometry import Polygon from shapely.geometry import Polygon
from shapely.geometry.base import BaseGeometry from shapely.geometry.base import BaseGeometry
from traci._trafficlight import Logic as SUMO_Logic
class Lane: class Lane:
@ -51,7 +51,7 @@ class Area:
self.tls_adjusted = False self.tls_adjusted = False
self.rectangle = Polygon(coords) self.rectangle = Polygon(coords)
self.name = name self.name = name
self.emissions = 0.0 self.emissions_by_step = []
self._lanes: Set[Lane] = set() self._lanes: Set[Lane] = set()
self._tls: Set[TrafficLight] = set() self._tls: Set[TrafficLight] = set()
@ -77,6 +77,19 @@ class Area:
def remove_lane(self, lane: Lane): def remove_lane(self, lane: Lane):
self._lanes.remove(lane) self._lanes.remove(lane)
def sum_all_emissions(self):
sum = 0
for emission in self.emissions_by_step:
sum += emission
return sum
def sum_emissions_into_window(self, current_step, window_size):
sum = 0
q = current_step // window_size #Returns the integral part of the quotient
for i in range(q*window_size, current_step):
sum += self.emissions_by_step[i]
return sum
@classmethod @classmethod
def from_bounds(cls, xmin, ymin, xmax, ymax): def from_bounds(cls, xmin, ymin, xmax, ymax):
return cls(( return cls((