moved everything into api.py module, imported ApiClient into __init__.py. changed the path of the oauth-containing json file to current working directory, removed references to python-dotenv which has already been removed as a dependency
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
|
||||
This is a wrapper of [the roly-poly, not 100% ergonomic Freshbooks web API](https://www.freshbooks.com/api/start). It is far from comprehensive: It was created for the specific client- and invoice-related needs of [Averbach Transcription](https://avtranscription.com).
|
||||
|
||||
There are "band-aids" here to work around some of the API's shortcomings.
|
||||
There are "band-aids" here to work around some of the API's shortcomings. For example, you don't have to deal with pagination at all. 🎉
|
||||
|
||||
# Installation
|
||||
|
||||
|
||||
@@ -1,171 +1 @@
|
||||
import datetime as dt
|
||||
import json
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv, find_dotenv
|
||||
import requests
|
||||
|
||||
from avt_fresh.token import Token, NoToken
|
||||
|
||||
dotenv_path = find_dotenv()
|
||||
if not dotenv_path:
|
||||
dotenv_path = os.getcwd() + "/.env"
|
||||
load_dotenv(dotenv_path)
|
||||
|
||||
BASE_URL = "https://api.freshbooks.com"
|
||||
URL = f"{BASE_URL}/auth/oauth/token"
|
||||
HEADERS = {"Content-Type": "application/json"}
|
||||
|
||||
CLIENT_ID = os.getenv("FRESHBOOKS_API_CLIENT_ID")
|
||||
SECRET = os.getenv("FRESHBOOKS_API_CLIENT_SECRET")
|
||||
ACCOUNT_ID = os.getenv("FRESHBOOKS_ACCOUNT_ID")
|
||||
REDIRECT_URI = os.getenv("FRESHBOOKS_REDIRECT_URI")
|
||||
|
||||
|
||||
class ReRun(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AvtFreshException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def make_headers():
|
||||
return {**HEADERS, "Authorization": f"Bearer {_get_access_token()}"}
|
||||
|
||||
|
||||
REQUEST_LOOKUP = {
|
||||
"GET": (requests.get, "params"),
|
||||
"PUT": (requests.put, "json"),
|
||||
"POST": (requests.post, "json"),
|
||||
}
|
||||
|
||||
|
||||
def REQUEST(url, method_name: str, endpoint: str, stuff=None):
|
||||
method, arg_name = REQUEST_LOOKUP[method_name]
|
||||
if endpoint == "" or endpoint.startswith("?"):
|
||||
rendered_url = f"{url}{endpoint}"
|
||||
else:
|
||||
rendered_url = f"{url}/{endpoint}"
|
||||
_, the_rest = rendered_url.split("https://")
|
||||
if "//" in the_rest:
|
||||
the_rest = the_rest.replace("//", "/")
|
||||
rendered_url = f"https://{the_rest}"
|
||||
print(rendered_url)
|
||||
raw_response = method(
|
||||
rendered_url,
|
||||
**{
|
||||
arg_name: stuff or {},
|
||||
"headers": make_headers(),
|
||||
},
|
||||
)
|
||||
if not raw_response.ok:
|
||||
raise Exception(
|
||||
f"response: {raw_response.reason}\nrendered_url: '{rendered_url}'\nstuff:{stuff}"
|
||||
)
|
||||
try:
|
||||
response = raw_response.json()["response"]
|
||||
except KeyError:
|
||||
return raw_response.json()
|
||||
if "result" in response:
|
||||
return response["result"]
|
||||
raise Exception(
|
||||
f"response: {response}\nrendered_url: '{rendered_url}'\nstuff:{stuff}"
|
||||
)
|
||||
|
||||
|
||||
def _get_access_token(authorization_code: str | None = None) -> str:
|
||||
if authorization_code:
|
||||
token = _get_token_from_api_with_authorization_code(
|
||||
authorization_code=authorization_code
|
||||
)
|
||||
_save_token(token)
|
||||
return token["access_token"]
|
||||
|
||||
try:
|
||||
token = Token.get()
|
||||
except NoToken:
|
||||
auth_code = _get_code_from_user()
|
||||
token_dict = _get_token_from_api_with_authorization_code(auth_code)
|
||||
_save_token(token_dict)
|
||||
token = Token.get()
|
||||
if not _is_expired(token):
|
||||
return token.access_token
|
||||
|
||||
old_token = token
|
||||
try:
|
||||
token = _get_token_from_api_with_refresh_token(
|
||||
refresh_token=old_token.refresh_token
|
||||
)
|
||||
except AvtFreshException:
|
||||
auth_code = _get_code_from_user()
|
||||
token_dict = _get_token_from_api_with_authorization_code(auth_code)
|
||||
old_token.delete()
|
||||
_save_token(token_dict)
|
||||
token = Token.get()
|
||||
return token.access_token
|
||||
else:
|
||||
old_token.delete()
|
||||
_save_token(token)
|
||||
return token["access_token"]
|
||||
|
||||
|
||||
def _get_code_from_user() -> str:
|
||||
return input(
|
||||
"Please go here and get an auth code: "
|
||||
"https://my.freshbooks.com/#/developer, then enter it here: "
|
||||
)
|
||||
|
||||
|
||||
def _save_token(token_response: dict):
|
||||
Token.make_from_dict(token_response)
|
||||
|
||||
|
||||
def _is_expired(token: Token) -> bool:
|
||||
return dt.datetime.now().timestamp() > token.created_at + token.expires_in
|
||||
|
||||
|
||||
def _get_token_from_api_with_authorization_code(authorization_code: str) -> dict:
|
||||
payload = {
|
||||
"client_secret": SECRET,
|
||||
"client_id": CLIENT_ID,
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
"grant_type": "authorization_code", # get this by visiting
|
||||
"code": authorization_code,
|
||||
}
|
||||
res = requests.post(URL, data=json.dumps(payload), headers=HEADERS)
|
||||
return _return_or_raise(res, payload)
|
||||
|
||||
|
||||
def _return_or_raise(response: requests.Response, payload: dict) -> dict:
|
||||
response_json = response.json()
|
||||
if "error" in response_json:
|
||||
raise AvtFreshException(f"{payload}:\n\n{response_json['error_description']}")
|
||||
del response_json["direct_buy_tokens"]
|
||||
return response_json
|
||||
|
||||
|
||||
def _get_token_from_api_with_refresh_token(refresh_token: str) -> dict:
|
||||
"""
|
||||
:return:
|
||||
{
|
||||
'access_token': <access_token: str>,
|
||||
'token_type': 'Bearer',
|
||||
'expires_in': <seconds: int>,
|
||||
'refresh_token': <refresh_token: str>,
|
||||
'scope': 'admin:all:legacy',
|
||||
'created_at': <timestamp>,
|
||||
'direct_buy_tokens': {}
|
||||
}
|
||||
|
||||
"""
|
||||
payload = {
|
||||
"client_secret": SECRET,
|
||||
"client_id": CLIENT_ID,
|
||||
"redirect_uri": REDIRECT_URI,
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": refresh_token,
|
||||
}
|
||||
|
||||
res = requests.post(URL, data=json.dumps(payload), headers=HEADERS)
|
||||
return _return_or_raise(res, payload)
|
||||
from avt_fresh.api import ApiClient
|
||||
|
||||
166
avt_fresh/api.py
166
avt_fresh/api.py
@@ -1,4 +1,8 @@
|
||||
from avt_fresh import BASE_URL, REQUEST as _REQUEST
|
||||
import datetime as dt
|
||||
import json
|
||||
|
||||
import requests
|
||||
|
||||
from avt_fresh.client import (
|
||||
FreshbooksClient,
|
||||
get_freshbooks_client_from_email,
|
||||
@@ -26,6 +30,20 @@ from avt_fresh.payments import (
|
||||
get_default_payment_options,
|
||||
add_payment_option_to_invoice,
|
||||
)
|
||||
from avt_fresh.token import Token, NoToken
|
||||
|
||||
|
||||
BASE_URL = "https://api.freshbooks.com"
|
||||
URL = f"{BASE_URL}/auth/oauth/token"
|
||||
HEADERS = {"Content-Type": "application/json"}
|
||||
|
||||
|
||||
class ReRun(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AvtFreshException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ApiClient:
|
||||
@@ -38,6 +56,82 @@ class ApiClient:
|
||||
self.account_id = account_id
|
||||
self.url_lookup = self._make_url_lookup(account_id)
|
||||
|
||||
def make_headers(self):
|
||||
return {**HEADERS, "Authorization": f"Bearer {self._get_access_token()}"}
|
||||
|
||||
def _get_access_token(self, authorization_code: str | None = None) -> str:
|
||||
if authorization_code:
|
||||
token = self._get_token_from_api_with_authorization_code(
|
||||
authorization_code=authorization_code
|
||||
)
|
||||
_save_token(token)
|
||||
return token["access_token"]
|
||||
|
||||
try:
|
||||
token = Token.get()
|
||||
except NoToken:
|
||||
auth_code = _get_code_from_user()
|
||||
token_dict = self._get_token_from_api_with_authorization_code(auth_code)
|
||||
_save_token(token_dict)
|
||||
token = Token.get()
|
||||
if not _is_expired(token):
|
||||
return token.access_token
|
||||
|
||||
old_token = token
|
||||
try:
|
||||
token = self._get_token_from_api_with_refresh_token(
|
||||
refresh_token=old_token.refresh_token
|
||||
)
|
||||
except AvtFreshException:
|
||||
auth_code = _get_code_from_user()
|
||||
token_dict = self._get_token_from_api_with_authorization_code(auth_code)
|
||||
old_token.delete()
|
||||
_save_token(token_dict)
|
||||
token = Token.get()
|
||||
return token.access_token
|
||||
else:
|
||||
old_token.delete()
|
||||
_save_token(token)
|
||||
return token["access_token"]
|
||||
|
||||
def _get_token_from_api_with_authorization_code(
|
||||
self, authorization_code: str
|
||||
) -> dict:
|
||||
payload = {
|
||||
"client_secret": self.client_secret,
|
||||
"client_id": self.client_id,
|
||||
"redirect_uri": self.redirect_uri,
|
||||
"grant_type": "authorization_code", # get this by visiting
|
||||
"code": authorization_code,
|
||||
}
|
||||
res = requests.post(URL, data=json.dumps(payload), headers=HEADERS)
|
||||
return _return_or_raise(res, payload)
|
||||
|
||||
def _get_token_from_api_with_refresh_token(self, refresh_token: str) -> dict:
|
||||
"""
|
||||
:return:
|
||||
{
|
||||
'access_token': <access_token: str>,
|
||||
'token_type': 'Bearer',
|
||||
'expires_in': <seconds: int>,
|
||||
'refresh_token': <refresh_token: str>,
|
||||
'scope': 'admin:all:legacy',
|
||||
'created_at': <timestamp>,
|
||||
'direct_buy_tokens': {}
|
||||
}
|
||||
|
||||
"""
|
||||
payload = {
|
||||
"client_secret": self.client_secret,
|
||||
"client_id": self.client_id,
|
||||
"redirect_uri": self.redirect_uri,
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": refresh_token,
|
||||
}
|
||||
|
||||
res = requests.post(URL, data=json.dumps(payload), headers=HEADERS)
|
||||
return _return_or_raise(res, payload)
|
||||
|
||||
@staticmethod
|
||||
def _make_url_lookup(account_id: str) -> dict[str, str]:
|
||||
return {
|
||||
@@ -58,8 +152,40 @@ class ApiClient:
|
||||
url = f"{BASE_URL}/accounting/account"
|
||||
else:
|
||||
url = self.url_lookup[what]
|
||||
return _REQUEST(
|
||||
url=url, method_name=method_name, endpoint=endpoint, stuff=stuff
|
||||
|
||||
method, arg_name = REQUEST_LOOKUP[method_name]
|
||||
if endpoint == "" or endpoint.startswith("?"):
|
||||
rendered_url = f"{url}{endpoint}"
|
||||
else:
|
||||
rendered_url = f"{url}/{endpoint}"
|
||||
|
||||
_, the_rest = rendered_url.split("https://")
|
||||
if "//" in the_rest:
|
||||
the_rest = the_rest.replace("//", "/")
|
||||
|
||||
rendered_url = f"https://{the_rest}"
|
||||
|
||||
print(rendered_url)
|
||||
|
||||
raw_response = method(
|
||||
rendered_url,
|
||||
**{
|
||||
arg_name: stuff or {},
|
||||
"headers": self.make_headers(),
|
||||
},
|
||||
)
|
||||
if not raw_response.ok:
|
||||
raise Exception(
|
||||
f"response: {raw_response.reason}\nrendered_url: '{rendered_url}'\nstuff:{stuff}"
|
||||
)
|
||||
try:
|
||||
response = raw_response.json()["response"]
|
||||
except KeyError:
|
||||
return raw_response.json()
|
||||
if "result" in response:
|
||||
return response["result"]
|
||||
raise Exception(
|
||||
f"response: {response}\nrendered_url: '{rendered_url}'\nstuff:{stuff}"
|
||||
)
|
||||
|
||||
def _GET(self, *, what: str, endpoint: str, params=None):
|
||||
@@ -172,7 +298,39 @@ class ApiClient:
|
||||
def get_default_payment_options(self) -> dict:
|
||||
return get_default_payment_options(get_func=self._GET)
|
||||
|
||||
def add_payment_option_to_invoice(self, invoice_id: int, gateway_name: str = "stripe") -> dict:
|
||||
def add_payment_option_to_invoice(
|
||||
self, invoice_id: int, gateway_name: str = "stripe"
|
||||
) -> dict:
|
||||
return add_payment_option_to_invoice(
|
||||
post_func=self._POST, invoice_id=invoice_id, gateway_name=gateway_name
|
||||
)
|
||||
|
||||
|
||||
REQUEST_LOOKUP = {
|
||||
"GET": (requests.get, "params"),
|
||||
"PUT": (requests.put, "json"),
|
||||
"POST": (requests.post, "json"),
|
||||
}
|
||||
|
||||
|
||||
def _get_code_from_user() -> str:
|
||||
return input(
|
||||
"Please go here and get an auth code: "
|
||||
"https://my.freshbooks.com/#/developer, then enter it here: "
|
||||
)
|
||||
|
||||
|
||||
def _save_token(token_response: dict) -> None:
|
||||
Token.make_from_dict(token_response)
|
||||
|
||||
|
||||
def _is_expired(token: Token) -> bool:
|
||||
return dt.datetime.now().timestamp() > token.created_at + token.expires_in
|
||||
|
||||
|
||||
def _return_or_raise(response: requests.Response, payload: dict) -> dict:
|
||||
response_json = response.json()
|
||||
if "error" in response_json:
|
||||
raise AvtFreshException(f"{payload}:\n\n{response_json['error_description']}")
|
||||
del response_json["direct_buy_tokens"]
|
||||
return response_json
|
||||
|
||||
@@ -95,7 +95,12 @@ def get_freshbooks_client_from_client_id(
|
||||
def get_all_clients(*, get_func: typing.Callable) -> list[FreshbooksClient]:
|
||||
response = get_func(what=WHAT, endpoint="")
|
||||
num_results = response["total"]
|
||||
return [FreshbooksClient.from_api(**c) for c in get_func(what=WHAT, endpoint=f"?{INCLUDE}&per_page={num_results}")["clients"]]
|
||||
return [
|
||||
FreshbooksClient.from_api(**c)
|
||||
for c in get_func(what=WHAT, endpoint=f"?{INCLUDE}&per_page={num_results}")[
|
||||
"clients"
|
||||
]
|
||||
]
|
||||
|
||||
|
||||
def delete(*, put_func: typing.Callable, client_id: int) -> None:
|
||||
|
||||
@@ -4,6 +4,9 @@ from typing import NamedTuple
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
TOKEN_PATH = Path("freshbooks_oauth_token.json")
|
||||
|
||||
|
||||
class NoToken(Exception):
|
||||
pass
|
||||
|
||||
@@ -23,7 +26,6 @@ class Token(NamedTuple):
|
||||
|
||||
@classmethod
|
||||
def _get_all(cls) -> dict:
|
||||
TOKEN_PATH = Path("~/freshbooks_oauth_token.json").expanduser()
|
||||
if not TOKEN_PATH.exists():
|
||||
raise NoToken
|
||||
with TOKEN_PATH.open(encoding="utf-8") as fin:
|
||||
@@ -43,7 +45,6 @@ class Token(NamedTuple):
|
||||
|
||||
@classmethod
|
||||
def make_from_dict(cls, token_dict: dict) -> None:
|
||||
TOKEN_PATH = Path("~/freshbooks_oauth_token.json").expanduser()
|
||||
if not TOKEN_PATH.exists():
|
||||
print(f"token JSON didn't exist, creating it at {TOKEN_PATH}:")
|
||||
TOKEN_PATH.touch()
|
||||
|
||||
Reference in New Issue
Block a user