From 877c749f37659d284a7c45d62fde83f16c201572 Mon Sep 17 00:00:00 2001 From: Zev Averbach Date: Fri, 11 Aug 2023 18:43:36 +0300 Subject: [PATCH] implemented leaky bucket, made a script to run its dequeuing. formatted with black --- .gitignore | 1 + my_limiter/algos.py | 83 ++++++++++++++++++++-------- my_limiter/run_tasks_leaky_bucket.py | 27 +++++++++ my_limiter/wsgi.py | 5 +- 4 files changed, 92 insertions(+), 24 deletions(-) create mode 100644 my_limiter/run_tasks_leaky_bucket.py diff --git a/.gitignore b/.gitignore index 33f7cf2..94d0767 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ __pycache__/ env/ *.pyc +.vscode/ diff --git a/my_limiter/algos.py b/my_limiter/algos.py index ff306b2..b210abf 100644 --- a/my_limiter/algos.py +++ b/my_limiter/algos.py @@ -12,9 +12,6 @@ import redis r = redis.Redis() -MAX_CAPACITY = 8 - - class TooManyRequests(Exception): pass @@ -23,24 +20,61 @@ class EntryDoesntExist(Exception): pass +MAX_CAPACITY = 8 +STORE_NAME_PREFIX_LEAKING_BUCKET = "leaking_bucket:queue:tasks" +LEAKING_BUCKET_INDEX_NAME = "exporter:queue:tasks:index" -def leaking_bucket(identifier: str, data: str) -> None: + +def leaking_bucket_enqueue(identifier: str, data: str) -> None: """ When a request arrives, the system checks if the queue for this particular `identifier` is full. If it is not full, the request is added to the queue. - Otherwise, the request is dropped. + Otherwise, the request is dropped. - Requests are pulled from the queue and processed at regular intervals. - (a separate process implemented elsewhere) - TODO: implement that other process! + Requests are pulled from the queue and processed at regular intervals in + `leaking_bucket_dequeue` + TODO: implement `leaking_bucket_dequeue` - [ ] done """ - STORE_NAME_PREFIX = "leaking_bucket:queue:tasks" - store_name = f"{STORE_NAME_PREFIX}:{identifier}" - + store_name = f"{STORE_NAME_PREFIX_LEAKING_BUCKET}:{identifier}" + if r.llen(store_name) == MAX_CAPACITY: raise TooManyRequests r.lpush(store_name, data) + # this is to enable iterating through all the queues in the system + r.sadd(LEAKING_BUCKET_INDEX_NAME, identifier) + + +RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS = 15 +NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL = 2 + + +def leaking_bucket_dequeue(): + """ + Iterate through all leaking bucket queues and process at least one task + from each of them. + + To be run on a schedule. + """ + + def run_task(data): + ... + + for identifier_bytes in r.smembers(LEAKING_BUCKET_INDEX_NAME): + identifier = identifier_bytes.decode() + task_list = f"{STORE_NAME_PREFIX_LEAKING_BUCKET}:{identifier}" + print( + f"{dt.datetime.now().isoformat()}: dequeueing " + f"{NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL} tasks from {task_list}" + ) + for _ in range(NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL): + data = r.rpop(task_list) + if data is not None: + data = data.decode() + print(f"running task with data '{data}'") + run_task(data) + else: + print("there wasn't anything there") TOKEN_BUCKET = {} @@ -57,9 +91,9 @@ def get_entry_from_token_bucket(identifier: str) -> dict | None: def token_bucket(identifier: str) -> str: """ - Tokens are put in the bucket at preset rates periodically. - Once the bucket is full, no more tokens are added. - The refiller puts NUM_TOKENS_TO_REFILL tokens into the bucket every minute. + Tokens are put in the bucket at preset rates periodically. + Once the bucket is full, no more tokens are added. + The refiller puts NUM_TOKENS_TO_REFILL tokens into the bucket every minute. To be explicit, there is a token bucket for every `identifier`, aka every user/IP @@ -70,17 +104,22 @@ def token_bucket(identifier: str) -> str: entry = get_entry_from_token_bucket(identifier) if entry is None: - TOKEN_BUCKET[identifier] = {'tokens': MAX_CAPACITY, 'last_refilled': dt.datetime.now().timestamp()} + TOKEN_BUCKET[identifier] = { + "tokens": MAX_CAPACITY, + "last_refilled": dt.datetime.now().timestamp(), + } else: - last_refilled = entry['last_refilled'] - now = dt.datetime.now().timestamp() + last_refilled = entry["last_refilled"] + now = dt.datetime.now().timestamp() if now >= last_refilled + REFILL_EVERY_SECONDS: - num_tokens_to_refill = int((now - last_refilled) // REFILL_EVERY_SECONDS * NUM_TOKENS_TO_REFILL) - entry['last_refilled'] = dt.datetime.now().timestamp() - entry['tokens'] = min(entry['tokens'] + num_tokens_to_refill, MAX_CAPACITY) + num_tokens_to_refill = int( + (now - last_refilled) // REFILL_EVERY_SECONDS * NUM_TOKENS_TO_REFILL + ) + entry["last_refilled"] = dt.datetime.now().timestamp() + entry["tokens"] = min(entry["tokens"] + num_tokens_to_refill, MAX_CAPACITY) - left = TOKEN_BUCKET[identifier]['tokens'] + left = TOKEN_BUCKET[identifier]["tokens"] if left == 0: raise TooManyRequests - TOKEN_BUCKET[identifier]['tokens'] -= 1 \ No newline at end of file + TOKEN_BUCKET[identifier]["tokens"] -= 1 diff --git a/my_limiter/run_tasks_leaky_bucket.py b/my_limiter/run_tasks_leaky_bucket.py new file mode 100644 index 0000000..d1d7041 --- /dev/null +++ b/my_limiter/run_tasks_leaky_bucket.py @@ -0,0 +1,27 @@ +""" +adapted from https://pravash-techie.medium.com/python-sched-for-automating-tasks-in-python-396618864658 +""" +import sched +import time + +from algos import leaking_bucket_dequeue, RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS + +scheduler = sched.scheduler(time.time, time.sleep) + + +def repeat_task(first_time=False) -> None: + scheduler.enter( + delay=RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS if not first_time else 0, + priority=1, + action=leaking_bucket_dequeue, + ) + scheduler.enter( + delay=RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS if not first_time else 0, + priority=1, + action=repeat_task, + ) + + +print() +repeat_task(True) +scheduler.run() diff --git a/my_limiter/wsgi.py b/my_limiter/wsgi.py index cadffc4..b80f651 100644 --- a/my_limiter/wsgi.py +++ b/my_limiter/wsgi.py @@ -40,6 +40,7 @@ def before_request(): except algos.TooManyRequests: return f.abort(429) -@application.route('/') + +@application.route("/") def home(): - return 'Hello

Hello

' \ No newline at end of file + return "Hello

Hello

"