1
0
mirror of https://github.com/Ahp06/SUMO_Emissions.git synced 2024-11-22 03:26:30 +00:00

Merge pull request #4 from Ahp06/Axel

Axel
This commit is contained in:
Axel HUYNH-PHUC 2018-12-07 22:18:10 +01:00 committed by GitHub
commit b012c4c4a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 445 additions and 246 deletions

View File

@ -1,44 +1,78 @@
""" """
Created on 17 oct. 2018 Created on 17 oct. 2018
@author: Axel Huynh-Phuc, Thibaud Gasser @author: Axel Huynh-Phuc, Thibaud Gasser
""" """
from typing import Iterable from typing import Iterable
import traci import traci
from shapely.geometry.linestring import LineString from shapely.geometry.linestring import LineString
from model import Area, Vehicle from model import Area, Vehicle
from traci._trafficlight import Logic
def remove_vehicle(veh_id): def compute_edge_weight(edge_id):
traci.vehicle.remove(veh_id, traci.constants.REMOVE_PARKING) return (traci.edge.getCOEmission(edge_id)
+ traci.edge.getNOxEmission(edge_id)
+ traci.edge.getHCEmission(edge_id)
def lanes_in_area(area): + traci.edge.getPMxEmission(edge_id)
for lane_id in traci.lane.getIDList(): + traci.edge.getCO2Emission(edge_id))/(traci.edge.getLaneNumber(edge_id))
polygon_lane = LineString(traci.lane.getShape(lane_id))
if area.rectangle.intersects(polygon_lane): def adjust_edges_weights():
yield lane_id for edge_id in traci.edge.getIDList():
weight = compute_edge_weight(edge_id) # by default edges weight = length/mean speed
traci.edge.setEffort(edge_id, weight)
def compute_edge_weight(edge_id):
return (traci.edge.getCOEmission(edge_id) for veh_id in traci.vehicle.getIDList():
+ traci.edge.getNOxEmission(edge_id) traci.vehicle.rerouteEffort(veh_id)
+ traci.edge.getHCEmission(edge_id)
+ traci.edge.getPMxEmission(edge_id) def limit_speed_into_area(area: Area, vehicles: Iterable[Vehicle], speed_rf):
+ traci.edge.getCO2Emission(edge_id)) area.limited_speed = True
for lane in area._lanes:
traci.lane.setMaxSpeed(lane.lane_id, speed_rf * lane.initial_max_speed)
def adjust_edges_weights():
for edge_id in traci.edge.getIDList(): def modifyLogic(logic, rf): #rf for "reduction factor"
weight = compute_edge_weight(edge_id) # by default edges weight = length/mean speed new_phases = []
traci.edge.adaptTraveltime(edge_id, weight) for phase in logic._phases:
new_phase = traci.trafficlight.Phase(phase.duration*rf,phase.minDuration*rf,phase.maxDuration*rf,phase.phaseDef)
new_phases.append(new_phase)
def lock_area(area: Area):
max_speed = 30 return traci.trafficlight.Logic("new-program", 0 , 0 , 0 , new_phases)
print(f'Setting max speed into {area.name} to {max_speed} km/h')
area.locked = True def adjust_traffic_light_phase_duration(area, reduction_factor):
for lane in area._lanes: area.tls_adjusted = True
traci.lane.setMaxSpeed(lane.lane_id, max_speed / 3.6) for tl in area._tls:
for logic in tl._logics:
traci.trafficlights.setCompleteRedYellowGreenDefinition(tl.tl_id, modifyLogic(logic,reduction_factor))
def count_vehicles_in_area(area):
vehicles_in_area = 0
for lane in area._lanes:
vehicles_in_area += traci.lane.getLastStepVehicleNumber(lane.lane_id)
return vehicles_in_area
def lock_area(area):
area.locked = True
for lane in area._lanes:
traci.lane.setDisallowed(lane.lane_id, 'passenger')
def reverse_actions(area):
#Reset max speed to original
if not area.limited_speed:
area.limited_speed = False
for lane in area.lanes:
traci.lane.setMaxSpeed(lane.lane_id, lane.initial_max_speed / 3.6)
#Reset traffic lights initial duration
if not area.tls_adjusted:
area.tls_adjusted = False
for initial_logic in tl._logics:
traci.trafficlights.setCompleteRedYellowGreenDefinition(tl.tl_id, initial_logic)
#Unlock the area
if not area.locked:
area.locked = False
for lane in area._lanes:
traci.lane.setAllowed(lane.lane_id, '') #empty means all classes are allowed

View File

@ -1,26 +1,117 @@
""" """
Global configuration for the simulation Global configuration for the simulation
""" """
import os import os
import sys import sys
import datetime
if 'SUMO_HOME' in os.environ: import logging
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
sys.path.append(tools) ###############################################################################
else: ############################# SIMULATION FILE #################################
sys.exit("please declare environment variable 'SUMO_HOME'") ###############################################################################
_SUMOCMD = 'sumo' # use 'sumo-gui' cmd for UI if 'SUMO_HOME' in os.environ:
_SUMOCFG = "mulhouse_simulation/osm.sumocfg" tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
CELLS_NUMBER = 10 sys.path.append(tools)
EMISSIONS_THRESHOLD = 500000 else:
n_steps = 200 sys.exit("please declare environment variable 'SUMO_HOME'")
lock_mode = True _SUMOCMD = 'sumo' # use 'sumo-gui' cmd for UI
routing_mode = False _SUMOCFG = "mulhouse_simulation/osm.sumocfg"
sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', _SUMOCMD)
sumo_binary = os.path.join(os.environ['SUMO_HOME'], 'bin', _SUMOCMD) sumo_cmd = [sumo_binary, "-c", _SUMOCFG]
sumo_cmd = [sumo_binary, "-c", _SUMOCFG]
###############################################################################
################################## LOGS #######################################
###############################################################################
now = datetime.datetime.now()
current_date = now.strftime("%Y_%m_%d_%H_%M_%S")
LOG_FILENAME = f'sumo_logs_{current_date}.log'
# create logger
logger = logging.getLogger("sumo_logger")
logger.setLevel(logging.INFO)
# create console handler and set level to info
handler = logging.FileHandler(LOG_FILENAME)
# create formatter
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# add formatter to handler
handler.setFormatter(formatter)
# add handler to logger
logger.addHandler(handler)
###############################################################################
########################## SIMULATION CONFIGURATION ###########################
###############################################################################
CELLS_NUMBER = 10
EMISSIONS_THRESHOLD = 500000
n_steps = 200
###############################################################################
########################## ACTIONS CONFIGURATION ##############################
###############################################################################
#Set this mode to True if you want running a basic simulation without actions
without_actions_mode = False
#Limit the speed into areas when the threshold is exceeded
speed_rf = 0.1
limit_speed_mode = True
#Decrease all traffic lights duration into the area when the threshold is exceeded
trafficLights_duration_rf = 0.2
adjust_traffic_light_mode = True
#Vehicles are routed according to the less polluted route (HEAVY)
weight_routing_mode = False
#Lock the area when the threshold is exceeded (NOT FIXED)
lock_area_mode = False
#Weight routing mode cannot be combinated with other actions
if weight_routing_mode:
limit_speed_mode = False
adjust_traffic_light_mode = False
#If without_actions_mode is choosen
if without_actions_mode:
limit_speed_mode = False
adjust_traffic_light_mode = False
weight_routing_mode = False
lock_area_mode = False
###############################################################################
########################## SIMULATION REFERENCES ##############################
###############################################################################
# Total of emissions of all pollutants in mg for n steps of simulation without locking areas
total_emissions200 = 43970763.15084749
total_emissions300 = 87382632.08217141
total_emissions400 = 140757491.8489904
total_emissions500 = 202817535.43856794
###############################################################################
########################## CONFIGURATION METHODS ##############################
###############################################################################
def get_basics_emissions():
if n_steps == 200:
return total_emissions200
if n_steps == 300:
return total_emissions300
if n_steps == 400:
return total_emissions400
if n_steps == 500:
return total_emissions500
def show_config():
return (str(f'Grid : {CELLS_NUMBER}x{CELLS_NUMBER}\n')
+ str(f'step number = {n_steps}\n')
+ str(f'weight routing mode = {weight_routing_mode}\n')
+ str(f'lock area mode = {lock_area_mode}\n')
+ str(f'limit speed mode = {limit_speed_mode}, RF = {speed_rf*100}%\n')
+ str(f'adjust traffic light mode = {adjust_traffic_light_mode} , RF = {trafficLights_duration_rf*100}%\n'))

View File

@ -1,112 +1,154 @@
from typing import List from typing import List
import traci import traci
from shapely.geometry import LineString import time
import actions from shapely.geometry import LineString
import config from parse import search
import sys
from model import Area, Vehicle, Lane import actions
import config
import sys
def init_grid(simulation_bounds, cells_number): from model import Area, Vehicle, Lane , TrafficLight , Phase , Logic
grid = list() from traci import trafficlight
width = simulation_bounds[1][0] / cells_number
height = simulation_bounds[1][1] / cells_number logger = config.logger
for i in range(cells_number):
for j in range(cells_number): def init_grid(simulation_bounds, cells_number):
# bounds coordinates for the area : (xmin, ymin, xmax, ymax) grid = list()
ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height), width = simulation_bounds[1][0] / cells_number
((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height)) height = simulation_bounds[1][1] / cells_number
area = Area(ar_bounds) for i in range(cells_number):
area.name = 'area ({},{})'.format(i, j) for j in range(cells_number):
grid.append(area) # bounds coordinates for the area : (xmin, ymin, xmax, ymax)
traci.polygon.add(area.name, ar_bounds, (0, 255, 0)) ar_bounds = ((i * width, j * height), (i * width, (j + 1) * height),
return grid ((i + 1) * width, (j + 1) * height), ((i + 1) * width, j * height))
area = Area(ar_bounds)
area.name = 'Area ({},{})'.format(i, j)
def compute_vehicle_emissions(veh_id): grid.append(area)
return (traci.vehicle.getCOEmission(veh_id) traci.polygon.add(area.name, ar_bounds, (0, 255, 0))
+ traci.vehicle.getNOxEmission(veh_id) return grid
+ traci.vehicle.getHCEmission(veh_id)
+ traci.vehicle.getPMxEmission(veh_id) def get_all_lanes() -> List[Lane]:
+ traci.vehicle.getCO2Emission(veh_id)) lanes = []
for lane_id in traci.lane.getIDList():
polygon_lane = LineString(traci.lane.getShape(lane_id))
def get_all_vehicles() -> List[Vehicle]: initial_max_speed = traci.lane.getMaxSpeed(lane_id)
vehicles = list() lanes.append(Lane(lane_id, polygon_lane, initial_max_speed))
for veh_id in traci.vehicle.getIDList(): return lanes
veh_pos = traci.vehicle.getPosition(veh_id)
vehicle = Vehicle(veh_id, veh_pos) def parse_phase(phase_repr):
vehicle.emissions = compute_vehicle_emissions(veh_id) duration = search('duration: {:f}', phase_repr)
traci.vehicle.setRoutingMode(veh_id, traci.constants.ROUTING_MODE_AGGREGATED) minDuration = search('minDuration: {:f}', phase_repr)
vehicles.append(vehicle) maxDuration = search('maxDuration: {:f}', phase_repr)
return vehicles phaseDef = search('phaseDef: {}\n', phase_repr)
if phaseDef is None: phaseDef = ''
def get_all_lanes() -> List[Lane]: else : phaseDef = phaseDef[0]
lanes = []
for lane_id in traci.lane.getIDList(): return Phase(duration[0], minDuration[0], maxDuration[0], phaseDef)
polygon_lane = LineString(traci.lane.getShape(lane_id))
lanes.append(Lane(lane_id, polygon_lane)) def add_data_to_areas(areas: List[Area]):
return lanes lanes = get_all_lanes()
for area in areas:
for lane in lanes: # add lanes
def get_emissions(grid: List[Area], vehicles: List[Vehicle]): if area.rectangle.intersects(lane.polygon):
for area in grid: area.add_lane(lane)
for vehicle in vehicles: for tl_id in traci.trafficlight.getIDList(): # add traffic lights
if vehicle.pos in area: if lane.lane_id in traci.trafficlight.getControlledLanes(tl_id):
area.emissions += vehicle.emissions logics = []
if config.lock_mode and area.emissions > config.EMISSIONS_THRESHOLD and not area.locked: for l in traci.trafficlight.getCompleteRedYellowGreenDefinition(tl_id): #add logics
actions.lock_area(area) phases = []
traci.polygon.setColor(area.name, (255, 0, 0)) for phase in traci.trafficlight.Logic.getPhases(l): #add phases to logics
traci.polygon.setFilled(area.name, True) phases.append(parse_phase(phase.__repr__()))
logics.append(Logic(l,phases))
area.add_tl(TrafficLight(tl_id,logics))
def add_lanes_to_areas(areas: List[Area]):
lanes = get_all_lanes() def compute_vehicle_emissions(veh_id):
for area in areas: return (traci.vehicle.getCOEmission(veh_id)
for lane in lanes: +traci.vehicle.getNOxEmission(veh_id)
if area.rectangle.intersects(lane.polygon): +traci.vehicle.getHCEmission(veh_id)
area.add_lane(lane) +traci.vehicle.getPMxEmission(veh_id)
+traci.vehicle.getCO2Emission(veh_id))
def main():
grid = list() def get_all_vehicles() -> List[Vehicle]:
try: vehicles = list()
traci.start(config.sumo_cmd) for veh_id in traci.vehicle.getIDList():
grid = init_grid(traci.simulation.getNetBoundary(), config.CELLS_NUMBER) veh_pos = traci.vehicle.getPosition(veh_id)
add_lanes_to_areas(grid) vehicle = Vehicle(veh_id, veh_pos)
vehicle.emissions = compute_vehicle_emissions(veh_id)
step = 0 vehicles.append(vehicle)
while step < config.n_steps: # traci.simulation.getMinExpectedNumber() > 0: return vehicles
traci.simulationStep()
def get_emissions(grid: List[Area], vehicles: List[Vehicle]):
vehicles = get_all_vehicles() for area in grid:
get_emissions(grid, vehicles) for vehicle in vehicles:
if vehicle.pos in area:
if config.routing_mode: area.emissions += vehicle.emissions
actions.adjust_edges_weights() if area.emissions > config.EMISSIONS_THRESHOLD:
# actions.rerouteAllVehicles()
if config.limit_speed_mode and not area.limited_speed:
step += 1 logger.info(f'Action - Decreased max speed into {area.name} by {config.speed_rf*100}%')
sys.stdout.write(f'Simulation step = {step}/{config.n_steps}' + '\r') actions.limit_speed_into_area(area, vehicles, config.speed_rf)
sys.stdout.flush() traci.polygon.setColor(area.name, (255, 0, 0))
traci.polygon.setFilled(area.name, True)
finally: if config.adjust_traffic_light_mode and not area.tls_adjusted:
traci.close(False) logger.info(f'Action - Decreased traffic lights duration by {config.trafficLights_duration_rf*100}%')
actions.adjust_traffic_light_phase_duration(area, config.trafficLights_duration_rf)
total_emissions = 0
for area in grid: if config.lock_area_mode and not area.locked:
total_emissions += area.emissions if actions.count_vehicles_in_area(area):
logger.info(f'Action - {area.name} blocked')
# Total of emissions of all pollutants in mg for 200 steps of simulation without locking areas actions.lock_area(area)
total_emissions200 = 43970763.15084749
def main():
print(f'\n**** Total emissions = {total_emissions} mg ****') grid = list()
diff_with_lock = (total_emissions200 - total_emissions) / total_emissions200 try:
print(f'**** Reduction percentage of emissions = {diff_with_lock*100} % ****\n') traci.start(config.sumo_cmd)
logger.info('Loading data for the simulation')
if __name__ == '__main__': start = time.perf_counter()
main()
grid = init_grid(traci.simulation.getNetBoundary(), config.CELLS_NUMBER)
add_data_to_areas(grid)
loading_time = round(time.perf_counter() - start,2)
logger.info(f'Data loaded ({loading_time}s)')
logger.info('Start of the simulation')
step = 0
while step < config.n_steps : #traci.simulation.getMinExpectedNumber() > 0:
traci.simulationStep()
vehicles = get_all_vehicles()
get_emissions(grid, vehicles)
if config.weight_routing_mode:
logger.info('Action - Lane weights adjusted')
actions.adjust_edges_weights()
step += 1
finally:
traci.close(False)
simulation_time = round(time.perf_counter() - start,2)
logger.info(f'End of the simulation ({simulation_time}s)')
total_emissions = 0
for area in grid:
total_emissions += area.emissions
logger.info(f'Total emissions = {total_emissions} mg')
if not config.without_actions_mode:
ref = config.get_basics_emissions()
diff_with_actions = (ref - total_emissions)/ref
logger.info(f'Reduction percentage of emissions = {diff_with_actions*100} %')
logger.info('With the configuration : \n' + str(config.show_config()))
logger.info('Logs END')
if __name__ == '__main__':
main()

View File

@ -1,64 +1,96 @@
from typing import Tuple, Set from typing import Tuple, Set
from shapely.geometry import Point, LineString from shapely.geometry import Point, LineString
from shapely.geometry import Polygon from shapely.geometry import Polygon
from shapely.geometry.base import BaseGeometry from shapely.geometry.base import BaseGeometry
from traci._trafficlight import Logic as SUMO_Logic
class Lane:
class Lane:
def __init__(self, lane_id: str, polygon: LineString):
self.polygon = polygon def __init__(self, lane_id: str, polygon: LineString, initial_max_speed: float):
self.lane_id = lane_id self.polygon = polygon
self.lane_id = lane_id
def __hash__(self): self.initial_max_speed = initial_max_speed
"""Overrides the default implementation"""
return hash(self.lane_id) def __hash__(self):
"""Overrides the default implementation"""
return hash(self.lane_id)
class Area:
class Phase:
def __init__(self, coords, name=''): def __init__(self, duration: float, minDuration: float, maxDuration : float, phaseDef: str):
self.locked = False self.duration = duration
self.rectangle = Polygon(coords) self.minDuration = minDuration
self.name = name self.maxDuration = maxDuration
self.emissions = 0.0 self.phaseDef = phaseDef
self._lanes: Set[Lane] = set()
def __repr__(self) -> str:
def __eq__(self, other): repr = f'Phase(duration:{self.duration},minDuration:{self.minDuration},maxDuration:{self.maxDuration},phaseDef:{self.phaseDef})'
return self.rectangle.__eq__(other) return str(repr)
def __contains__(self, item): class Logic:
return self.rectangle.contains(item) def __init__(self, logic: SUMO_Logic, phases: Set[Phase]):
self._logic = logic
@property self._phases: Set[Phase] = phases
def bounds(self):
return self.rectangle.bounds class TrafficLight:
def intersects(self, other: BaseGeometry) -> bool: def __init__(self, tl_id: str, logics: Set[Logic]):
return self.rectangle.intersects(other) self.tl_id = tl_id
self._logics: Set[Logic] = logics
def add_lane(self, lane: Lane):
self._lanes.add(lane) def __hash__(self):
"""Overrides the default implementation"""
def remove_lane(self, lane: Lane): return hash(self.tl_id)
self._lanes.remove(lane)
class Area:
@classmethod
def from_bounds(cls, xmin, ymin, xmax, ymax): def __init__(self, coords, name=''):
return cls(( self.limited_speed = False
(xmin, ymin), self.locked = False
(xmin, ymax), self.tls_adjusted = False
(xmax, ymax), self.rectangle = Polygon(coords)
(xmax, ymin))) self.name = name
self.emissions = 0.0
self._lanes: Set[Lane] = set()
class Vehicle: self._tls: Set[TrafficLight] = set()
def __init__(self, veh_id: int, pos: Tuple[float, float]): def __eq__(self, other):
self.emissions: float = 0.0 return self.rectangle.__eq__(other)
self.veh_id = veh_id
self.pos = Point(pos) def __contains__(self, item):
return self.rectangle.contains(item)
def __repr__(self) -> str:
return str(self.__dict__) @property
def bounds(self):
return self.rectangle.bounds
def intersects(self, other: BaseGeometry) -> bool:
return self.rectangle.intersects(other)
def add_lane(self, lane: Lane):
self._lanes.add(lane)
def add_tl(self, tl: TrafficLight):
self._tls.add(tl)
def remove_lane(self, lane: Lane):
self._lanes.remove(lane)
@classmethod
def from_bounds(cls, xmin, ymin, xmax, ymax):
return cls((
(xmin, ymin),
(xmin, ymax),
(xmax, ymax),
(xmax, ymin)))
class Vehicle:
def __init__(self, veh_id: int, pos: Tuple[float, float]):
self.emissions: float = 0.0
self.veh_id = veh_id
self.pos = Point(pos)
def __repr__(self) -> str:
return str(self.__dict__)