"""Post articles from a local JSON file to a Wallabag server, then delete them.

Connection settings are read from ``config.ini`` and the articles from
``articles.json``; both files are looked up next to this script.
"""
import asyncio
import configparser
import json
import logging
import os
from itertools import islice
from typing import Any, Dict, List, Optional

import aiohttp
from wallabag_api.wallabag import Wallabag

logging.basicConfig(level=logging.DEBUG)

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(ROOT_DIR, "config.ini")
ARTICLES_PATH = os.path.join(ROOT_DIR, "articles.json")


def load_configuration(path: str) -> configparser.SectionProxy:
    """Return the ``Wallabag Configuration`` section of the INI file at *path*.

    Raises:
        KeyError: if the section is absent. ``ConfigParser.read`` silently
            skips unreadable files, so a missing file ends up here as well.
    """
    config = configparser.ConfigParser()
    config.read(path)
    return config["Wallabag Configuration"]


async def connect_to_wallabag(
    configuration: Dict[str, str], session: aiohttp.ClientSession
) -> Wallabag:
    """Obtain an access token and build a ``Wallabag`` client bound to *session*.

    Every entry of *configuration* is forwarded as a keyword argument to
    ``Wallabag.get_token``; the keys ``host``, ``client_id`` and
    ``client_secret`` are additionally required to build the client.

    Note:
        *configuration* is modified in place: the token is stored under the
        ``access_token`` key.
    """
    configuration["access_token"] = await Wallabag.get_token(**configuration)
    return Wallabag(
        host=configuration["host"],
        token=configuration["access_token"],
        client_id=configuration["client_id"],
        client_secret=configuration["client_secret"],
        aio_sess=session,
    )


async def post_entry(
    w_api: Wallabag, item: Dict[str, Any], semaphore: asyncio.Semaphore
) -> Optional[int]:
    """Post one article through *w_api* and return the id of the created entry.

    *item* must provide the keys ``url``, ``is_archived``, ``is_starred``,
    ``origin_url`` and ``tags`` (an iterable of strings).

    The call is best effort: any exception is logged and ``None`` is returned
    so that a single bad article does not abort the whole batch.
    """
    # The semaphore caps how many requests are in flight at the same time.
    async with semaphore:
        try:
            entry = await w_api.post_entries(
                url=item["url"],
                archive=item["is_archived"],
                starred=item["is_starred"],
                original_url=item["origin_url"],
                tags=",".join(item["tags"]),
            )
            logging.info(
                "Entry url %s posted to wallabag with id %s",
                item["url"],
                entry["id"],
            )
            return entry["id"]
        except Exception as e:
            # .get(): the failure being reported may itself be a missing "url"
            # key, and indexing again here would raise out of the handler.
            logging.error(
                "Creating %s generated an exception : %s", item.get("url"), e
            )
            return None


async def delete_entry(
    w_api: Wallabag, item_id: int, semaphore: asyncio.Semaphore
) -> None:
    """Delete the entry *item_id* through *w_api*, logging instead of raising."""
    async with semaphore:
        try:
            entry = await w_api.delete_entries(item_id)
            logging.info("Deleted wallabag entry with id %s", entry["id"])
        except Exception as e:
            logging.error("Deleting id %s generated an exception : %s", item_id, e)


async def import_articles(
    w_api: Wallabag, path: str, max_entries: int = 0, max_requests: int = 20
) -> List[Optional[int]]:
    """Post the articles stored in the JSON file at *path*.

    Args:
        w_api: client used to post the entries.
        path: JSON file containing a sequence of article dicts.
        max_entries: post at most this many articles; ``0`` or a negative
            value means no limit.
        max_requests: maximum number of concurrent requests.

    Returns:
        One element per posted article, in file order: the new entry id, or
        ``None`` when posting that article failed.
    """
    sem = asyncio.Semaphore(max_requests)
    with open(path, encoding="utf-8") as f:
        items = json.load(f)
    limit = max_entries if max_entries > 0 else None
    # gather() returns results in argument order, so ids line up with articles.
    return await asyncio.gather(
        *(post_entry(w_api, item, sem) for item in islice(items, limit))
    )


async def delete_all_entries(
    ids: List[Optional[int]], w_api: Wallabag, max_requests: int = 20
) -> None:
    """Delete every entry whose id is in *ids*, skipping ``None`` placeholders.

    At most *max_requests* deletions run concurrently.
    """
    sem = asyncio.Semaphore(max_requests)
    await asyncio.gather(
        *(
            delete_entry(w_api, entry_id, sem)
            for entry_id in ids
            if entry_id is not None
        )
    )


async def main() -> None:
    """Connect to Wallabag, post up to 20 articles, then delete them again."""
    # ClientSession picks up the running loop itself; the explicit ``loop``
    # argument is deprecated in aiohttp.
    async with aiohttp.ClientSession() as session:
        configuration = dict(load_configuration(CONFIG_PATH))
        w_api = await connect_to_wallabag(configuration, session)
        entries_id = await import_articles(
            w_api, ARTICLES_PATH, max_entries=20, max_requests=20
        )
        await delete_all_entries(entries_id, w_api)


if __name__ == "__main__":
    asyncio.run(main(), debug=True)