From 32eddd3de61092a02591aa29fc9899fe64f8aa59 Mon Sep 17 00:00:00 2001 From: Zev Averbach Date: Fri, 11 Aug 2023 17:22:52 +0300 Subject: [PATCH] implemented leaky bucket, changed ip to identifier, added a bunch of documentation so the repo can serve as much of a journal entry as a collection of code. added a README in that effort too. --- my_limiter/README.md | 5 ++++ my_limiter/algos.py | 70 +++++++++++++++++++++++++++++++++++++------- my_limiter/wsgi.py | 10 +++++-- 3 files changed, 72 insertions(+), 13 deletions(-) create mode 100644 my_limiter/README.md diff --git a/my_limiter/README.md b/my_limiter/README.md new file mode 100644 index 0000000..d0b846c --- /dev/null +++ b/my_limiter/README.md @@ -0,0 +1,5 @@ +This repository contains my implementations and explorations of rate limiting, drawn initially from the book _System Design Interview_. + +I'm embarking on this in order to get a great software engineering (probably back-end) job, and at the moment I have Happy Scribe in mind since I have an interview with them on August 24th, directly after our family vacation in Greece. + +Over a year ago (early 2022) I did a job search, and some of the interview processes ended right before or after the system design phase, so it's obviously something I need in my portfolio to truly be considered for senior dev roles. While I'd love to get a job as a junior or mid-level, my salary requirements and age (45) push me towards senior. That, and maybe the fact that I'm a decent programmer by now. \ No newline at end of file diff --git a/my_limiter/algos.py b/my_limiter/algos.py index 1afb7c6..ff306b2 100644 --- a/my_limiter/algos.py +++ b/my_limiter/algos.py @@ -1,28 +1,76 @@ +""" +These are implementations of different (in-application) rate limiting algorithms. + +`identifier` is used as the first (usually only) argument for each implementation +because it might refer to IP address, a session ID, or perhaps an API key or token. +""" import datetime as dt +import redis -TOKEN_BUCKET = {} -TIME_INTERVAL_SECONDS = 15 +r = redis.Redis() + + +MAX_CAPACITY = 8 + class TooManyRequests(Exception): pass -def token_bucket(ip: str) -> str: +class EntryDoesntExist(Exception): + pass + + + +def leaking_bucket(identifier: str, data: str) -> None: + """ + When a request arrives, the system checks if the queue for this particular + `identifier` is full. If it is not full, the request is added to the queue. + Otherwise, the request is dropped. + + Requests are pulled from the queue and processed at regular intervals. + (a separate process implemented elsewhere) + TODO: implement that other process! + - [ ] done + """ + STORE_NAME_PREFIX = "leaking_bucket:queue:tasks" + store_name = f"{STORE_NAME_PREFIX}:{identifier}" + + if r.llen(store_name) == MAX_CAPACITY: + raise TooManyRequests + r.lpush(store_name, data) + + +TOKEN_BUCKET = {} + + +def get_entry_from_token_bucket(identifier: str) -> dict | None: + """ + This is implemented independently in order to decouple it from its caller. + Here it is initially implemented in-memory, but for scalability we'd + want to use something more long-lived. + """ + return TOKEN_BUCKET.get(identifier) + + +def token_bucket(identifier: str) -> str: """ Tokens are put in the bucket at preset rates periodically. Once the bucket is full, no more tokens are added. The refiller puts NUM_TOKENS_TO_REFILL tokens into the bucket every minute. - """ - REFILL_EVERY_SECONDS = TIME_INTERVAL_SECONDS - NUM_TOKENS_TO_REFILL = 4 - MAX_CAPACITY = 8 - entry = TOKEN_BUCKET.get(ip) + To be explicit, there is a token bucket for every `identifier`, + aka every user/IP + """ + REFILL_EVERY_SECONDS = 15 + NUM_TOKENS_TO_REFILL = 4 + + entry = get_entry_from_token_bucket(identifier) if entry is None: - TOKEN_BUCKET[ip] = {'tokens': MAX_CAPACITY, 'last_refilled': dt.datetime.now().timestamp()} + TOKEN_BUCKET[identifier] = {'tokens': MAX_CAPACITY, 'last_refilled': dt.datetime.now().timestamp()} else: last_refilled = entry['last_refilled'] now = dt.datetime.now().timestamp() @@ -31,8 +79,8 @@ def token_bucket(ip: str) -> str: entry['last_refilled'] = dt.datetime.now().timestamp() entry['tokens'] = min(entry['tokens'] + num_tokens_to_refill, MAX_CAPACITY) - left = TOKEN_BUCKET[ip]['tokens'] + left = TOKEN_BUCKET[identifier]['tokens'] if left == 0: raise TooManyRequests - TOKEN_BUCKET[ip]['tokens'] -= 1 \ No newline at end of file + TOKEN_BUCKET[identifier]['tokens'] -= 1 \ No newline at end of file diff --git a/my_limiter/wsgi.py b/my_limiter/wsgi.py index 61f01f3..cadffc4 100644 --- a/my_limiter/wsgi.py +++ b/my_limiter/wsgi.py @@ -1,7 +1,10 @@ """ +TODO: implement token bucket + - [ ] in-app + - [x] in-memory + - [ ] redis TODO: implement leaky bucket - in-app - - [x] in-memory - [ ] redis - [ ] redis cluster - [ ] Flask middleware - https://flask.palletsprojects.com/en/2.1.x/quickstart/#hooking-in-wsgi-middleware @@ -10,10 +13,13 @@ TODO: implement leaky bucket - [ ] AWS API Gateway - [ ] HAProxy Stick Tables - https://www.haproxy.com/blog/introduction-to-haproxy-stick-tables - [ ] Cloudflare (Spectrum?) +TODO: implement expiring tokens TODO: implement fixed window counter TODO: implement sliding window log TODO: implement sliding window counter -TODO: use session IDs instead of IP address +TODO: use session IDs or API keys instead of IP address +TODO: set headers appropriately in each case: https://www.ietf.org/archive/id/draft-polli-ratelimit-headers-02.html#name-ratelimit-headers-currently +TODO: implement different rate limiting for each endpoint, using a `cost` variable for a given task """ import flask as f