# -*- coding: utf-8 -*- import os import shutil import subprocess import tempfile from xml.etree import ElementTree import randomTrips # Absolute path of the directory the script is in SCRIPTDIR = os.path.dirname(__file__) TEMPLATEDIR = os.path.join(SCRIPTDIR, 'templates') vehicle_classes = { 'passenger': { '--vehicle-class': 'passenger', '--vclass': 'passenger', '--prefix': 'veh' }, 'bus': { '--vehicle-class': 'bus', '--vclass': 'bus', '--prefix': 'bus' } } def load_netconvert_template(osm_input, out_name): tree = ElementTree.parse(os.path.join(TEMPLATEDIR, 'simul.netcfg')) root = tree.getroot() root.find('input/osm-files').set('value', osm_input) root.find('output/output-file').set('value', f'{out_name}.net.xml') root.find('report/log').set('value', f'{out_name}.netconvert.log') return tree def load_polyconvert_template(osm_file, type_file, scenario_name): tree = ElementTree.parse(os.path.join(TEMPLATEDIR, 'simul.polycfg')) root = tree.getroot() root.find('input/osm-files').set('value', osm_file) root.find('input/net-file').set('value', f'{scenario_name}.net.xml') root.find('input/type-file').set('value', type_file) root.find('output/output-file').set('value', f'{scenario_name}.poly.xml') root.find('report/log').set('value', f'{scenario_name}.polyconvert.log') return tree def load_sumoconfig_template(simulation_name, routefiles=None): routefiles = routefiles or (f'{simulation_name}.rou.xml',) tree = ElementTree.parse(os.path.join(TEMPLATEDIR, 'simul.sumocfg')) root = tree.getroot() root.find('input/net-file').set('value', f'{simulation_name}.net.xml') root.find('input/route-files').set('value', ','.join(f for f in routefiles)) root.find('input/additional-files').set('value', f'{simulation_name}.poly.xml') root.find('report/log').set('value', f'{simulation_name}.log') return tree def generate_scenario(osm_file, out_path, scenario_name): net_template = load_netconvert_template(osm_file, scenario_name) poly_template = load_polyconvert_template(osm_file, 'typemap/osmPolyconvert.typ.xml', scenario_name) with tempfile.TemporaryDirectory() as tmpdirname: netconfig = os.path.join(tmpdirname, f'{scenario_name}.netcfg') polyconfig = os.path.join(tmpdirname, f'{scenario_name}.polycfg') net_template.write(netconfig) poly_template.write(polyconfig) # Copy typemaps to tempdir shutil.copytree(os.path.join(TEMPLATEDIR, 'typemap'), os.path.join(tmpdirname, 'typemap')) polyconvert_cmd = ['polyconvert', '-c', polyconfig] netconvertcmd = ['netconvert', '-c', netconfig] subprocess.run(netconvertcmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) subprocess.run(polyconvert_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Move files to destination ignore_patterns = shutil.ignore_patterns('*.polycfg', '*.netcfg', 'typemap') shutil.copytree(tmpdirname, out_path, ignore=ignore_patterns) def generate_mobility(path, name): netfile = os.path.join(path, f'{name}.net.xml') output = os.path.join(path, f'{name}.trips.xml') routefiles = [] end_time = 200 classes = ('passenger', 'bus') for veh_class in classes: # simname.bus.rou.xml, simname.passenger.rou.xml, ... routefile = f'{name}.{veh_class}.rou.xml' routepath = os.path.join(path, routefile) routefiles.append(routefile) options = { '--net-file': netfile, '--output-trip-file': output, '--route-file': routepath, '-e': end_time } options.update(vehicle_classes[veh_class]) flags = ['-l'] generate_random_trips(flags, options) return routefiles def generate_random_trips(flags, options): randomTrips.main(randomTrips.get_options(dict_to_list(options) + flags)) def generate_sumo_configuration(routefiles, path, scenario_name): sumo_template = load_sumoconfig_template(routefiles, scenario_name) sumo_template.write(os.path.join(path, f'{scenario_name}.sumocfg')) def generate_all(osm_file, path, simulation_name): simulation_dir = os.path.join(path, simulation_name) logs_dir = os.path.join(simulation_dir, 'log') generate_scenario(osm_file, simulation_dir, simulation_name) routefiles = generate_mobility(simulation_dir, simulation_name) generate_sumo_configuration(routefiles, simulation_dir, simulation_name) # Move all logs to logdir move_logs(simulation_dir, logs_dir) def move_logs(simulation_dir, logs_dir): for f in os.listdir(simulation_dir): if os.path.splitext(f)[1] == '.log': shutil.move(os.path.join(simulation_dir, f), logs_dir) def dict_to_list(d): return [item for k in d for item in (k, d[k])] if __name__ == '__main__': if os.path.isdir('/tmp/scenario/foo'): shutil.rmtree('/tmp/scenario/foo') path = '/tmp/scenario/' osm = '/tmp/scenario/map.osm' generate_all(osm, path, 'foo')