1
0
mirror of https://github.com/Ahp06/SUMO_Emissions.git synced 2024-11-25 04:56:30 +00:00

Added docstrings to all methods of the application

This commit is contained in:
Ahp06 2019-01-17 18:02:24 +01:00
parent 0ba52d2c9a
commit 5b812d2dc1
5 changed files with 900 additions and 594 deletions

View File

@ -1,85 +1,136 @@
""" """
Created on 17 oct. 2018 Created on 17 oct. 2018
@author: Axel Huynh-Phuc, Thibaud Gasser @author: Axel Huynh-Phuc, Thibaud Gasser
""" """
import traci
from traci._trafficlight import Logic from typing import Iterable
from typing import Iterable
import traci
from shapely.geometry.linestring import LineString from model import Area, Vehicle
from model import Area, Vehicle """
This module defines all possible actions on the simulation
"""
def compute_edge_weight(edge_id):
def compute_edge_weight(edge_id):
co2 = traci.edge.getCO2Emission(edge_id) """
co = traci.edge.getCOEmission(edge_id) Sum the different pollutant emissions on the edge with the identifier edge_id
nox = traci.edge.getNOxEmission(edge_id) :param edge_id: The edge ID
hc = traci.edge.getHCEmission(edge_id) :return: The sum (in mg) of all pollutant emissions
pmx = traci.edge.getPMxEmission(edge_id) """
co2 = traci.edge.getCO2Emission(edge_id)
return (co2 + co + nox + hc + pmx) co = traci.edge.getCOEmission(edge_id)
nox = traci.edge.getNOxEmission(edge_id)
def adjust_edges_weights(area): hc = traci.edge.getHCEmission(edge_id)
area.weight_adjusted = True pmx = traci.edge.getPMxEmission(edge_id)
for lane in area._lanes:
edge_id = traci.lane.getEdgeID(lane.lane_id) return co2 + co + nox + hc + pmx
weight = compute_edge_weight(edge_id) # by default edges weight = length/mean speed
traci.edge.setEffort(edge_id, weight)
def adjust_edges_weights(area):
for veh_id in traci.vehicle.getIDList(): """
traci.vehicle.rerouteEffort(veh_id) Changes the edge weight of all edges into the area
:param area: The Area object
def limit_speed_into_area(area: Area, vehicles: Iterable[Vehicle], speed_rf): :return:
area.limited_speed = True """
for lane in area._lanes: area.weight_adjusted = True
traci.lane.setMaxSpeed(lane.lane_id, speed_rf * lane.initial_max_speed) for lane in area._lanes:
edge_id = traci.lane.getEdgeID(lane.lane_id)
def modifyLogic(logic, rf): #rf for "reduction factor" weight = compute_edge_weight(edge_id) # by default edges weight = length/mean speed
new_phases = [] traci.edge.setEffort(edge_id, weight)
for phase in logic._phases:
new_phase = traci.trafficlight.Phase(phase.duration*rf,phase.minDuration*rf,phase.maxDuration*rf,phase.phaseDef) for veh_id in traci.vehicle.getIDList():
new_phases.append(new_phase) traci.vehicle.rerouteEffort(veh_id)
return traci.trafficlight.Logic("new-program", 0 , 0 , 0 , new_phases)
def limit_speed_into_area(area: Area, speed_rf):
def adjust_traffic_light_phase_duration(area, reduction_factor): """
area.tls_adjusted = True Limit the speed into the area by speed_rf factor
for tl in area._tls: :param area: The Area object
for logic in tl._logics: :param speed_rf: The speed reduction factor (must be positive)
traci.trafficlights.setCompleteRedYellowGreenDefinition(tl.tl_id, modifyLogic(logic,reduction_factor)) :return:
"""
def count_vehicles_in_area(area): area.limited_speed = True
vehicles_in_area = 0 for lane in area._lanes:
for lane in area._lanes: traci.lane.setMaxSpeed(lane.lane_id, speed_rf * lane.initial_max_speed)
vehicles_in_area += traci.lane.getLastStepVehicleNumber(lane.lane_id)
return vehicles_in_area
def modifyLogic(logic, rf):
def lock_area(area): """
area.locked = True Change the logic of a traffic light by decreasing the overall duration of the traffic light
for lane in area._lanes: :param logic: The Logic object
traci.lane.setDisallowed(lane.lane_id, 'passenger') :param rf: The reduction factor (must be positive)
:return: A new Logic object with all phases modified
def reverse_actions(area): """
#Reset max speed to original new_phases = []
if area.limited_speed: for phase in logic._phases:
area.limited_speed = False new_phase = traci.trafficlight.Phase(phase.duration * rf, phase.minDuration * rf, phase.maxDuration * rf,
for lane in area._lanes: phase.phaseDef)
traci.lane.setMaxSpeed(lane.lane_id, lane.initial_max_speed) new_phases.append(new_phase)
#Reset traffic lights initial duration return traci.trafficlight.Logic("new-program", 0, 0, 0, new_phases)
if area.tls_adjusted:
area.tls_adjusted = False
for tl in area._tls: def adjust_traffic_light_phase_duration(area, reduction_factor):
for initial_logic in tl._logics: """
traci.trafficlights.setCompleteRedYellowGreenDefinition(tl.tl_id, initial_logic._logic) Set all logics modification on traffic lights into the area
:param area: The Area object
#Unlock the area :param reduction_factor: The reduction factor (must be positive)
if area.locked: :return:
area.locked = False """
for lane in area._lanes: area.tls_adjusted = True
traci.lane.setAllowed(lane.lane_id, '') #empty means all classes are allowed for tl in area._tls:
for logic in tl._logics:
traci.trafficlights.setCompleteRedYellowGreenDefinition(tl.tl_id, modifyLogic(logic, reduction_factor))
def count_vehicles_in_area(area):
"""
Count the vehicles number into the area
:param area: The Area object
:return: The number of vehicles into the area
"""
vehicles_in_area = 0
for lane in area._lanes:
vehicles_in_area += traci.lane.getLastStepVehicleNumber(lane.lane_id)
return vehicles_in_area
def lock_area(area):
"""
Prohibits access to the area to a particular vehicle class
NOT FIXED : Some vehicles continue to go into the area if they can not turn around and stay there
:param area: The Area object
:return:
"""
area.locked = True
for lane in area._lanes:
# The passenger class is an example, you have to adapt this code
traci.lane.setDisallowed(lane.lane_id, 'passenger')
def reverse_actions(area):
"""
Reverse all actions made in an area
:param area: The Area object
:return:
"""
# Reset max speed to original
if area.limited_speed:
area.limited_speed = False
for lane in area._lanes:
traci.lane.setMaxSpeed(lane.lane_id, lane.initial_max_speed)
# Reset traffic lights initial duration
if area.tls_adjusted:
area.tls_adjusted = False
for tl in area._tls:
for initial_logic in tl._logics:
traci.trafficlights.setCompleteRedYellowGreenDefinition(tl.tl_id, initial_logic._logic)
# Unlock the area
if area.locked:
area.locked = False
for lane in area._lanes:
traci.lane.setAllowed(lane.lane_id, '') # empty means all classes are allowed

View File

@ -1,107 +1,143 @@
""" """
Global configuration for the simulation Created on 17 oct. 2018
"""
@author: Axel Huynh-Phuc, Thibaud Gasser
import datetime """
import json
import logging import datetime
import os import json
import sys import logging
import os
from model import Emission import sys
from model import Emission
class Config:
# Total of emissions of all pollutants in mg for n steps of simulation without acting on areas """
# These constants are simulation dependant, you must change them according to your simulation This module defines the global configuration for the simulation
ref200 = Emission(co2=42816869.05436445, co=1128465.0343051048, nox=18389.648337283958, hc=6154.330914019103, """
pmx=885.0829265236318)
def __init__(self): class Config:
"""Default constructor""" """
The Config class defines all simulation properties that can be changed
def import_config_file(self, config_file): """
with open(config_file, 'r') as f:
data = json.load(f) # Total of emissions of all pollutants in mg for n steps of simulation without acting on areas
# These constants are simulation dependant, you must change them according to your simulation
self._SUMOCMD = data["_SUMOCMD"] ref200 = Emission(co2=42816869.05436445, co=1128465.0343051048, nox=18389.648337283958, hc=6154.330914019103,
self._SUMOCFG = data["_SUMOCFG"] pmx=885.0829265236318)
self.areas_number = data["areas_number"] def __init__(self):
self.emissions_threshold = data["emissions_threshold"] """
self.n_steps = data["n_steps"] Default constructor
self.window_size = data["window_size"] """
self.without_actions_mode = data["without_actions_mode"] def import_config_file(self, config_file):
self.limit_speed_mode = data["limit_speed_mode"] """
self.speed_rf = data["speed_rf"] Import your configuration file in JSON format
self.adjust_traffic_light_mode = data["adjust_traffic_light_mode"] :param config_file: The path to your configuration file
self.trafficLights_duration_rf = data["trafficLights_duration_rf"] :return:
self.weight_routing_mode = data["weight_routing_mode"] """
self.lock_area_mode = data["lock_area_mode"] with open(config_file, 'r') as f:
data = json.load(f)
self.check_config()
self._SUMOCMD = data["_SUMOCMD"]
def check_config(self): self._SUMOCFG = data["_SUMOCFG"]
# Weight routing mode cannot be combinated with other actions
if self.weight_routing_mode: self.areas_number = data["areas_number"]
self.limit_speed_mode = False self.emissions_threshold = data["emissions_threshold"]
self.adjust_traffic_light_mode = False self.n_steps = data["n_steps"]
self.lock_area_mode = False self.window_size = data["window_size"]
# If without_actions_mode is choosen self.without_actions_mode = data["without_actions_mode"]
if self.without_actions_mode: self.limit_speed_mode = data["limit_speed_mode"]
self.limit_speed_mode = False self.speed_rf = data["speed_rf"]
self.adjust_traffic_light_mode = False self.adjust_traffic_light_mode = data["adjust_traffic_light_mode"]
self.weight_routing_mode = False self.trafficLights_duration_rf = data["trafficLights_duration_rf"]
self.lock_area_mode = False self.weight_routing_mode = data["weight_routing_mode"]
self.lock_area_mode = data["lock_area_mode"]
def __repr__(self) -> str:
return ( self.check_config()
f'grid : {self.areas_number}x{self.areas_number}\n'
f'step number = {self.n_steps}\n' def check_config(self):
f'window size = {self.window_size}\n' """
f'weight routing mode = {self.weight_routing_mode}\n' Check the relevance of user configuration choices
f'lock area mode = {self.lock_area_mode}\n' :return:
f'limit speed mode = {self.limit_speed_mode}, RF = {self.speed_rf * 100}%\n' """
f'adjust traffic light mode = {self.adjust_traffic_light_mode},' # Weight routing mode cannot be combinated with other actions
f'RF = {self.trafficLights_duration_rf * 100}%\n' if self.weight_routing_mode:
) self.limit_speed_mode = False
self.adjust_traffic_light_mode = False
def init_traci(self): self.lock_area_mode = False
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools') # If without_actions_mode is choosen
sys.path.append(tools) if self.without_actions_mode:
else: self.limit_speed_mode = False
sys.exit("please declare environment variable 'SUMO_HOME'") self.adjust_traffic_light_mode = False
self.weight_routing_mode = False
sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', self._SUMOCMD) self.lock_area_mode = False
self.sumo_cmd = [sumo_binary, "-c", self._SUMOCFG]
def __repr__(self) -> str:
def init_logger(self, save_logs=False): """
now = datetime.datetime.now() :return: All properties chosen by the user
current_date = now.strftime("%Y_%m_%d_%H_%M_%S") """
return (
if not os.path.exists('logs'): f'grid : {self.areas_number}x{self.areas_number}\n'
os.makedirs('logs') f'step number = {self.n_steps}\n'
f'window size = {self.window_size}\n'
log_filename = f'logs/sumo_logs_{current_date}.log' f'weight routing mode = {self.weight_routing_mode}\n'
f'lock area mode = {self.lock_area_mode}\n'
logger = logging.getLogger("sumo_logger") f'limit speed mode = {self.limit_speed_mode}, RF = {self.speed_rf * 100}%\n'
logger.setLevel(logging.INFO) f'adjust traffic light mode = {self.adjust_traffic_light_mode},'
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") f'RF = {self.trafficLights_duration_rf * 100}%\n'
)
if save_logs:
file_handler = logging.FileHandler(log_filename) def init_traci(self):
file_handler.setFormatter(formatter) """
logger.addHandler(file_handler) Init the Traci API
:return:
handler = logging.StreamHandler() """
handler.setFormatter(formatter) if 'SUMO_HOME' in os.environ:
logger.addHandler(handler) tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools)
return logger else:
sys.exit("please declare environment variable 'SUMO_HOME'")
def get_ref_emissions(self):
if self.n_steps == 200: sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', self._SUMOCMD)
return self.ref200 self.sumo_cmd = [sumo_binary, "-c", self._SUMOCFG]
def init_logger(self, save_logs=False):
"""
Init the application logger
:param save_logs: If save_logs is True, it will save the logs into the logs directory
:return:
"""
now = datetime.datetime.now()
current_date = now.strftime("%Y_%m_%d_%H_%M_%S")
if not os.path.exists('logs'):
os.makedirs('logs')
log_filename = f'logs/sumo_logs_{current_date}.log'
logger = logging.getLogger("sumo_logger")
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
if save_logs:
file_handler = logging.FileHandler(log_filename)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_ref_emissions(self):
"""
:return: Return the sum of all emissions (in mg) from the simulation of reference
"""
if self.n_steps == 200:
return self.ref200

View File

@ -1,22 +1,22 @@
{ {
"_SUMOCMD": "sumo", "_SUMOCMD": "sumo",
"_SUMOCFG": "simulations/mulhouse_simulation/osm.sumocfg", "_SUMOCFG": "simulations/mulhouse_simulation/osm.sumocfg",
"areas_number": 10, "areas_number": 10,
"emissions_threshold": 500000, "emissions_threshold": 500000,
"n_steps": 200, "n_steps": 200,
"window_size":100, "window_size":100,
"without_actions_mode": true, "without_actions_mode": false,
"limit_speed_mode": false, "limit_speed_mode": true,
"speed_rf": 0.1, "speed_rf": 0.1,
"adjust_traffic_light_mode": false, "adjust_traffic_light_mode": true,
"trafficLights_duration_rf": 0.2, "trafficLights_duration_rf": 0.2,
"weight_routing_mode": false, "weight_routing_mode": false,
"lock_area_mode": false "lock_area_mode": false
} }

View File

@ -1,242 +1,320 @@
import argparse """
import csv Created on 17 oct. 2018
import datetime
import itertools @author: Axel Huynh-Phuc, Thibaud Gasser
import os """
import sys
import time import argparse
from typing import List import csv
import datetime
import traci import itertools
from parse import search import os
from shapely.geometry import LineString import sys
import time
import actions from typing import List
from config import Config
from model import Area, Vehicle, Lane, TrafficLight, Phase, Logic, Emission import actions
import traci
def init_grid(simulation_bounds, areas_number, window_size): from config import Config
grid = list() from model import Area, Vehicle, Lane, TrafficLight, Phase, Logic, Emission
width = simulation_bounds[1][0] / areas_number from parse import search
height = simulation_bounds[1][1] / areas_number from shapely.geometry import LineString
for i in range(areas_number):
for j in range(areas_number): """
# bounds coordinates for the area : (xmin, ymin, xmax, ymax) This module defines the entry point of the application
ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height), """
((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height))
name = 'Area ({},{})'.format(i, j) def init_grid(simulation_bounds, areas_number, window_size):
area = Area(ar_bounds, name, window_size) """
grid.append(area) Initialize the grid of the loaded map from the configuration
traci.polygon.add(area.name, ar_bounds, (255, 0, 0)) :param simulation_bounds: The map bounds
return grid :param areas_number: The number of areas
:param window_size: The size of the acquisition window
:return: A list of areas
def get_all_lanes() -> List[Lane]: """
lanes = [] grid = list()
for lane_id in traci.lane.getIDList(): width = simulation_bounds[1][0] / areas_number
polygon_lane = LineString(traci.lane.getShape(lane_id)) height = simulation_bounds[1][1] / areas_number
initial_max_speed = traci.lane.getMaxSpeed(lane_id) for i in range(areas_number):
lanes.append(Lane(lane_id, polygon_lane, initial_max_speed)) for j in range(areas_number):
return lanes # bounds coordinates for the area : (xmin, ymin, xmax, ymax)
ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height),
((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height))
def parse_phase(phase_repr): name = 'Area ({},{})'.format(i, j)
duration = search('duration: {:f}', phase_repr) area = Area(ar_bounds, name, window_size)
min_duration = search('minDuration: {:f}', phase_repr) grid.append(area)
max_duration = search('maxDuration: {:f}', phase_repr) traci.polygon.add(area.name, ar_bounds, (255, 0, 0))
phase_def = search('phaseDef: {}\n', phase_repr) return grid
if phase_def is None:
phase_def = '' def get_all_lanes() -> List[Lane]:
else: """
phase_def = phase_def[0] Recover and creates a list of Lane objects
:return: The lanes list
return Phase(duration[0], min_duration[0], max_duration[0], phase_def) """
lanes = []
for lane_id in traci.lane.getIDList():
def add_data_to_areas(areas: List[Area]): polygon_lane = LineString(traci.lane.getShape(lane_id))
lanes = get_all_lanes() initial_max_speed = traci.lane.getMaxSpeed(lane_id)
for area in areas: lanes.append(Lane(lane_id, polygon_lane, initial_max_speed))
for lane in lanes: # add lanes return lanes
if area.rectangle.intersects(lane.polygon):
area.add_lane(lane)
for tl_id in traci.trafficlight.getIDList(): # add traffic lights def parse_phase(phase_repr):
if lane.lane_id in traci.trafficlight.getControlledLanes(tl_id): """
logics = [] Because the SUMO object Phase does not contain accessors,
for l in traci.trafficlight.getCompleteRedYellowGreenDefinition(tl_id): # add logics we parse the string representation to retrieve data members.
phases = [] :param phase_repr: The Phase string representation
for phase in traci.trafficlight.Logic.getPhases(l): # add phases to logics :return: An new Phase instance
phases.append(parse_phase(phase.__repr__())) """
logics.append(Logic(l, phases)) duration = search('duration: {:f}', phase_repr)
area.add_tl(TrafficLight(tl_id, logics)) min_duration = search('minDuration: {:f}', phase_repr)
max_duration = search('maxDuration: {:f}', phase_repr)
phase_def = search('phaseDef: {}\n', phase_repr)
def compute_vehicle_emissions(veh_id):
co2 = traci.vehicle.getCO2Emission(veh_id) if phase_def is None:
co = traci.vehicle.getCOEmission(veh_id) phase_def = ''
nox = traci.vehicle.getNOxEmission(veh_id) else:
hc = traci.vehicle.getHCEmission(veh_id) phase_def = phase_def[0]
pmx = traci.vehicle.getPMxEmission(veh_id)
return Phase(duration[0], min_duration[0], max_duration[0], phase_def)
return Emission(co2, co, nox, hc, pmx)
def add_data_to_areas(areas: List[Area]):
def get_all_vehicles() -> List[Vehicle]: """
vehicles = list() Adds all recovered data to different areas
for veh_id in traci.vehicle.getIDList(): :param areas: The list of areas
veh_pos = traci.vehicle.getPosition(veh_id) :return:
vehicle = Vehicle(veh_id, veh_pos) """
vehicle.emissions = compute_vehicle_emissions(veh_id) lanes = get_all_lanes()
vehicles.append(vehicle) for area in areas:
return vehicles for lane in lanes: # add lanes
if area.rectangle.intersects(lane.polygon):
area.add_lane(lane)
def get_emissions(grid: List[Area], vehicles: List[Vehicle], current_step, config, logger): for tl_id in traci.trafficlight.getIDList(): # add traffic lights
for area in grid: if lane.lane_id in traci.trafficlight.getControlledLanes(tl_id):
total_emissions = Emission() logics = []
for vehicle in vehicles: for l in traci.trafficlight.getCompleteRedYellowGreenDefinition(tl_id): # add logics
if vehicle.pos in area: phases = []
total_emissions += vehicle.emissions for phase in traci.trafficlight.Logic.getPhases(l): # add phases to logics
phases.append(parse_phase(phase.__repr__()))
area.emissions_by_step.append(total_emissions) logics.append(Logic(l, phases))
area.add_tl(TrafficLight(tl_id, logics))
if area.sum_emissions_into_window(current_step, config.window_size) >= config.emissions_threshold:
if config.limit_speed_mode and not area.limited_speed: def compute_vehicle_emissions(veh_id):
logger.info(f'Action - Decreased max speed into {area.name} by {config.speed_rf * 100}%') """
actions.limit_speed_into_area(area, vehicles, config.speed_rf) Recover the emissions of different pollutants from a vehicle and create an Emission instance
if config.adjust_traffic_light_mode and not area.tls_adjusted: :param veh_id:
logger.info( :return: A new Emission instance
f'Action - Decreased traffic lights duration by {config.trafficLights_duration_rf * 100}%') """
actions.adjust_traffic_light_phase_duration(area, config.trafficLights_duration_rf) co2 = traci.vehicle.getCO2Emission(veh_id)
co = traci.vehicle.getCOEmission(veh_id)
if config.lock_area_mode and not area.locked: nox = traci.vehicle.getNOxEmission(veh_id)
if actions.count_vehicles_in_area(area): hc = traci.vehicle.getHCEmission(veh_id)
logger.info(f'Action - {area.name} blocked') pmx = traci.vehicle.getPMxEmission(veh_id)
actions.lock_area(area)
return Emission(co2, co, nox, hc, pmx)
if config.weight_routing_mode and not area.weight_adjusted:
actions.adjust_edges_weights(area)
def get_all_vehicles() -> List[Vehicle]:
traci.polygon.setFilled(area.name, True) """
Recover all useful information about vehicles and creates a vehicles list
else: :return: A list of vehicles instances
actions.reverse_actions(area) """
traci.polygon.setFilled(area.name, False) vehicles = list()
for veh_id in traci.vehicle.getIDList():
veh_pos = traci.vehicle.getPosition(veh_id)
def get_reduction_percentage(ref, total): vehicle = Vehicle(veh_id, veh_pos)
return (ref - total) / ref * 100 vehicle.emissions = compute_vehicle_emissions(veh_id)
vehicles.append(vehicle)
return vehicles
def export_data_to_csv(config, grid):
csv_dir = os.path.join(SCRIPTDIR, 'csv')
if not os.path.exists(csv_dir): def get_emissions(grid: List[Area], vehicles: List[Vehicle], current_step, config, logger):
os.mkdir(csv_dir) """
For each area retrieves the acquired emissions in the window,
now = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S") and acts according to the configuration chosen by the user
:param grid: The list of areas
with open(f'csv/{now}.csv', 'w') as f: :param vehicles: The list of vehicles
writer = csv.writer(f) :param current_step: The simulation current step
# Write CSV headers :param config: The simulation configuration
writer.writerow(itertools.chain(('Step',), (a.name for a in grid))) :param logger: The simulation logger
# Write all areas emission value for each step :return:
for step in range(config.n_steps): """
em_for_step = (f'{a.emissions_by_step[step].value():.3f}' for a in grid) for area in grid:
writer.writerow(itertools.chain((step,), em_for_step)) total_emissions = Emission()
for vehicle in vehicles:
if vehicle.pos in area:
def run(config, logger): total_emissions += vehicle.emissions
grid = list()
try: area.emissions_by_step.append(total_emissions)
traci.start(config.sumo_cmd)
logger.info(f'Loaded simulation file : {config._SUMOCFG}') if area.sum_emissions_into_window(current_step) >= config.emissions_threshold:
logger.info('Loading data for the simulation')
start = time.perf_counter() if config.limit_speed_mode and not area.limited_speed:
logger.info(f'Action - Decreased max speed into {area.name} by {config.speed_rf * 100}%')
grid = init_grid(traci.simulation.getNetBoundary(), config.areas_number, config.window_size) actions.limit_speed_into_area(area, config.speed_rf)
add_data_to_areas(grid) if config.adjust_traffic_light_mode and not area.tls_adjusted:
logger.info(
loading_time = round(time.perf_counter() - start, 2) f'Action - Decreased traffic lights duration by {config.trafficLights_duration_rf * 100}%')
logger.info(f'Data loaded ({loading_time}s)') actions.adjust_traffic_light_phase_duration(area, config.trafficLights_duration_rf)
logger.info('Simulation started...') if config.lock_area_mode and not area.locked:
step = 0 if actions.count_vehicles_in_area(area):
while step < config.n_steps: # traci.simulation.getMinExpectedNumber() > 0: logger.info(f'Action - {area.name} blocked')
traci.simulationStep() actions.lock_area(area)
vehicles = get_all_vehicles() if config.weight_routing_mode and not area.weight_adjusted:
get_emissions(grid, vehicles, step, config, logger) actions.adjust_edges_weights(area)
step += 1
traci.polygon.setFilled(area.name, True)
print(f'step = {step}/{config.n_steps}', end='\r')
else:
finally: actions.reverse_actions(area)
traci.close(False) traci.polygon.setFilled(area.name, False)
export_data_to_csv(config, grid)
simulation_time = round(time.perf_counter() - start, 2) def get_reduction_percentage(ref, total):
logger.info(f'End of the simulation ({simulation_time}s)') """
logger.info(f'Real-time factor : {config.n_steps / simulation_time}') Return the reduction percentage of total emissions between reference and an other simulation
:param ref:
total_emissions = Emission() :param total:
for area in grid: :return:
total_emissions += area.sum_all_emissions() """
return (ref - total) / ref * 100
logger.info(f'Total emissions = {total_emissions.value()} mg')
if not config.without_actions_mode: def export_data_to_csv(config, grid):
ref = config.get_ref_emissions() """
if not (ref is None): Export all Emission objects as a CSV file into the csv directory
global_diff = (ref.value() - total_emissions.value()) / ref.value() :param config: The simulation configuration
:param grid: The list of areas
logger.info(f'Global reduction percentage of emissions = {global_diff * 100} %') :return:
logger.info(f'-> CO2 emissions = {get_reduction_percentage(ref.co2, total_emissions.co2)} %') """
logger.info(f'-> CO emissions = {get_reduction_percentage(ref.co, total_emissions.co)} %') csv_dir = 'csv'
logger.info(f'-> Nox emissions = {get_reduction_percentage(ref.nox, total_emissions.nox)} %') if not os.path.exists(csv_dir):
logger.info(f'-> HC emissions = {get_reduction_percentage(ref.hc, total_emissions.hc)} %') os.mkdir(csv_dir)
logger.info(f'-> PMx emissions = {get_reduction_percentage(ref.pmx, total_emissions.pmx)} %')
now = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
def add_options(parser): with open(f'csv/{now}.csv', 'w') as f:
parser.add_argument("-f", "--configfile", type=str, default='configs/default_config.json', required=False, writer = csv.writer(f)
help='Choose your configuration file from your working directory') # Write CSV headers
parser.add_argument("-save", "--save", action="store_true", writer.writerow(itertools.chain(('Step',), (a.name for a in grid)))
help='Save the logs into the logs folder') # Write all areas emission value for each step
parser.add_argument("-steps", "--steps", type=int, default=200, required=False, for step in range(config.n_steps):
help='Choose the simulated time (in seconds)') em_for_step = (f'{a.emissions_by_step[step].value():.3f}' for a in grid)
parser.add_argument("-ref", "--ref", action="store_true", writer.writerow(itertools.chain((step,), em_for_step))
help='Launch a reference simulation (without acting on areas)')
parser.add_argument("-gui", "--gui", action="store_true",
help="Set GUI mode") def run(config, logger):
"""
Run the simulation with the configuration chosen
def main(args): :param config: The simulation configuration
parser = argparse.ArgumentParser(description="") :param logger: The simulation logger
add_options(parser) :return:
args = parser.parse_args(args) """
grid = list()
config = Config() try:
config.import_config_file(args.configfile) traci.start(config.sumo_cmd)
config.init_traci() logger.info(f'Loaded simulation file : {config._SUMOCFG}')
logger = config.init_logger(save_logs=args.save) logger.info('Loading data for the simulation')
start = time.perf_counter()
if args.ref:
config.without_actions_mode = True grid = init_grid(traci.simulation.getNetBoundary(), config.areas_number, config.window_size)
logger.info(f'Reference simulation') add_data_to_areas(grid)
if args.steps: loading_time = round(time.perf_counter() - start, 2)
config.n_steps = args.steps logger.info(f'Data loaded ({loading_time}s)')
if args.gui: logger.info('Simulation started...')
config._SUMOCMD = "sumo-gui" step = 0
while step < config.n_steps: # traci.simulation.getMinExpectedNumber() > 0:
config.check_config() traci.simulationStep()
logger.info(f'Loaded configuration file : {args.configfile}') vehicles = get_all_vehicles()
logger.info(f'Simulated time : {args.steps}s') get_emissions(grid, vehicles, step, config, logger)
run(config, logger) step += 1
print(f'step = {step}/{config.n_steps}', end='\r')
if __name__ == '__main__':
main(sys.argv[1:]) finally:
traci.close(False)
export_data_to_csv(config, grid)
simulation_time = round(time.perf_counter() - start, 2)
logger.info(f'End of the simulation ({simulation_time}s)')
logger.info(f'Real-time factor : {config.n_steps / simulation_time}')
total_emissions = Emission()
for area in grid:
total_emissions += area.sum_all_emissions()
logger.info(f'Total emissions = {total_emissions.value()} mg')
if not config.without_actions_mode:
ref = config.get_ref_emissions()
if not (ref is None):
global_diff = (ref.value() - total_emissions.value()) / ref.value()
logger.info(f'Global reduction percentage of emissions = {global_diff * 100} %')
logger.info(f'-> CO2 emissions = {get_reduction_percentage(ref.co2, total_emissions.co2)} %')
logger.info(f'-> CO emissions = {get_reduction_percentage(ref.co, total_emissions.co)} %')
logger.info(f'-> Nox emissions = {get_reduction_percentage(ref.nox, total_emissions.nox)} %')
logger.info(f'-> HC emissions = {get_reduction_percentage(ref.hc, total_emissions.hc)} %')
logger.info(f'-> PMx emissions = {get_reduction_percentage(ref.pmx, total_emissions.pmx)} %')
def add_options(parser):
"""
Add command line options
:param parser: The command line parser
:return:
"""
parser.add_argument("-f", "--configfile", type=str, default='configs/default_config.json', required=False,
help='Choose your configuration file from your working directory')
parser.add_argument("-save", "--save", action="store_true",
help='Save the logs into the logs folder')
parser.add_argument("-steps", "--steps", type=int, default=200, required=False,
help='Choose the simulated time (in seconds)')
parser.add_argument("-ref", "--ref", action="store_true",
help='Launch a reference simulation (without acting on areas)')
parser.add_argument("-gui", "--gui", action="store_true",
help="Set GUI mode")
def main(args):
"""
The entry point of the application
:param args: Command line options
:return:
"""
parser = argparse.ArgumentParser(description="")
add_options(parser)
args = parser.parse_args(args)
config = Config()
config.import_config_file(args.configfile)
config.init_traci()
logger = config.init_logger(save_logs=args.save)
if args.ref:
config.without_actions_mode = True
logger.info(f'Reference simulation')
if args.steps:
config.n_steps = args.steps
if args.gui:
config._SUMOCMD = "sumo-gui"
config.check_config()
logger.info(f'Loaded configuration file : {args.configfile}')
logger.info(f'Simulated time : {args.steps}s')
run(config, logger)
if __name__ == '__main__':
main(sys.argv[1:])

View File

@ -1,139 +1,280 @@
import collections """
from typing import Tuple, Set Created on 17 oct. 2018
from shapely.geometry import Point, LineString @author: Axel Huynh-Phuc, Thibaud Gasser
from shapely.geometry import Polygon """
from shapely.geometry.base import BaseGeometry
from traci._trafficlight import Logic as SUMO_Logic import collections
from typing import Tuple, Set
class Lane: from shapely.geometry import Point, LineString
from shapely.geometry import Polygon
def __init__(self, lane_id: str, polygon: LineString, initial_max_speed: float): from shapely.geometry.base import BaseGeometry
self.polygon = polygon from traci._trafficlight import Logic as SUMO_Logic
self.lane_id = lane_id
self.initial_max_speed = initial_max_speed """
This module defines the business model of our application
def __hash__(self): """
"""Overrides the default implementation"""
return hash(self.lane_id)
class Lane:
"""
class Phase: The Lane class includes the polygon defining the lane
def __init__(self, duration: float, minDuration: float, maxDuration: float, phaseDef: str): and keep in memory the initial maximum speed on the lane
self.duration = duration """
self.minDuration = minDuration
self.maxDuration = maxDuration def __init__(self, lane_id: str, polygon: LineString, initial_max_speed: float):
self.phaseDef = phaseDef """
Lane constructor
def __repr__(self) -> str:
repr = f'Phase(duration:{self.duration},minDuration:{self.minDuration},maxDuration:{self.maxDuration},phaseDef:{self.phaseDef})' :param lane_id: The ID of the lane
return str(repr) :param polygon: The polygon defining the shape of the lane
:param initial_max_speed: The initial maximum speed
"""
class Logic: self.polygon = polygon
def __init__(self, logic: SUMO_Logic, phases: Set[Phase]): self.lane_id = lane_id
self._logic = logic self.initial_max_speed = initial_max_speed
self._phases: Set[Phase] = phases
def __hash__(self):
"""Overrides the default implementation"""
class TrafficLight: return hash(self.lane_id)
def __init__(self, tl_id: str, logics: Set[Logic]):
self.tl_id = tl_id class Phase:
self._logics: Set[Logic] = logics """
The Phase class defines a phase of a traffic light
def __hash__(self): """
"""Overrides the default implementation"""
return hash(self.tl_id) def __init__(self, duration: float, minDuration: float, maxDuration: float, phaseDef: str):
"""
Phase constructor
class Emission:
def __init__(self, co2=0, co=0, nox=0, hc=0, pmx=0): :param duration: The duration of the phase (in seconds)
self.co2 = co2 :param minDuration: The minimum duration of the phase
self.co = co :param maxDuration: The maximum duration of the phase
self.nox = nox :param phaseDef: The definition of the phase, following the definition rules of SUMO
self.hc = hc (See : http://sumo.dlr.de/wiki/Simulation/Traffic_Lights#.3Cphase.3E_Attributes)
self.pmx = pmx """
def __add__(self, other): self.duration = duration
return Emission(self.co2 + other.co2, self.co + other.co, self.nox + other.nox, self.hc + other.hc, self.minDuration = minDuration
self.pmx + other.pmx) self.maxDuration = maxDuration
self.phaseDef = phaseDef
def value(self):
return self.co2 + self.co + self.nox + self.hc + self.pmx def __repr__(self) -> str:
"""
def __repr__(self) -> str: :return: The Phase string representation
repr = f'Emission(co2={self.co2},co={self.co},nox={self.nox},hc={self.hc},pmx={self.pmx})' """
return str(repr) repr = f'Phase(duration:{self.duration},minDuration:{self.minDuration},maxDuration:{self.maxDuration},phaseDef:{self.phaseDef})'
return str(repr)
class Area:
class Logic:
def __init__(self, coords, name, window_size): """
self.limited_speed = False The Logic class defines the strategy of a traffic light.
self.locked = False This class includes the Logic instance of SUMO with all phases corresponding to it.
self.tls_adjusted = False A Logic object contains multiple phases.
self.weight_adjusted = False """
self.rectangle = Polygon(coords)
self.name = name def __init__(self, logic: SUMO_Logic, phases: Set[Phase]):
self.emissions_by_step = [] """
self.window = collections.deque(maxlen=window_size) Logic constructor
self._lanes: Set[Lane] = set() :param logic: The SUMO Logic object
self._tls: Set[TrafficLight] = set() :param phases: The list of phases belonging to this logic
"""
def __eq__(self, other): self._logic = logic
return self.rectangle.__eq__(other) self._phases: Set[Phase] = phases
def __contains__(self, item):
return self.rectangle.contains(item) class TrafficLight:
"""
@property This TrafficLight class defines a traffic light
def bounds(self): """
return self.rectangle.bounds
def __init__(self, tl_id: str, logics: Set[Logic]):
def intersects(self, other: BaseGeometry) -> bool: """
return self.rectangle.intersects(other) TrafficLight constructor
:param tl_id: The traffic light ID
def add_lane(self, lane: Lane): :param logics: The list of logics belonging to the traffic light
self._lanes.add(lane) """
self.tl_id = tl_id
def add_tl(self, tl: TrafficLight): self._logics: Set[Logic] = logics
self._tls.add(tl)
def __hash__(self):
def remove_lane(self, lane: Lane): """Overrides the default implementation"""
self._lanes.remove(lane) return hash(self.tl_id)
def sum_all_emissions(self):
sum = Emission() class Emission:
for emission in self.emissions_by_step: """
sum += emission This class defines the different pollutant emissions
return sum """
def sum_emissions_into_window(self, current_step, window_size): def __init__(self, co2=0, co=0, nox=0, hc=0, pmx=0):
"""
self.window.appendleft(self.emissions_by_step[current_step].value()) Emission constructor
:param co2: Quantity of CO2(in mg)
sum = 0 :param co: Quantity of C0(in mg)
for i in range(self.window.__len__()): :param nox: Quantity of Nox(in mg)
sum += self.window[i] :param hc: Quantity of HC(in mg)
return sum :param pmx: Quantity of PMx(in mg)
"""
@classmethod self.co2 = co2
def from_bounds(cls, xmin, ymin, xmax, ymax): self.co = co
return cls(( self.nox = nox
(xmin, ymin), self.hc = hc
(xmin, ymax), self.pmx = pmx
(xmax, ymax),
(xmax, ymin))) def __add__(self, other):
"""
Add two emission objects
class Vehicle: :param other: The other Emission object to add
:return: A new object whose emission values are the sum of both Emission object
def __init__(self, veh_id: int, pos: Tuple[float, float]): """
self.emissions: Emission = Emission() return Emission(self.co2 + other.co2, self.co + other.co, self.nox + other.nox, self.hc + other.hc,
self.veh_id = veh_id self.pmx + other.pmx)
self.pos = Point(pos)
def value(self):
def __repr__(self) -> str: """
return str(self.__dict__) :return: The sum of all emissions
"""
return self.co2 + self.co + self.nox + self.hc + self.pmx
def __repr__(self) -> str:
"""
:return: The Emission string representation
"""
repr = f'Emission(co2={self.co2},co={self.co},nox={self.nox},hc={self.hc},pmx={self.pmx})'
return str(repr)
class Area:
"""
The Area class defines a grid area of the map
"""
def __init__(self, coords, name, window_size):
"""
Area constructor
:param coords: The coordinates of the zone,
defined by the bounds coordinates of this area : (xmin, ymin, xmax, ymax)
:param name: The Area name
:param window_size: The size of the acquisition window
"""
self.limited_speed = False
self.locked = False
self.tls_adjusted = False
self.weight_adjusted = False
self.rectangle = Polygon(coords)
self.name = name
self.emissions_by_step = []
self.window = collections.deque(maxlen=window_size)
self._lanes: Set[Lane] = set()
self._tls: Set[TrafficLight] = set()
def __eq__(self, other):
"""
Overrides the equal definition
:param other: The other Area object
:return: True if the two rectangles are equals
"""
return self.rectangle.__eq__(other)
def __contains__(self, item):
"""
:param item: A position on the map
:return: True if the area contains the item
"""
return self.rectangle.contains(item)
@property
def bounds(self):
"""
Return the bounds rectangle of this area
:return:
"""
return self.rectangle.bounds
def intersects(self, other: BaseGeometry) -> bool:
"""
:param other: A BaseGeometry object
:return: True if this area intersects with other
"""
return self.rectangle.intersects(other)
def add_lane(self, lane: Lane):
"""
Add a new lane object into lanes list
:param lane: A Lane object
:return:
"""
self._lanes.add(lane)
def add_tl(self, tl: TrafficLight):
"""
Add a new trafficLight object into lanes list
:param tl: A TrafficLight object
:return:
"""
self._tls.add(tl)
def remove_lane(self, lane: Lane):
"""
Remove a lane from lanes list
:param lane: The Lane object to remove
:return:
"""
self._lanes.remove(lane)
def sum_all_emissions(self):
"""
Sum all Emissions object from initial step to final step
:return: The sum Emission object
"""
sum = Emission()
for emission in self.emissions_by_step:
sum += emission
return sum
def sum_emissions_into_window(self, current_step):
"""
Sum all Emissions object into the acquisition window
:param current_step: The current step of the simulation
:return:
"""
self.window.appendleft(self.emissions_by_step[current_step].value())
sum = 0
for i in range(self.window.__len__()):
sum += self.window[i]
return sum
@classmethod
def from_bounds(cls, xmin, ymin, xmax, ymax):
return cls((
(xmin, ymin),
(xmin, ymax),
(xmax, ymax),
(xmax, ymin)))
class Vehicle:
"""
The Vehicle class defines a vehicle object
"""
def __init__(self, veh_id: int, pos: Tuple[float, float]):
"""
Vehicle constructor
:param veh_id: The vehicle ID
:param pos: The position of the vehicle one the map
"""
self.emissions: Emission = Emission()
self.veh_id = veh_id
self.pos = Point(pos)
def __repr__(self) -> str:
"""
:return: The Vehicle string representation
"""
return str(self.__dict__)