26 Commits

Author SHA1 Message Date
FoxMaSk
825efab0f7 v1.2.2 due to pypi error 2018-04-18 11:17:50 +02:00
FoxMaSk
2c269fba43 v1.2.1 2018-04-18 11:13:57 +02:00
FoxMaSk
07d3840ca6 upd few things 2018-04-18 11:02:56 +02:00
FoxMaSk
43b1602cbb upd few things 2018-04-18 11:01:27 +02:00
FoxMaSk
c6f62a4697 change repo 2018-02-19 09:06:06 +01:00
FoxMaSk
63cd4e80cb Merge pull request #7 from Findus23/exists-auth
add access_token to entries_exists
2018-01-28 17:19:39 +01:00
Lukas Winkler
b7d4abb0d1 add access_token to entries_exists 2018-01-27 22:37:02 +01:00
FoxMaSk
5b3cd028a7 do no exit if a user changed or does not provide the correct host, client_id, client_key, login, pass 2017-09-23 21:18:28 +02:00
FoxMaSk
0c40a35559 Switching request to aiohttp 2017-08-20 15:18:24 +02:00
FoxMaSk
a9e0335054 v 1.2.0 2017-08-20 15:12:33 +02:00
FoxMaSk
6a7573069b upd doc and setup 2017-08-20 15:11:23 +02:00
FoxMaSk
c92e55208a aiohttp integrated 2017-08-18 18:50:56 +02:00
FoxMaSk
aea9df0978 aiohttp 2017-08-18 13:36:53 +02:00
FoxMaSk
9fbd7cc28d switch from requests to aiohttp 2017-08-17 10:59:26 +02:00
FoxMaSk
3264fda186 Merge branch 'master' of github.com:foxmask/wallabag_api 2017-08-12 21:28:14 +02:00
FoxMaSk
922c47f1cf Lib updated to be compliant with Wallabag 2.2.3 2017-07-14 16:37:24 +02:00
FoxMaSk
fa9c6360a5 Update wallabag.py 2017-06-28 09:19:04 +02:00
Olivier Demah
12c30c6af3 str to list 2017-06-11 16:59:48 +02:00
FoxMaSk
546d744722 Merge pull request #4 from ashgan-dev/master
rajout de valeur par defaut
2017-04-29 14:38:29 +02:00
ashgan-dev
fbf71efa71 rajout de valeur par defaut
c'est mieux avec des valeurs par defaut...
2017-04-29 09:59:41 +00:00
FoxMaSk
c5c7fa06b1 Merge pull request #3 from ashgan-dev/master
add timestamp filter
2017-04-29 10:46:19 +02:00
ashgan-dev
86e5c02b0f add timestamp filter
add missing timestamp filter from getting all entries
2017-04-28 09:32:08 +00:00
FoxMaSk
99bfd7c08b upd requests / python 3.6 2017-04-05 23:01:55 +02:00
FoxMaSk
8ce21dcdd3 gitignore 2017-03-20 22:26:56 +01:00
FoxMaSk
b11f648ec9 handling a list of tags - fix #2 2017-03-20 22:25:02 +01:00
FoxMaSk
4ad230a83e improvments 2016-05-15 15:22:46 +02:00
6 changed files with 492 additions and 205 deletions

8
.gitignore vendored Normal file
View File

@@ -0,0 +1,8 @@
.idea
*.log
*.pot
*.pyc
.coverage
*.json
.tox
build

View File

@@ -1,13 +1,23 @@
.. image:: http://img.shields.io/badge/python-3.6-orange.svg
:target: https://pypi.python.org/pypi/django-th/
:alt: Python version supported
.. image:: http://img.shields.io/badge/license-BSD-blue.svg
:target: https://pypi.python.org/pypi/django-th/
:alt: License
============ ============
Wallabag API Wallabag API
============ ============
Python API for Wallabag v2 Python API for Wallabag v2.2.3
Requirements : Requirements :
============== ==============
* requests 2.5.0 * aiohttp
Installation: Installation:
@@ -17,7 +27,7 @@ to get the project, from your virtualenv, do :
.. code:: python .. code:: python
git clone https://github.com/foxmask/wallabag-api/ git clone https://github.com/push-things/wallabag_api/
or or
@@ -36,30 +46,71 @@ Creating a post :
.. code:: python .. code:: python
#!/usr/bin/env python
import aiohttp
import asyncio
from wallabag_api.wallabag import Wallabag from wallabag_api.wallabag import Wallabag
# settings # settings
params = {'username': 'foxmask',
'password': 'mypass',
'client_id': 'myid',
'client_secret': 'mysecret'}
my_host = 'http://localhost:8080' my_host = 'http://localhost:8080'
# get token
token = Wallabag.get_token(host=my_host, **params)
# create a post
wall = Wallabag(host=my_host, client_secret='mysecret', client_id='myid', token=token)
my_url = 'https://blog.trigger-happy.eu' async def main(loop):
my_title = 'Trigger Happy blog'
my_tags = ['python', 'wallabag']
wall.post_entries(url=my_url, title=my_title, tags=my_tags) params = {'username': 'foxmask',
'password': 'mypass',
'client_id': 'myid',
'client_secret': 'mysecret',
'extension': 'pdf'}
# get a new token
token = await Wallabag.get_token(host=my_host, **params)
# initializing
async with aiohttp.ClientSession(loop=loop) as session:
wall = Wallabag(host=my_host,
client_secret=params.get('client_secret'),
client_id=params.get('client_id'),
token=token,
extension=params['extension'],
aio_sess=session)
url = 'https://foxmask.trigger-happy.eu'
title = 'foxmask\'s blog'
await wall.post_entries(url, title, '', 0, 0)
url = 'https://trigger-happy.eu'
title = 'Project TrigerHappy'
await wall.post_entries(url, title, '', 0, 0)
# get all the articles
my_wallabag = await wall.get_entries()
all_article = my_wallabag['_embedded']['items']
for article in all_article:
print(article['id'], article['title'])
# get the version of wallabag
version = await wall.version
print(f"version {version}")
# export one article into PDF
my_wallabag = await wall.get_entry_export(entry=1)
with open("foobar.pdf", "wb") as f:
f.write(my_wallabag)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
this will give you something like this : this will give you something like this :
.. image:: https://github.com/foxmask/wallabag_api/blob/master/wallabag.png .. image:: https://github.com/push-things/wallabag_api/blob/master/wallabag.png
Testing : Testing :
@@ -67,11 +118,13 @@ Testing :
Install Wallabag V2 on your own host like explain here http://doc.wallabag.org/en/v2/user/installation.html Install Wallabag V2 on your own host like explain here http://doc.wallabag.org/en/v2/user/installation.html
Then run the development version (with make run)
Then create a client API like explain here http://doc.wallabag.org/en/v2/developer/api.html Then create a client API like explain here http://doc.wallabag.org/en/v2/developer/api.html
this will give you somthing like this this will give you something like this
.. image:: https://github.com/foxmask/wallabag_api/blob/master/wallabag_api_key.png .. image:: https://github.com/push-things/wallabag_api/blob/master/wallabag_api_key.png
Then replace the client_id / client_secret / login / pass to wallabag_test.py and run Then replace the client_id / client_secret / login / pass to wallabag_test.py and run
@@ -79,4 +132,3 @@ Then replace the client_id / client_secret / login / pass to wallabag_test.py an
python wallabag_test.py python wallabag_test.py

View File

@@ -1,28 +1,30 @@
from setuptools import setup, find_packages from setuptools import setup, find_packages
from wallabag_api import __version__ as version from wallabag_api import __version__ as version
desc = 'Wallabag API to add every pages you want to your Wallabag account'
long_desc = 'Wallabag is a "read it later" service, and that Wallabag API allow you to save web pages ' \
'to your own account'
install_requires = [ install_requires = [
'requests==2.5.0', 'aiohttp',
] ]
setup( setup(
name='wallabag_api', name='wallabag_api',
version=version, version=version,
description='Wallabag API to add every pages you want to your Wallabag account', description=desc,
long_description=long_desc,
author='FoxMaSk', author='FoxMaSk',
author_email='foxmask@trigger-happy.eu', author_email='foxmask@trigger-happy.eu',
url='https://github.com/foxmask/wallabag_api', url='https://github.com/push-things/wallabag_api',
download_url="https://github.com/foxmask/wallabag_api/archive/" download_url="https://github.com/push-things/wallabag_api/archive/wallabag_api-" + version + ".zip",
"wallabag_api-" + version + ".zip",
packages=find_packages(), packages=find_packages(),
classifiers=[ classifiers=[
'Development Status :: 4 - Beta', 'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'License :: OSI Approved :: BSD License', 'License :: OSI Approved :: BSD License',
'Operating System :: OS Independent', 'Operating System :: OS Independent',
'Programming Language :: Python', 'Programming Language :: Python',
'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.5',
'Topic :: Internet', 'Topic :: Internet',
'Topic :: Communications', 'Topic :: Communications',
], ],

View File

@@ -1,2 +1,2 @@
VERSION = (1, 1, 0) # PEP 386 VERSION = (1, 2, 2) # PEP 386
__version__ = ".".join([str(x) for x in VERSION]) __version__ = ".".join([str(x) for x in VERSION])

View File

@@ -1,11 +1,11 @@
# coding: utf-8 # coding: utf-8
import requests
from requests import HTTPError
import logging import logging
import aiohttp
from aiohttp.http_exceptions import HttpProcessingError
from aiohttp.client_exceptions import ClientResponseError
__author__ = 'foxmask' __author__ = 'foxmask'
logging.basicConfig(format='%(message)s', level=logging.INFO) logging.basicConfig(format='%(message)s', level=logging.INFO)
__all__ = ['Wallabag'] __all__ = ['Wallabag']
@@ -16,7 +16,7 @@ class Wallabag(object):
Python Class 'Wallabag' to deal with Wallabag REST API Python Class 'Wallabag' to deal with Wallabag REST API
This class is able to handle any data from your Wallabag account This class is able to handle any data from your Wallabag account
""" """
EXTENTIONS = ('xml', 'json', 'txt', 'csv', 'pdf', 'epub', 'mobi', 'html')
host = '' host = ''
token = '' token = ''
client_id = '' client_id = ''
@@ -25,6 +25,7 @@ class Wallabag(object):
format = '' format = ''
username = '' username = ''
password = '' password = ''
aio_sess = None
def __init__(self, def __init__(self,
host='', host='',
@@ -32,16 +33,18 @@ class Wallabag(object):
client_id='', client_id='',
client_secret='', client_secret='',
extension='json', extension='json',
user_agent="WallabagPython/1.0 " user_agent="WallabagPython/1.2.2 "
"+https://github.com/foxmask/wallabag-api"): " +https://github.com/push-things/wallabag-api",
aio_sess=None):
""" """
init variable init variable
:param host: string url to the official API Wallabag :param host: string url to the official API Wallabag
:param token: string of the key provided by Wallabag :param token: string of the key provided by Wallabag
:param client_id client id :param client_id client id
:param client_secret client secret :param client_secret client secret
:param extension: json/xml/html :param extension: xml|json|txt|csv|pdf|epub|mobi|html
:param user_agent :param user_agent
:param aio_sess aiohttp session
""" """
self.host = host self.host = host
self.client_id = client_id self.client_id = client_id
@@ -49,156 +52,193 @@ class Wallabag(object):
self.token = token self.token = token
self.format = extension self.format = extension
self.user_agent = user_agent self.user_agent = user_agent
self.aio_sess = aio_sess
if self.format not in self.EXTENTIONS:
raise ValueError("format invalid {0} should be one of {1}".format(
self.format, self.EXTENTIONS))
def get_host(self): async def query(self, path, method='get', **params):
""" """
get the host from which to get API Do a query to the System API
:return host
"""
return self.host
def query(self, path, method='get', **params): :param path: url to the API
:param method: the kind of query to do
:param params: a dict with all the
necessary things to query the API
:return json data
""" """
Do a query to the System API if method in ('get', 'post', 'patch', 'delete', 'put'):
:param path: url to the API full_path = self.host + path
:param method: the kind of query to do
:param params: a dict with all the
necessary things to query the API
:return json data
"""
if method in ('get', 'post', 'patch', 'delete', 'put', 'get_token'):
if method == 'get': if method == 'get':
r = requests.get(self.get_host() + path, params=params) resp = await self.aio_sess.get(full_path, params=params)
elif method == 'post': elif method == 'post':
r = requests.post(self.get_host() + path, data=params) resp = await self.aio_sess.post(full_path, data=params)
elif method == 'patch': elif method == 'patch':
r = requests.patch(self.get_host() + path, data=params) resp = await self.aio_sess.patch(full_path, data=params)
elif method == 'delete': elif method == 'delete':
r = requests.delete(self.get_host() + path, headers=params) resp = await self.aio_sess.delete(full_path, headers=params)
elif method == 'put': elif method == 'put':
r = requests.put(self.get_host() + path, params=params) resp = await self.aio_sess.put(full_path, data=params)
return self.handle_json_response(r)
async with resp:
# return the content if its a binary one
if resp.content_type.startswith('application/pdf') or \
resp.content_type.startswith('application/epub'):
return await resp.read()
return await self.handle_json_response(resp)
else: else:
raise ValueError('method expected : get, post, patch, delete or put') raise ValueError('method expected: get, post, patch, delete, put')
@staticmethod @staticmethod
def handle_json_response(responses): async def handle_json_response(responses):
""" """
get the json data response get the json data response
:param responses: the json response :param responses: the json response
:return the json data without 'root' node :return the json data without 'root' node
""" """
if responses.status_code != 200:
raise HTTPError(responses.status_code, responses.json())
json_data = {} json_data = {}
try: if responses.status != 200:
json_data = responses.json() err_msg = HttpProcessingError(code=responses.status,
except: message=await responses.json())
for error in json_data['errors']: logging.error("Wallabag: aiohttp error {err_msg}".format(
error_json = json_data['errors'][error]['content'] err_msg=err_msg))
logging.error("Wallabag: {error}".format(error=error_json)) else:
return json_data try:
json_data = responses.json()
except ClientResponseError as e:
# sometimes json_data does not return any json() without
# any error. This is due to the grabbing URL which "rejects"
# the URL
logging.error("Wallabag: aiohttp error {code} {message}"
.format(code=e.code, message=e.message))
return await json_data
def get_entries(self, **kwargs): @staticmethod
def __get_attr(what, type_attr, value_attr, **kwargs):
""" """
get the value of a parm
:param what: string parm
:param type_attr: type of parm
:param value_attr:
:param kwargs:
:return: value of the parm
"""
value = int(kwargs[what]) if type_attr == 'int' else kwargs[what]
if what in kwargs and value in value_attr:
return value
GET /api/entries.{_format} # ENTRIES
async def get_entries(self, **kwargs):
"""
GET /api/entries.{_format}
Retrieve all entries. It could be filtered by many options. Retrieve all entries. It could be filtered by many options.
:param kwargs: can contain one of the following filters :param kwargs: can contain one of the following filters
archive: '0' or '1', default '0' filter by archived status. archive: '0' or '1', default '0' filter by archived status.
star: '0' or '1', default '0' filter by starred status. star: '0' or '1', default '0' filter by starred status.
delete: '0' or '1', default '0' filter by deleted status. delete: '0' or '1', default '0' filter by deleted status.
sort: 'created' or 'updated', default 'created' sort: 'created' or 'updated', default 'created'
order: 'asc' or 'desc', default 'desc' order: 'asc' or 'desc', default 'desc'
page: int default 1 what page you want page: int default 1 what page you want
perPage: int default 30 result per page perPage: int default 30 result per page
tags: list of tags url encoded. tags: list of tags url encoded.
Will returns entries that matches ALL tags since: int default 0 from what timestamp you want
:return data related to the ext Will returns entries that matches ALL tags
:return data related to the ext
""" """
# default values # default values
params = {'access_token': self.token, params = dict({'access_token': self.token,
'archive': 0, 'archive': 0,
'star': 0, 'star': 0,
'delete': 0, 'delete': 0,
'sort': 'created', 'sort': 'created',
'order': 'desc', 'order': 'desc',
'page': 1, 'page': 1,
'perPage': 30, 'perPage': 30,
'tags': []} 'tags': '',
'since': 0})
params['archive'] = self.__get_attr(what='archive',
type_attr=int,
value_attr=(0, 1),
**params)
params['star'] = self.__get_attr(what='star',
type_attr=int,
value_attr=(0, 1),
**params)
params['delete'] = self.__get_attr(what='delete',
type_attr=int,
value_attr=(0, 1),
**params)
params['order'] = self.__get_attr(what='order',
type_attr=str,
value_attr=('asc', 'desc'),
**params)
if 'archive' in kwargs and int(kwargs['archive']) in (0, 1):
params['archive'] = int(kwargs['archive'])
if 'star' in kwargs and int(kwargs['star']) in (0, 1):
params['star'] = int(kwargs['star'])
if 'delete' in kwargs and int(kwargs['delete']) in (0, 1):
params['delete'] = int(kwargs['delete'])
if 'order' in kwargs and kwargs['order'] in ('asc', 'desc'):
params['order'] = kwargs['order']
if 'page' in kwargs and isinstance(kwargs['page'], int): if 'page' in kwargs and isinstance(kwargs['page'], int):
params['page'] = kwargs['page'] params['page'] = kwargs['page']
if 'perPage' in kwargs and isinstance(kwargs['perPage'], int): if 'perPage' in kwargs and isinstance(kwargs['perPage'], int):
params['perPage'] = kwargs['perPage'] params['perPage'] = kwargs['perPage']
if 'tags' in kwargs and isinstance(kwargs['tags'], list): if 'tags' in kwargs and isinstance(kwargs['tags'], list):
params['tags'] = kwargs['tags'] params['tags'] = ', '.join(kwargs['tags'])
if 'since' in kwargs and isinstance(kwargs['since'], int):
params['since'] = kwargs['since']
path = '/api/entries.{ext}'.format(ext=self.format) path = '/api/entries.{ext}'.format(ext=self.format)
return self.query(path, "get", **params) return await self.query(path, "get", **params)
def post_entries(self, url, title='', tags='', starred=0, archive=0): async def post_entries(self, url, title='', tags='', starred=0, archive=0):
""" """
POST /api/entries.{_format}
POST /api/entries.{_format} Create an entry
Create an entry :param url: the url of the note to store
:param title: Optional, we'll get the title from the page.
:param url: the url of the note to store :param tags: tag1,tag2,tag3 a comma-separated list of tags.
:param title: Optional, we'll get the title from the page. :param starred entry already starred
:param tags: tag1,tag2,tag3 a comma-separated list of tags. :param archive entry already archived
:param starred entry already starred :return result
:param archive entry already archived
:return result
""" """
params = {'access_token': self.token, 'url': url, 'title': title, params = {'access_token': self.token, 'url': url, 'title': title,
'tags': tags, 'starred': starred, 'archive': archive} 'tags': tags, 'starred': starred, 'archive': archive}
if len(tags) > 0 and isinstance(tags, list): if len(tags) > 0 and ',' in tags:
params['tags'] = ', '.join(tags) params['tags'] = tags.split(',')
path = '/api/entries.{ext}'.format(ext=self.format) path = '/api/entries.{ext}'.format(ext=self.format)
return self.query(path, "post", **params) return await self.query(path, "post", **params)
def get_entry(self, entry): async def get_entry(self, entry):
""" """
GET /api/entries/{entry}.{_format}
GET /api/entries/{entry}.{_format} Retrieve a single entry
Retrieve a single entry :param entry: \w+ an integer The Entry ID
:return data related to the ext
:param entry: \w+ an integer The Entry ID
:return data related to the ext
""" """
params = {'access_token': self.token} params = {'access_token': self.token}
url = '/api/entries/{entry}.{ext}'.format(entry=entry, ext=self.format) url = '/api/entries/{entry}.{ext}'.format(entry=entry,
return self.query(url, "get", **params) ext=self.format)
return await self.query(url, "get", **params)
def patch_entries(self, entry, **kwargs): async def patch_entries(self, entry, **kwargs):
""" """
PATCH /api/entries/{entry}.{_format}
PATCH /api/entries/{entry}.{_format} Change several properties of an entry
Change several properties of an entry :param entry: the entry to 'patch' / update
:param kwargs: can contain one of the following
:param entry: the entry to 'patch' / update title: string
:param kwargs: can contain one of the following tags: a list of tags tag1,tag2,tag3
title: string archive: '0' or '1', default '0' archived the entry.
tags: a list of tags tag1,tag2,tag3 star: '0' or '1', default '0' starred the entry
archive: '0' or '1', default '0' archived the entry. delete: '0' or '1', default '0' flag as deleted.
star: '0' or '1', default '0' starred the entry In case that you don't want to *really* remove it..
delete: '0' or '1', default '0' flag as deleted. :return data related to the ext
In case that you don't want to *really* remove it..
:return data related to the ext
""" """
# default values # default values
params = {'access_token': self.token, params = {'access_token': self.token,
@@ -207,115 +247,294 @@ class Wallabag(object):
'tags': [], 'tags': [],
'star': 0, 'star': 0,
'delete': 0} 'delete': 0}
if 'title' in kwargs: if 'title' in kwargs:
params['title'] = kwargs['title'] params['title'] = kwargs['title']
if 'tags' in kwargs and isinstance(kwargs['tags'], list): if 'tags' in kwargs and isinstance(kwargs['tags'], list):
params['tags'] = ', '.join(kwargs['tags']) params['tags'] = ', '.join(kwargs['tags'])
if 'archive' in kwargs and int(kwargs['archive']) in (0, 1):
params['archive'] = int(kwargs['archive']) params['archive'] = self.__get_attr(what='archive',
if 'star' in kwargs and int(kwargs['star']) in (0, 1): type_attr=int,
params['star'] = int(kwargs['star']) value_attr=(0, 1),
if 'delete' in kwargs and int(kwargs['delete']) in (0, 1): **kwargs)
params['delete'] = int(kwargs['delete']) params['star'] = self.__get_attr(what='star',
type_attr=int,
value_attr=(0, 1),
**kwargs)
params['delete'] = self.__get_attr(what='delete',
type_attr=int,
value_attr=(0, 1),
**kwargs)
params['order'] = self.__get_attr(what='order',
type_attr=str,
value_attr=('asc', 'desc'),
**kwargs)
path = '/api/entries/{entry}.{ext}'.format( path = '/api/entries/{entry}.{ext}'.format(
entry=entry, ext=self.format) entry=entry, ext=self.format)
return self.query(path, "patch", **params) return await self.query(path, "patch", **params)
def delete_entries(self, entry): async def get_entry_export(self, entry):
""" """
GET /api/entries/{entry}/export.{_format}
DELETE /api/entries/{entry}.{_format} Retrieve a single entry as a predefined format.
Delete permanently an entry :param entry: \w+ an integer The Entry ID
:return data related to the ext
"""
params = {'access_token': self.token}
url = '/api/entries/{entry}/export.{ext}'.format(entry=entry,
ext=self.format)
return await self.query(url, "get", **params)
:param entry: \w+ an integer The Entry ID async def patch_entry_reload(self, entry):
:return result """
PATCH /api/entries/{entry}/reload.{_format}
Reload an entry. An empty response with HTTP Status 304 will be send
if we weren't able to update the content (because it hasn't changed
or we got an error).
:param entry: \w+ an integer The Entry ID
:return data related to the ext
"""
params = {'access_token': self.token}
url = '/api/entries/{entry}/reload.{ext}'.format(entry=entry,
ext=self.format)
return await self.query(url, "patch", **params)
async def delete_entries(self, entry):
"""
DELETE /api/entries/{entry}.{_format}
Delete permanently an entry
:param entry: \w+ an integer The Entry ID
:return result
""" """
params = {'Authorization': 'Bearer {}'.format(self.token)} params = {'Authorization': 'Bearer {}'.format(self.token)}
path = '/api/entries/{entry}.{ext}'.format( path = '/api/entries/{entry}.{ext}'.format(
entry=entry, ext=self.format) entry=entry, ext=self.format)
return self.query(path, "delete", **params) return await self.query(path, "delete", **params)
def get_entry_tags(self, entry): async def entries_exists(self, url, urls=''):
""" """
GET /api/entries/exists.{_format}
GET /api/entries/{entry}/tags.{_format} Check if an entry exist by url.
Retrieve all tags for an entry :param url string true An url Url to check if it exists
:param urls string false An array of urls
(?urls[]=http...&urls[]=http...) Urls (as an array)
to check if it exists
:param entry: \w+ an integer The Entry ID :return result
:return data related to the ext """
params = {'access_token': self.token,
'url': url,
'urls': urls}
path = '/api/entries/exists.{ext}'.format(ext=self.format)
return await self.query(path, "get", **params)
# TAGS
async def get_entry_tags(self, entry):
"""
GET /api/entries/{entry}/tags.{_format}
Retrieve all tags for an entry
:param entry: \w+ an integer The Entry ID
:return data related to the ext
""" """
params = {'access_token': self.token} params = {'access_token': self.token}
url = '/api/entries/{entry}/tags.{ext}'.format( url = '/api/entries/{entry}/tags.{ext}'.format(
entry=entry, ext=self.format) entry=entry, ext=self.format)
return self.query(url, "get", **params) return await self.query(url, "get", **params)
def post_entry_tags(self, entry, tags): async def post_entry_tags(self, entry, tags):
""" """
POST /api/entries/{entry}/tags.{_format}
POST /api/entries/{entry}/tags.{_format} Add one or more tags to an entry
Add one or more tags to an entry :param entry: \w+ an integer The Entry ID
:param tags: string
:param entry: \w+ an integer The Entry ID :return result
:param tags: string
:return result
""" """
params = {'access_token': self.token, 'tags': []} params = {'access_token': self.token, 'tags': []}
if isinstance(tags, list): if len(tags) > 0 and ',' in tags:
params['tags'] = tags params['tags'] = tags.split(',')
path = '/api/entries/{entry}/tags.{ext}'.format( path = '/api/entries/{entry}/tags.{ext}'.format(
entry=entry, ext=self.format) entry=entry, ext=self.format)
return self.query(path, "post", **params) return await self.query(path, "post", **params)
def delete_entry_tag(self, entry, tag): async def delete_entry_tag(self, entry, tag):
""" """
DELETE /api/entries/{entry}/tags/{tag}.{_format}
DELETE /api/entries/{entry}/tags/{tag}.{_format} Permanently remove one tag for an entry
Permanently remove one tag for an entry :param entry: \w+ an integer The Entry ID
:param tag: string The Tag
:param entry: \w+ an integer The Entry ID :return data related to the ext
:param tag: string The Tag
:return data related to the ext
""" """
params = {'access_token': self.token} params = {'access_token': self.token}
url = '/api/entries/{entry}/tags/{tag}.{ext}'.format( url = '/api/entries/{entry}/tags/{tag}.{ext}'.format(
entry=entry, tag=tag, ext=self.format) entry=entry, tag=tag, ext=self.format)
return self.query(url, "delete", **params) return await self.query(url, "delete", **params)
def get_tags(self): async def get_tags(self):
""" """
GET /api/tags.{_format}
GET /api/tags.{_format} Retrieve all tags
Retrieve all tags :return data related to the ext
:return data related to the ext
""" """
params = {'access_token': self.token} params = {'access_token': self.token}
path = '/api/tags.{ext}'.format(ext=self.format) path = '/api/tags.{ext}'.format(ext=self.format)
return self.query(path, "get", **params) return await self.query(path, "get", **params)
def delete_tag(self, tag): async def delete_tag(self, tag):
""" """
DELETE /api/tags/{tag}.{_format}
DELETE /api/tags/{tag}.{_format} Permanently remove one tag from every entry
Permanently remove one tag from every entry :param tag: string The Tag
:return data related to the ext
:param tag: string The Tag
:return data related to the ext
""" """
path = '/api/tags/{tag}.{ext}'.format(tag=tag, ext=self.format) path = '/api/tags/{tag}.{ext}'.format(tag=tag, ext=self.format)
params = {'access_token': self.token} params = {'access_token': self.token}
return self.query(path, "delete", **params) return await self.query(path, "delete", **params)
async def delete_tag_label(self, tag):
"""
DELETE /api/tag/label.{_format}
Permanently remove one tag from every entry.
:param tag: string The Tag
:return data related to the ext
"""
path = '/api/tag/label.{ext}'.format(ext=self.format)
params = {'access_token': self.token,
'tag': tag}
return await self.query(path, "delete", **params)
async def delete_tags_label(self, tags):
"""
DELETE /api/tags/label.{_format}
Permanently remove some tags from every entry.
:param tags: string tags as strings (comma splitted)
:return data related to the ext
"""
path = '/api/tag/label.{ext}'.format(ext=self.format)
params = {'access_token': self.token,
'tags': tags}
return await self.query(path, "delete", **params)
# ANNOTATIONS
async def delete_annotations(self, annotation):
"""
DELETE /api/annotations/{annotation}.{_format}
Removes an annotation.
:param annotation \w+ string The annotation ID
Will returns annotation for this entry
:return data related to the ext
"""
params = {'access_token': self.token}
url = '/api/annotations/{annotation}.{ext}'.format(
annotation=annotation, ext=self.format)
return await self.query(url, "delete", **params)
async def put_annotations(self, annotation):
"""
PUT /api/annotations/{annotation}.{_format}
Updates an annotation.
:param annotation \w+ string The annotation ID
Will returns annotation for this entry
:return data related to the ext
"""
params = {'access_token': self.token}
url = '/api/annotations/{annotation}.{ext}'.format(
annotation=annotation, ext=self.format)
return await self.query(url, "put", **params)
async def get_annotations(self, entry):
"""
GET /api/annotations/{entry}.{_format}
Retrieve annotations for an entry
:param entry \w+ integer The entry ID
Will returns annotation for this entry
:return data related to the ext
"""
params = {'access_token': self.token}
url = '/api/annotations/{entry}.{ext}'.format(entry=entry,
ext=self.format)
return await self.query(url, "get", **params)
async def post_annotations(self, entry, **kwargs):
"""
POST /api/annotations/{entry}.{_format}
Creates a new annotation.
:param entry \w+ integer The entry ID
:return
"""
params = dict({'access_token': self.token,
'ranges': [],
'quote': '',
'text': ''})
if 'ranges' in kwargs:
params['ranges'] = kwargs['ranges']
if 'quote' in kwargs:
params['quote'] = kwargs['quote']
if 'text' in kwargs:
params['text'] = kwargs['text']
url = '/api/annotations/{entry}.{ext}'.format(entry=entry,
ext=self.format)
return await self.query(url, "post", **params)
# VERSION
@property
async def version(self):
"""
GET /api/version.{_format}
Retrieve version number
:return data related to the ext
"""
params = {'access_token': self.token}
url = '/api/version.{ext}'.format(ext=self.format)
return await self.query(url, "get", **params)
@classmethod @classmethod
def get_token(cls, host, **params): async def get_token(cls, host, **params):
""" """
POST /oauth/v2/token
Get a new token
:param host: host of the service :param host: host of the service
:param params: will contain : :param params: will contain :
@@ -327,7 +546,9 @@ class Wallabag(object):
:return: access token :return: access token
""" """
params['grant_type'] = ["password"] params['grant_type'] = "password"
path = "/oauth/v2/token" path = "/oauth/v2/token"
r = requests.post(host + path, data=params) async with aiohttp.ClientSession() as sess:
return cls.handle_json_response(r)['access_token'] async with sess.post(host + path, data=params) as resp:
data = await cls.handle_json_response(resp)
return data.get("access_token")

View File

@@ -5,7 +5,7 @@ from wallabag import Wallabag
class TestWallabag(unittest.TestCase): class TestWallabag(unittest.TestCase):
host = 'http://localhost:8080' host = 'http://localhost:8000'
client_id = '' client_id = ''
client_secret = '' client_secret = ''
token = '' token = ''
@@ -20,12 +20,14 @@ class TestWallabag(unittest.TestCase):
def test_get_token(self): def test_get_token(self):
params = {"grant_type": "password", params = {"grant_type": "password",
"client_id": '1_37e16ub8a62oc4gwcg0o0wssks800kw0ok408kkwo4kosgc88g', "client_id":
"client_secret": '49etxpn66da8okg4cs40sswsog0sskwg4woc400c4w4w8s4wo4', '1_4wqe1riwt0qoks844kwc4go08koogkgk88go4cckkwg0408kcg',
"username": 'foxmask', "client_secret": '4mzw3qwi1xyc0cks4k80s4c8kco40wwkkkw0g40kwk4o4c44co',
"password": 'ratatab00m'} "username": 'wallabag',
"password": 'wallabag'}
data = Wallabag.get_token(host='http://localhost:8080', **params) print(self.host)
data = Wallabag.get_token(host=self.host, **params)
print(data)
self.assertTrue(isinstance(data, str), True) self.assertTrue(isinstance(data, str), True)
return data return data
@@ -75,6 +77,7 @@ class TestWallabag(unittest.TestCase):
params = {'title': 'I change the title', params = {'title': 'I change the title',
'archive': 0, 'archive': 0,
'tags': ["bimbo", "pipo"], 'tags': ["bimbo", "pipo"],
'order': 'asc',
'star': 0, 'star': 0,
'delete': 0} 'delete': 0}
self.assertTrue(isinstance(entry, int), True) self.assertTrue(isinstance(entry, int), True)
@@ -106,5 +109,6 @@ class TestWallabag(unittest.TestCase):
self.assertTrue(resp, True) self.assertTrue(resp, True)
""" """
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()