1
0
mirror of https://github.com/Ahp06/SUMO_Emissions.git synced 2024-11-22 03:26:30 +00:00
This commit is contained in:
Ahp06 2019-01-17 10:11:31 +01:00
commit 0ba52d2c9a
4 changed files with 230 additions and 148 deletions

View File

@ -13,17 +13,22 @@ from model import Area, Vehicle
def compute_edge_weight(edge_id):
return (traci.edge.getCOEmission(edge_id)
+ traci.edge.getNOxEmission(edge_id)
+ traci.edge.getHCEmission(edge_id)
+ traci.edge.getPMxEmission(edge_id)
+ traci.edge.getCO2Emission(edge_id))/(traci.edge.getLaneNumber(edge_id))
def adjust_edges_weights():
for edge_id in traci.edge.getIDList():
co2 = traci.edge.getCO2Emission(edge_id)
co = traci.edge.getCOEmission(edge_id)
nox = traci.edge.getNOxEmission(edge_id)
hc = traci.edge.getHCEmission(edge_id)
pmx = traci.edge.getPMxEmission(edge_id)
return (co2 + co + nox + hc + pmx)
def adjust_edges_weights(area):
area.weight_adjusted = True
for lane in area._lanes:
edge_id = traci.lane.getEdgeID(lane.lane_id)
weight = compute_edge_weight(edge_id) # by default edges weight = length/mean speed
traci.edge.setEffort(edge_id, weight)
for veh_id in traci.vehicle.getIDList():
traci.vehicle.rerouteEffort(veh_id)
@ -31,7 +36,7 @@ def limit_speed_into_area(area: Area, vehicles: Iterable[Vehicle], speed_rf):
area.limited_speed = True
for lane in area._lanes:
traci.lane.setMaxSpeed(lane.lane_id, speed_rf * lane.initial_max_speed)
def modifyLogic(logic, rf): #rf for "reduction factor"
new_phases = []
for phase in logic._phases:

View File

@ -8,30 +8,30 @@ import logging
import os
import sys
from model import Emission
class Config:
class Config:
# Total of emissions of all pollutants in mg for n steps of simulation without acting on areas
# These constants are simulation dependant, you must change them according to your simulation
total_emissions100 = 13615949.148296086
total_emissions200 = 43970763.15084738
total_emissions300 = 87382632.0821697
ref200 = Emission(co2=42816869.05436445, co=1128465.0343051048, nox=18389.648337283958, hc=6154.330914019103,
pmx=885.0829265236318)
def __init__(self):
'''Default constructor'''
def import_config_file(self,config_file):
with open(config_file,'r') as f:
"""Default constructor"""
def import_config_file(self, config_file):
with open(config_file, 'r') as f:
data = json.load(f)
self._SUMOCMD = data["_SUMOCMD"]
self._SUMOCFG = data["_SUMOCFG"]
self.areas_number = data["areas_number"]
self.emissions_threshold = data["emissions_threshold"]
self.n_steps = data["n_steps"]
self.window_size = data["window_size"]
self.without_actions_mode = data["without_actions_mode"]
self.limit_speed_mode = data["limit_speed_mode"]
self.speed_rf = data["speed_rf"]
@ -39,58 +39,59 @@ class Config:
self.trafficLights_duration_rf = data["trafficLights_duration_rf"]
self.weight_routing_mode = data["weight_routing_mode"]
self.lock_area_mode = data["lock_area_mode"]
self.check_config()
def check_config(self):
#Weight routing mode cannot be combinated with other actions
# Weight routing mode cannot be combinated with other actions
if self.weight_routing_mode:
self.limit_speed_mode = False
self.adjust_traffic_light_mode = False
self.lock_area_mode = False
#If without_actions_mode is choosen
# If without_actions_mode is choosen
if self.without_actions_mode:
self.limit_speed_mode = False
self.adjust_traffic_light_mode = False
self.weight_routing_mode = False
self.lock_area_mode = False
def __repr__(self) -> str:
return (str(f'grid : {self.areas_number}x{self.areas_number}\n')
+ str(f'step number = {self.n_steps}\n')
+ str(f'window size = {self.window_size}\n')
+ str(f'weight routing mode = {self.weight_routing_mode}\n')
+ str(f'lock area mode = {self.lock_area_mode}\n')
+ str(f'limit speed mode = {self.limit_speed_mode}, RF = {self.speed_rf*100}%\n')
+ str(f'adjust traffic light mode = {self.adjust_traffic_light_mode} , RF = {self.trafficLights_duration_rf*100}%\n'))
return (
f'grid : {self.areas_number}x{self.areas_number}\n'
f'step number = {self.n_steps}\n'
f'window size = {self.window_size}\n'
f'weight routing mode = {self.weight_routing_mode}\n'
f'lock area mode = {self.lock_area_mode}\n'
f'limit speed mode = {self.limit_speed_mode}, RF = {self.speed_rf * 100}%\n'
f'adjust traffic light mode = {self.adjust_traffic_light_mode},'
f'RF = {self.trafficLights_duration_rf * 100}%\n'
)
def init_traci(self):
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
else:
sys.exit("please declare environment variable 'SUMO_HOME'")
sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', self._SUMOCMD)
self.sumo_cmd = [sumo_binary, "-c", self._SUMOCFG]
def init_logger(self, save_logs = False):
def init_logger(self, save_logs=False):
now = datetime.datetime.now()
current_date = now.strftime("%Y_%m_%d_%H_%M_%S")
if not os.path.exists('logs'):
os.makedirs('logs')
log_filename = f'logs/sumo_logs_{current_date}.log'
logger = logging.getLogger("sumo_logger")
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
if save_logs :
if save_logs:
file_handler = logging.FileHandler(log_filename)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
@ -98,15 +99,9 @@ class Config:
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_basics_emissions(self):
if self.n_steps == 100:
return self.total_emissions100
def get_ref_emissions(self):
if self.n_steps == 200:
return self.total_emissions200
if self.n_steps == 300:
return self.total_emissions300
return self.ref200

View File

@ -1,19 +1,21 @@
import argparse
import csv
import datetime
import itertools
import os
import sys
import time
from traci import trafficlight
import traci
from typing import List
import traci
from parse import search
from shapely.geometry import LineString
import actions
from config import Config
from model import Area, Vehicle, Lane , TrafficLight , Phase , Logic
from model import Area, Vehicle, Lane, TrafficLight, Phase, Logic, Emission
def init_grid(simulation_bounds, areas_number):
def init_grid(simulation_bounds, areas_number, window_size):
grid = list()
width = simulation_bounds[1][0] / areas_number
height = simulation_bounds[1][1] / areas_number
@ -22,10 +24,10 @@ def init_grid(simulation_bounds, areas_number):
# bounds coordinates for the area : (xmin, ymin, xmax, ymax)
ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height),
((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height))
area = Area(ar_bounds)
area.name = 'Area ({},{})'.format(i, j)
name = 'Area ({},{})'.format(i, j)
area = Area(ar_bounds, name, window_size)
grid.append(area)
traci.polygon.add(area.name, ar_bounds, (0, 255, 0))
traci.polygon.add(area.name, ar_bounds, (255, 0, 0))
return grid
@ -40,14 +42,16 @@ def get_all_lanes() -> List[Lane]:
def parse_phase(phase_repr):
duration = search('duration: {:f}', phase_repr)
minDuration = search('minDuration: {:f}', phase_repr)
maxDuration = search('maxDuration: {:f}', phase_repr)
phaseDef = search('phaseDef: {}\n', phase_repr)
min_duration = search('minDuration: {:f}', phase_repr)
max_duration = search('maxDuration: {:f}', phase_repr)
phase_def = search('phaseDef: {}\n', phase_repr)
if phaseDef is None: phaseDef = ''
else : phaseDef = phaseDef[0]
if phase_def is None:
phase_def = ''
else:
phase_def = phase_def[0]
return Phase(duration[0], minDuration[0], maxDuration[0], phaseDef)
return Phase(duration[0], min_duration[0], max_duration[0], phase_def)
def add_data_to_areas(areas: List[Area]):
@ -63,16 +67,18 @@ def add_data_to_areas(areas: List[Area]):
phases = []
for phase in traci.trafficlight.Logic.getPhases(l): # add phases to logics
phases.append(parse_phase(phase.__repr__()))
logics.append(Logic(l, phases))
logics.append(Logic(l, phases))
area.add_tl(TrafficLight(tl_id, logics))
def compute_vehicle_emissions(veh_id):
return (traci.vehicle.getCOEmission(veh_id)
+traci.vehicle.getNOxEmission(veh_id)
+traci.vehicle.getHCEmission(veh_id)
+traci.vehicle.getPMxEmission(veh_id)
+traci.vehicle.getCO2Emission(veh_id))
co2 = traci.vehicle.getCO2Emission(veh_id)
co = traci.vehicle.getCOEmission(veh_id)
nox = traci.vehicle.getNOxEmission(veh_id)
hc = traci.vehicle.getHCEmission(veh_id)
pmx = traci.vehicle.getPMxEmission(veh_id)
return Emission(co2, co, nox, hc, pmx)
def get_all_vehicles() -> List[Vehicle]:
@ -87,32 +93,57 @@ def get_all_vehicles() -> List[Vehicle]:
def get_emissions(grid: List[Area], vehicles: List[Vehicle], current_step, config, logger):
for area in grid:
vehicle_emissions = 0
total_emissions = Emission()
for vehicle in vehicles:
if vehicle.pos in area:
vehicle_emissions += vehicle.emissions
area.emissions_by_step.append(vehicle_emissions)
if area.sum_emissions_into_window(current_step, config.window_size) >= config.emissions_threshold:
total_emissions += vehicle.emissions
area.emissions_by_step.append(total_emissions)
if area.sum_emissions_into_window(current_step, config.window_size) >= config.emissions_threshold:
if config.limit_speed_mode and not area.limited_speed:
logger.info(f'Action - Decreased max speed into {area.name} by {config.speed_rf*100}%')
logger.info(f'Action - Decreased max speed into {area.name} by {config.speed_rf * 100}%')
actions.limit_speed_into_area(area, vehicles, config.speed_rf)
if config.adjust_traffic_light_mode and not area.tls_adjusted:
logger.info(f'Action - Decreased traffic lights duration by {config.trafficLights_duration_rf*100}%')
logger.info(
f'Action - Decreased traffic lights duration by {config.trafficLights_duration_rf * 100}%')
actions.adjust_traffic_light_phase_duration(area, config.trafficLights_duration_rf)
if config.lock_area_mode and not area.locked:
if actions.count_vehicles_in_area(area):
logger.info(f'Action - {area.name} blocked')
actions.lock_area(area)
traci.polygon.setColor(area.name, (255, 0, 0))
if config.weight_routing_mode and not area.weight_adjusted:
actions.adjust_edges_weights(area)
traci.polygon.setFilled(area.name, True)
else:
actions.reverse_actions(area)
traci.polygon.setFilled(area.name, False)
def get_reduction_percentage(ref, total):
return (ref - total) / ref * 100
def export_data_to_csv(config, grid):
csv_dir = os.path.join(SCRIPTDIR, 'csv')
if not os.path.exists(csv_dir):
os.mkdir(csv_dir)
now = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
with open(f'csv/{now}.csv', 'w') as f:
writer = csv.writer(f)
# Write CSV headers
writer.writerow(itertools.chain(('Step',), (a.name for a in grid)))
# Write all areas emission value for each step
for step in range(config.n_steps):
em_for_step = (f'{a.emissions_by_step[step].value():.3f}' for a in grid)
writer.writerow(itertools.chain((step,), em_for_step))
def run(config, logger):
@ -122,69 +153,90 @@ def run(config, logger):
logger.info(f'Loaded simulation file : {config._SUMOCFG}')
logger.info('Loading data for the simulation')
start = time.perf_counter()
grid = init_grid(traci.simulation.getNetBoundary(), config.areas_number)
grid = init_grid(traci.simulation.getNetBoundary(), config.areas_number, config.window_size)
add_data_to_areas(grid)
loading_time = round(time.perf_counter() - start, 2)
logger.info(f'Data loaded ({loading_time}s)')
logger.info('Start of the simulation')
step = 0
while step < config.n_steps : # traci.simulation.getMinExpectedNumber() > 0:
logger.info('Simulation started...')
step = 0
while step < config.n_steps: # traci.simulation.getMinExpectedNumber() > 0:
traci.simulationStep()
vehicles = get_all_vehicles()
get_emissions(grid, vehicles, step, config, logger)
if config.weight_routing_mode:
actions.adjust_edges_weights()
step += 1
print(f'step = {step}/{config.n_steps}', end='\r')
finally:
traci.close(False)
export_data_to_csv(config, grid)
simulation_time = round(time.perf_counter() - start, 2)
logger.info(f'End of the simulation ({simulation_time}s)')
total_emissions = 0
logger.info(f'Real-time factor : {config.n_steps / simulation_time}')
total_emissions = Emission()
for area in grid:
total_emissions += area.sum_all_emissions()
logger.info(f'Total emissions = {total_emissions} mg')
if not config.without_actions_mode :
ref = config.get_basics_emissions()
logger.info(f'Total emissions = {total_emissions.value()} mg')
if not config.without_actions_mode:
ref = config.get_ref_emissions()
if not (ref is None):
diff_with_actions = (ref - total_emissions) / ref
logger.info(f'Reduction percentage of emissions = {diff_with_actions*100} %')
global_diff = (ref.value() - total_emissions.value()) / ref.value()
logger.info(f'Global reduction percentage of emissions = {global_diff * 100} %')
logger.info(f'-> CO2 emissions = {get_reduction_percentage(ref.co2, total_emissions.co2)} %')
logger.info(f'-> CO emissions = {get_reduction_percentage(ref.co, total_emissions.co)} %')
logger.info(f'-> Nox emissions = {get_reduction_percentage(ref.nox, total_emissions.nox)} %')
logger.info(f'-> HC emissions = {get_reduction_percentage(ref.hc, total_emissions.hc)} %')
logger.info(f'-> PMx emissions = {get_reduction_percentage(ref.pmx, total_emissions.pmx)} %')
def add_options(parser):
parser.add_argument("-f", "--configfile", type=str, default='configs/default_config.json', required=False,
help='Choose your configuration file from your working directory')
parser.add_argument("-save", "--save", action="store_true",
help='Save the logs into the logs folder')
parser.add_argument("-steps", "--steps", type=int, default=200, required=False,
help='Choose the simulated time (in seconds)')
parser.add_argument("-ref", "--ref", action="store_true",
help='Launch a reference simulation (without acting on areas)')
parser.add_argument("-gui", "--gui", action="store_true",
help="Set GUI mode")
def main(args):
parser = argparse.ArgumentParser(description="")
parser.add_argument("-f", "--configfile", type=str, default='configs/default_config.json', required=False)
parser.add_argument("-save", "--save", action="store_true")
parser.add_argument("-ref", "--ref", action="store_true")
add_options(parser)
args = parser.parse_args(args)
# > py ./emissions.py -f configs/config1.json -save
# will load the configuration file "config1.json" and save logs into the logs directory
# > py ./emissions.py -f configs/config1.json -save -ref & py ./emissions.py -f configs/config1.json -save
# same as above but also launches a reference simulation by using -ref option
config = Config()
config.import_config_file(args.configfile)
config.init_traci()
logger = config.init_logger(save_logs=args.save)
if args.ref:
if args.ref:
config.without_actions_mode = True
config.check_config()
logger.info(f'Reference simulation')
if args.steps:
config.n_steps = args.steps
if args.gui:
config._SUMOCMD = "sumo-gui"
config.check_config()
logger.info(f'Loaded configuration file : {args.configfile}')
logger.info(f'Simulated time : {args.steps}s')
run(config, logger)
if __name__ == '__main__':
main(sys.argv[1:])

View File

@ -1,9 +1,10 @@
from traci._trafficlight import Logic as SUMO_Logic
import collections
from typing import Tuple, Set
from shapely.geometry import Point, LineString
from shapely.geometry import Polygon
from shapely.geometry.base import BaseGeometry
from traci._trafficlight import Logic as SUMO_Logic
class Lane:
@ -17,43 +18,69 @@ class Lane:
"""Overrides the default implementation"""
return hash(self.lane_id)
class Phase:
def __init__(self, duration: float, minDuration: float, maxDuration : float, phaseDef: str):
self.duration = duration
def __init__(self, duration: float, minDuration: float, maxDuration: float, phaseDef: str):
self.duration = duration
self.minDuration = minDuration
self.maxDuration = maxDuration
self.phaseDef = phaseDef
self.phaseDef = phaseDef
def __repr__(self) -> str:
repr = f'Phase(duration:{self.duration},minDuration:{self.minDuration},maxDuration:{self.maxDuration},phaseDef:{self.phaseDef})'
return str(repr)
class Logic:
def __init__(self, logic: SUMO_Logic, phases: Set[Phase]):
self._logic = logic
self._phases: Set[Phase] = phases
class TrafficLight:
class TrafficLight:
def __init__(self, tl_id: str, logics: Set[Logic]):
self.tl_id = tl_id
self.tl_id = tl_id
self._logics: Set[Logic] = logics
def __hash__(self):
"""Overrides the default implementation"""
return hash(self.tl_id)
class Emission:
def __init__(self, co2=0, co=0, nox=0, hc=0, pmx=0):
self.co2 = co2
self.co = co
self.nox = nox
self.hc = hc
self.pmx = pmx
def __add__(self, other):
return Emission(self.co2 + other.co2, self.co + other.co, self.nox + other.nox, self.hc + other.hc,
self.pmx + other.pmx)
def value(self):
return self.co2 + self.co + self.nox + self.hc + self.pmx
def __repr__(self) -> str:
repr = f'Emission(co2={self.co2},co={self.co},nox={self.nox},hc={self.hc},pmx={self.pmx})'
return str(repr)
class Area:
def __init__(self, coords, name=''):
def __init__(self, coords, name, window_size):
self.limited_speed = False
self.locked = False
self.tls_adjusted = False
self.weight_adjusted = False
self.rectangle = Polygon(coords)
self.name = name
self.emissions_by_step = []
self.emissions_by_step = []
self.window = collections.deque(maxlen=window_size)
self._lanes: Set[Lane] = set()
self._tls: Set[TrafficLight] = set()
self._tls: Set[TrafficLight] = set()
def __eq__(self, other):
return self.rectangle.__eq__(other)
@ -70,24 +97,26 @@ class Area:
def add_lane(self, lane: Lane):
self._lanes.add(lane)
def add_tl(self, tl: TrafficLight):
self._tls.add(tl)
def remove_lane(self, lane: Lane):
self._lanes.remove(lane)
def sum_all_emissions(self):
sum = 0
sum = Emission()
for emission in self.emissions_by_step:
sum += emission
return sum
return sum
def sum_emissions_into_window(self, current_step, window_size):
self.window.appendleft(self.emissions_by_step[current_step].value())
sum = 0
q = current_step // window_size #Returns the integral part of the quotient
for i in range(q*window_size, current_step):
sum += self.emissions_by_step[i]
for i in range(self.window.__len__()):
sum += self.window[i]
return sum
@classmethod
@ -98,10 +127,11 @@ class Area:
(xmax, ymax),
(xmax, ymin)))
class Vehicle:
def __init__(self, veh_id: int, pos: Tuple[float, float]):
self.emissions: float = 0.0
self.emissions: Emission = Emission()
self.veh_id = veh_id
self.pos = Point(pos)