Compare commits

...

10 Commits

9 changed files with 196 additions and 58 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
__pycache__/
env/
*.pyc
.vscode/

31
README.md Normal file
View File

@@ -0,0 +1,31 @@
# Purpose
This repository contains my implementations and explorations of rate limiting, drawn initially from the book _System Design Interview_.
I'm embarking on this in order to get a great software engineering (probably back-end) job, and at the moment I have Happy Scribe in mind since I have an interview with them on August 24th, directly after our family vacation in Greece.
Over a year ago (early 2022) I did a job search, and some of the interview processes ended right before or after the system design phase, so it's obviously something I need in my portfolio to truly be considered for senior dev roles. While I'd love to get a job as a junior or mid-level, my salary requirements and age (45) push me towards senior. That, and maybe the fact that I'm a decent programmer by now.
# TODO
- [ ] implement token bucket
- [ ] in-app
- [x] in-memory, lazy refill
- [ ] redis, process to refill
- [ ] implement leaky bucket
- [ ] in-app
- [x] redis
- [ ] redis cluster
- [ ] Flask middleware - https://flask.palletsprojects.com/en/2.1.x/quickstart/#hooking-in-wsgi-middleware
- [ ] NGINX - https://leandromoreira.com/2019/01/25/how-to-build-a-distributed-throttling-system-with-nginx-lua-redis/
- https://www.nginx.com/blog/rate-limiting-nginx/
- [ ] AWS API Gateway
- [ ] HAProxy Stick Tables - https://www.haproxy.com/blog/introduction-to-haproxy-stick-tables
- [ ] Cloudflare (Spectrum?)
- [ ] implement expiring tokens
- [ ] implement fixed window counter
- [ ] implement sliding window log
- [ ] implement sliding window counter
- [ ] use session IDs or API keys instead of IP address
- [ ] set headers appropriately in each case: https://www.ietf.org/archive/id/draft-polli-ratelimit-headers-02.html#name-ratelimit-headers-currently
- [ ] implement different rate limiting for each endpoint, using a `cost` variable for a given task

View File

@@ -1,38 +0,0 @@
import datetime as dt

# ip -> {'tokens': int, 'last_refilled': float (unix timestamp)}
TOKEN_BUCKET = {}
TIME_INTERVAL_SECONDS = 15


class TooManyRequests(Exception):
    """Raised when a client's token bucket is empty."""
    pass


def token_bucket(ip: str) -> None:
    """
    Rate-limit requests from `ip` using a lazily-refilled token bucket.

    Tokens are put in the bucket at preset rates periodically.
    Once the bucket is full, no more tokens are added.
    The refiller puts NUM_TOKENS_TO_REFILL tokens into the bucket every
    REFILL_EVERY_SECONDS seconds (refill happens lazily, on request).

    Raises:
        TooManyRequests: if the bucket for `ip` has no tokens left.
    """
    REFILL_EVERY_SECONDS = TIME_INTERVAL_SECONDS
    NUM_TOKENS_TO_REFILL = 4
    MAX_CAPACITY = 8
    entry = TOKEN_BUCKET.get(ip)
    if entry is None:
        # first request from this ip: start with a full bucket
        TOKEN_BUCKET[ip] = {'tokens': MAX_CAPACITY, 'last_refilled': dt.datetime.now().timestamp()}
    else:
        last_refilled = entry['last_refilled']
        now = dt.datetime.now().timestamp()
        if now >= last_refilled + REFILL_EVERY_SECONDS:
            elapsed_intervals = int((now - last_refilled) // REFILL_EVERY_SECONDS)
            # advance last_refilled by whole intervals only, so the fractional
            # remainder of the elapsed time still counts towards the next
            # refill (setting it to `now` would silently discard that time)
            entry['last_refilled'] = last_refilled + elapsed_intervals * REFILL_EVERY_SECONDS
            entry['tokens'] = min(entry['tokens'] + elapsed_intervals * NUM_TOKENS_TO_REFILL, MAX_CAPACITY)
    left = TOKEN_BUCKET[ip]['tokens']
    if left == 0:
        raise TooManyRequests
    TOKEN_BUCKET[ip]['tokens'] -= 1

View File

@@ -0,0 +1,13 @@
"""
These are implementations of different (in-application) rate limiting algorithms.
`identifier` is used as the first (usually only) argument for each implementation
because it might refer to IP address, a session ID, or perhaps an API key or token.
"""
from .exceptions import TooManyRequests
from .token_bucket_in_memory import token_bucket_in_memory_lazy_refill
from .leaky_bucket import (
leaking_bucket_dequeue,
leaking_bucket_enqueue,
RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS,
)

View File

@@ -0,0 +1,3 @@
class TooManyRequests(Exception):
    """Signal that a caller has exceeded its allowed request rate."""

View File

@@ -0,0 +1,61 @@
import datetime as dt
import redis
from .exceptions import TooManyRequests
r = redis.Redis()  # module-level Redis client shared by enqueue/dequeue
MAX_CAPACITY = 8  # max queued requests per identifier before new ones are dropped
STORE_NAME_PREFIX_LEAKING_BUCKET = "leaking_bucket:queue:tasks"  # per-identifier list key prefix
# NOTE(review): the 'exporter' prefix doesn't match STORE_NAME_PREFIX_LEAKING_BUCKET
# above — looks copied from another project; confirm the intended key name
LEAKING_BUCKET_INDEX_NAME = "exporter:queue:tasks:index"
RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS = 15  # drain interval used by the scheduler
NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL = 2  # tasks drained per queue per interval
def leaking_bucket_enqueue(identifier: str, data: str = "") -> None:
    """
    When a request arrives, the system checks if the queue for this particular
    `identifier` is full. If it is not full, the request is added to the queue.
    Otherwise, the request is dropped.

    Requests are pulled from the queue and processed at regular intervals in
    `leaking_bucket_dequeue`.

    Raises:
        TooManyRequests: if the queue for `identifier` is already at capacity.
    """
    store_name = f"{STORE_NAME_PREFIX_LEAKING_BUCKET}:{identifier}"
    # `>=` rather than `==`: guards against the queue silently growing past
    # capacity if MAX_CAPACITY is ever lowered while longer queues already
    # exist in redis
    if r.llen(store_name) >= MAX_CAPACITY:
        raise TooManyRequests
    r.lpush(store_name, data)
    # this is to enable iterating through all the queues in the system
    r.sadd(LEAKING_BUCKET_INDEX_NAME, identifier)
def leaking_bucket_dequeue():
    """
    Walk every queue registered in LEAKING_BUCKET_INDEX_NAME and drain up to
    NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL tasks from each one.

    Intended to be run on a schedule.
    """

    def run_task(payload):
        # placeholder: actual task processing is not implemented yet
        ...

    for raw_identifier in r.smembers(LEAKING_BUCKET_INDEX_NAME):
        queue_key = f"{STORE_NAME_PREFIX_LEAKING_BUCKET}:{raw_identifier.decode()}"
        print(
            f"{dt.datetime.now().isoformat()}: dequeueing "
            f"{NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL} tasks from {queue_key}"
        )
        remaining = NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL
        while remaining > 0:
            remaining -= 1
            payload = r.rpop(queue_key)
            if payload is None:
                print("there wasn't anything there")
            else:
                payload = payload.decode()
                print(f"running task with data '{payload}'")
                run_task(payload)

View File

@@ -0,0 +1,56 @@
import datetime as dt
import redis
from .exceptions import TooManyRequests
r = redis.Redis()
TOKEN_BUCKET = {}
MAX_CAPACITY = 8
REFILL_EVERY_SECONDS = 15
NUM_TOKENS_TO_REFILL = 4
def get_entry_from_token_bucket_in_memory(identifier: str) -> dict | None:
"""
This is implemented independently in order to decouple it from its caller.
Here it is initially implemented in-memory, but for scalability we'd
want to use something more durable.
"""
return TOKEN_BUCKET.get(identifier)
def token_bucket_in_memory_lazy_refill(identifier: str) -> str:
"""
Tokens are put in the bucket at preset rates periodically.
Once the bucket is full, no more tokens are added.
The refiller puts NUM_TOKENS_TO_REFILL tokens into the bucket every minute.
To be explicit, there is a token bucket for every `identifier`,
aka every user/IP
"""
entry = get_entry_from_token_bucket_in_memory(identifier)
if entry is None:
TOKEN_BUCKET[identifier] = {
"tokens": MAX_CAPACITY,
"last_refilled": dt.datetime.now().timestamp(),
}
else:
last_refilled = entry["last_refilled"]
now = dt.datetime.now().timestamp()
if now >= last_refilled + REFILL_EVERY_SECONDS:
num_tokens_to_refill = int(
(now - last_refilled) // REFILL_EVERY_SECONDS * NUM_TOKENS_TO_REFILL
)
entry["last_refilled"] = dt.datetime.now().timestamp()
entry["tokens"] = min(entry["tokens"] + num_tokens_to_refill, MAX_CAPACITY)
left = TOKEN_BUCKET[identifier]["tokens"]
if left == 0:
raise TooManyRequests
TOKEN_BUCKET[identifier]["tokens"] -= 1

View File

@@ -0,0 +1,27 @@
"""
adapted from https://pravash-techie.medium.com/python-sched-for-automating-tasks-in-python-396618864658
"""
import sched
import time
from algos import leaking_bucket_dequeue, RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS
scheduler = sched.scheduler(time.time, time.sleep)


def repeat_task(first_time=False) -> None:
    """
    Schedule one leaky-bucket drain plus a re-invocation of this function,
    so `leaking_bucket_dequeue` keeps firing every
    RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS seconds (immediately on the
    very first call).
    """
    delay = 0 if first_time else RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS
    scheduler.enter(delay=delay, priority=1, action=leaking_bucket_dequeue)
    scheduler.enter(delay=delay, priority=1, action=repeat_task)


print()
repeat_task(True)
scheduler.run()

View File

@@ -1,20 +1,3 @@
"""
TODO: implement leaky bucket
- in-app
- [x] in-memory
- [ ] redis
- [ ] redis cluster
- [ ] Flask middleware - https://flask.palletsprojects.com/en/2.1.x/quickstart/#hooking-in-wsgi-middleware
- [ ] NGINX - https://leandromoreira.com/2019/01/25/how-to-build-a-distributed-throttling-system-with-nginx-lua-redis/
- https://www.nginx.com/blog/rate-limiting-nginx/
- [ ] AWS API Gateway
- [ ] HAProxy Stick Tables - https://www.haproxy.com/blog/introduction-to-haproxy-stick-tables
- [ ] Cloudflare (Spectrum?)
TODO: implement fixed window counter
TODO: implement sliding window log
TODO: implement sliding window counter
TODO: use session IDs instead of IP address
"""
import flask as f
from . import algos
@@ -23,7 +6,7 @@ from . import algos
application = f.Flask(__name__)
# NOTE(review): this span is a unified-diff rendering — the `algos.token_bucket`
# assignment is the removed (old) line and `algos.leaking_bucket_enqueue` the
# added (new) line; in the resulting file only the latter assignment exists.
increment_requests_func = algos.token_bucket
increment_requests_func = algos.leaking_bucket_enqueue
@application.before_request
@@ -34,6 +17,7 @@ def before_request():
except algos.TooManyRequests:
return f.abort(429)
# NOTE(review): unified-diff rendering — the single-quoted decorator/return are
# the removed (old) lines and the double-quoted ones the added (new) lines; the
# resulting file has a single decorator and a single return for home().
@application.route('/')
@application.route("/")
def home():
    return '<!doctype html><title>Hello</title><h1>Hello</h1>'
    return "<!doctype html><title>Hello</title><h1>Hello</h1>"