2018-11-23 12:53:20 +00:00
|
|
|
"""
|
|
|
|
Created on 17 oct. 2018
|
|
|
|
|
|
|
|
@author: Axel Huynh-Phuc, Thibaud Gasser
|
|
|
|
"""
|
2018-12-09 13:27:39 +00:00
|
|
|
import traci
|
|
|
|
from traci._trafficlight import Logic
|
2018-11-23 12:53:20 +00:00
|
|
|
from typing import Iterable
|
|
|
|
|
|
|
|
from shapely.geometry.linestring import LineString
|
|
|
|
|
|
|
|
from model import Area, Vehicle
|
2018-12-09 13:27:39 +00:00
|
|
|
|
2018-11-23 12:53:20 +00:00
|
|
|
|
|
|
|
def compute_edge_weight(edge_id):
    """Return the per-lane emission total that TraCI reports for an edge.

    The CO, NOx, HC, PMx and CO2 emission readings of ``edge_id`` are
    added together and the sum is divided by the edge's lane count.
    """
    total_emissions = (
        traci.edge.getCOEmission(edge_id)
        + traci.edge.getNOxEmission(edge_id)
        + traci.edge.getHCEmission(edge_id)
        + traci.edge.getPMxEmission(edge_id)
        + traci.edge.getCO2Emission(edge_id)
    )
    lane_count = traci.edge.getLaneNumber(edge_id)
    return total_emissions / lane_count
|
2018-11-23 12:53:20 +00:00
|
|
|
|
2018-12-06 20:49:01 +00:00
|
|
|
def adjust_edges_weights():
    """Set an emission-based effort on every edge, then reroute all vehicles.

    Each edge known to TraCI receives the value of
    ``compute_edge_weight`` as its effort; afterwards every vehicle
    currently known to TraCI is asked to reroute by effort.
    """
    # Original author's note: by default an edge's weight is
    # length / mean speed; the emission-based value replaces it here.
    for edge in traci.edge.getIDList():
        traci.edge.setEffort(edge, compute_edge_weight(edge))

    for vehicle in traci.vehicle.getIDList():
        traci.vehicle.rerouteEffort(vehicle)
|
|
|
|
|
2018-12-07 15:30:23 +00:00
|
|
|
def limit_speed_into_area(area: Area, vehicles: Iterable[Vehicle], speed_rf):
    """Scale the maximum speed of every lane of ``area`` by ``speed_rf``.

    The area is flagged through ``area.limited_speed`` before the lanes
    are updated. Each lane's new limit is ``speed_rf`` times the lane's
    ``initial_max_speed``.

    :param area: area whose ``_lanes`` are slowed down
    :param vehicles: not used by this function; kept for its callers
    :param speed_rf: multiplicative factor applied to the initial limit
    """
    area.limited_speed = True
    for area_lane in area._lanes:
        reduced_speed = speed_rf * area_lane.initial_max_speed
        traci.lane.setMaxSpeed(area_lane.lane_id, reduced_speed)
|
2018-11-29 10:48:07 +00:00
|
|
|
|
2018-12-03 20:05:01 +00:00
|
|
|
def modifyLogic(logic, rf):
    """Build a traffic-light logic whose phase timings are scaled by ``rf``.

    ``rf`` stands for "reduction factor". For every phase in
    ``logic._phases`` a new phase is created with ``duration``,
    ``minDuration`` and ``maxDuration`` multiplied by ``rf`` and the
    same ``phaseDef``. The phases are wrapped in a new logic named
    "new-program".
    """
    scaled_phases = [
        traci.trafficlight.Phase(
            old.duration * rf,
            old.minDuration * rf,
            old.maxDuration * rf,
            old.phaseDef,
        )
        for old in logic._phases
    ]
    return traci.trafficlight.Logic("new-program", 0, 0, 0, scaled_phases)
|
|
|
|
|
|
|
|
def adjust_traffic_light_phase_duration(area, reduction_factor):
    """Rescale the phase durations of every traffic light of ``area``.

    The area is flagged through ``area.tls_adjusted``; then, for each
    traffic light in ``area._tls`` and each of its ``_logics``, a logic
    scaled by ``reduction_factor`` (see ``modifyLogic``) is installed.

    :param area: area whose traffic lights are adjusted
    :param reduction_factor: factor applied to every phase duration
    """
    area.tls_adjusted = True
    for tl in area._tls:
        for logic in tl._logics:
            # Use the ``trafficlight`` domain, the same one this module
            # already uses to build Phase/Logic objects, rather than the
            # plural ``trafficlights`` spelling.
            traci.trafficlight.setCompleteRedYellowGreenDefinition(
                tl.tl_id, modifyLogic(logic, reduction_factor))
|
2018-11-23 13:40:47 +00:00
|
|
|
|
2018-12-07 15:30:23 +00:00
|
|
|
def count_vehicles_in_area(area):
    """Return the number of vehicles TraCI reports on the lanes of ``area``.

    The result is the sum of ``getLastStepVehicleNumber`` over every
    lane in ``area._lanes`` (0 when the area has no lanes).
    """
    return sum(
        traci.lane.getLastStepVehicleNumber(area_lane.lane_id)
        for area_lane in area._lanes
    )
|
|
|
|
|
|
|
|
def lock_area(area):
    """Flag ``area`` as locked and disallow 'passenger' on all its lanes.

    ``area.locked`` is set first; every lane in ``area._lanes`` then has
    the 'passenger' class passed to ``traci.lane.setDisallowed``.
    """
    area.locked = True
    for area_lane in area._lanes:
        traci.lane.setDisallowed(area_lane.lane_id, 'passenger')
|
|
|
|
|
|
|
|
def reverse_actions(area):
    """Undo every action that is currently applied to ``area``.

    Each of the three flags on the area (``limited_speed``,
    ``tls_adjusted``, ``locked``) is checked independently. When a flag
    is set, it is cleared and the matching TraCI state is restored:

    * lane maximum speeds are reset from ``lane.initial_max_speed``;
    * each traffic light in ``area._tls`` gets its stored ``_logics``
      re-installed;
    * lane permissions are reset so that all vehicle classes are allowed.

    Actions whose flag is not set are left untouched.
    """
    # Reset max speed to original
    if area.limited_speed:
        area.limited_speed = False
        for lane in area._lanes:
            # NOTE(review): the value is divided by 3.6 here, whereas
            # limit_speed_into_area multiplies initial_max_speed directly.
            # The unit of initial_max_speed is not visible in this module;
            # confirm which of the two conversions is intended.
            traci.lane.setMaxSpeed(lane.lane_id, lane.initial_max_speed / 3.6)

    # Reset traffic lights initial duration
    if area.tls_adjusted:
        area.tls_adjusted = False
        for tl in area._tls:
            for initial_logic in tl._logics:
                traci.trafficlight.setCompleteRedYellowGreenDefinition(
                    tl.tl_id, initial_logic)

    # Unlock the area
    if area.locked:
        area.locked = False
        for lane in area._lanes:
            # An empty value means all classes are allowed.
            traci.lane.setAllowed(lane.lane_id, '')
|
|
|
|
|
|
|
|
|