From 3f1ed1403e148ad3f3074d97570a721751530f74 Mon Sep 17 00:00:00 2001 From: Thibaud Date: Fri, 13 Mar 2020 16:37:31 +0100 Subject: [PATCH] feat: uses a semaphore to limit concurrent requests --- wallabag_mass_import.py | 67 ++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 25 deletions(-) diff --git a/wallabag_mass_import.py b/wallabag_mass_import.py index 6e76cce..ff26cab 100644 --- a/wallabag_mass_import.py +++ b/wallabag_mass_import.py @@ -35,41 +35,58 @@ async def connect_to_wallabag( ) -async def post_entry(w_api: Wallabag, item: Dict[str, str]): - entry = await w_api.post_entries( - url=item["url"], - archive=item["is_archived"], - starred=item["is_starred"], - original_url=item["origin_url"], - tags=",".join(item["tags"]), - ) - logging.info(f"Entry url {item['url']} posted to wallabag with id {entry['id']}") - return entry["id"] +async def post_entry( + w_api: Wallabag, item: Dict[str, str], semaphore: asyncio.Semaphore +): + async with semaphore: + try: + entry = await w_api.post_entries( + url=item["url"], + archive=item["is_archived"], + starred=item["is_starred"], + original_url=item["origin_url"], + tags=",".join(item["tags"]), + ) + logging.info( + f"Entry url {item['url']} posted to wallabag with id {entry['id']}" + ) + return entry["id"] + except Exception as e: + logging.error(f"Creating {item['url']} generated an exception : {e}") -async def delete_entry(w_api: Wallabag, item_id: int): - entry = await w_api.delete_entries(item_id) - logging.info(f"Deleted wallabag entry with id {entry['id']}") +async def delete_entry(w_api: Wallabag, item_id: int, semaphore: asyncio.Semaphore): + async with semaphore: + try: + entry = await w_api.delete_entries(item_id) + logging.info(f"Deleted wallabag entry with id {entry['id']}") + except Exception as e: + logging.error(f"Deleting id {item_id} generated an exception : {e}") -async def import_articles(w_api: Wallabag, path: str, limit=0): - entries_id = [] - tasks = [] +async def import_articles(w_api: Wallabag, path: str, max_entries=0, max_requests=20): + sem = asyncio.Semaphore(max_requests) + + entries_id = set() + tasks = set() with open(path) as f: for index, item in enumerate(json.load(f)): - if limit > 0 and index == limit: + if max_entries > 0 and index == max_entries: break - task = asyncio.ensure_future(post_entry(w_api, item)) - tasks.append(task) + task = asyncio.ensure_future(post_entry(w_api, item, sem)) + tasks.add(task) entries_id = await asyncio.gather(*tasks) return entries_id -async def delete_all_entries(ids: List[int], w_api: Wallabag): - tasks = [] - for id in ids: - tasks.append(delete_entry(w_api, id)) +async def delete_all_entries(ids: List[int], w_api: Wallabag, max_requests=20): + sem = asyncio.Semaphore(max_requests) + tasks = ( + asyncio.ensure_future(delete_entry(w_api, id, sem)) + for id in ids + if id is not None + ) await asyncio.gather(*tasks) @@ -78,9 +95,9 @@ async def main(): configuration = dict(load_configuration(CONFIG_PATH)) w_api = await connect_to_wallabag(configuration, session) - entries_id = await import_articles(w_api, ARTICLES_PATH, limit=4) + entries_id = await import_articles(w_api, ARTICLES_PATH, max_entries=20, max_requests=20) await delete_all_entries(entries_id, w_api) if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main(), debug=True)