Implement leaky bucket and add a script to run its dequeuing; formatted with black.

This commit is contained in:
2023-08-11 18:43:36 +03:00
parent 32eddd3de6
commit 877c749f37
4 changed files with 92 additions and 24 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
__pycache__/
env/
*.pyc
.vscode/

View File

@@ -12,9 +12,6 @@ import redis
r = redis.Redis()
MAX_CAPACITY = 8
class TooManyRequests(Exception):
pass
@@ -23,24 +20,61 @@ class EntryDoesntExist(Exception):
pass
MAX_CAPACITY = 8
STORE_NAME_PREFIX_LEAKING_BUCKET = "leaking_bucket:queue:tasks"
LEAKING_BUCKET_INDEX_NAME = "exporter:queue:tasks:index"
def leaking_bucket(identifier: str, data: str) -> None:
def leaking_bucket_enqueue(identifier: str, data: str) -> None:
"""
When a request arrives, the system checks if the queue for this particular
`identifier` is full. If it is not full, the request is added to the queue.
Otherwise, the request is dropped.
Otherwise, the request is dropped.
Requests are pulled from the queue and processed at regular intervals.
(a separate process implemented elsewhere)
TODO: implement that other process!
Requests are pulled from the queue and processed at regular intervals in
`leaking_bucket_dequeue`
TODO: implement `leaking_bucket_dequeue`
- [ ] done
"""
STORE_NAME_PREFIX = "leaking_bucket:queue:tasks"
store_name = f"{STORE_NAME_PREFIX}:{identifier}"
store_name = f"{STORE_NAME_PREFIX_LEAKING_BUCKET}:{identifier}"
if r.llen(store_name) == MAX_CAPACITY:
raise TooManyRequests
r.lpush(store_name, data)
# this is to enable iterating through all the queues in the system
r.sadd(LEAKING_BUCKET_INDEX_NAME, identifier)
RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS = 15
NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL = 2
def leaking_bucket_dequeue():
"""
Iterate through all leaking bucket queues and process at least one task
from each of them.
To be run on a schedule.
"""
def run_task(data):
...
for identifier_bytes in r.smembers(LEAKING_BUCKET_INDEX_NAME):
identifier = identifier_bytes.decode()
task_list = f"{STORE_NAME_PREFIX_LEAKING_BUCKET}:{identifier}"
print(
f"{dt.datetime.now().isoformat()}: dequeueing "
f"{NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL} tasks from {task_list}"
)
for _ in range(NUM_TASKS_TO_RUN_FOR_EACH_USER_AT_INTERVAL):
data = r.rpop(task_list)
if data is not None:
data = data.decode()
print(f"running task with data '{data}'")
run_task(data)
else:
print("there wasn't anything there")
TOKEN_BUCKET = {}
@@ -57,9 +91,9 @@ def get_entry_from_token_bucket(identifier: str) -> dict | None:
def token_bucket(identifier: str) -> str:
"""
Tokens are put in the bucket at preset rates periodically.
Once the bucket is full, no more tokens are added.
The refiller puts NUM_TOKENS_TO_REFILL tokens into the bucket every minute.
Tokens are put in the bucket at preset rates periodically.
Once the bucket is full, no more tokens are added.
The refiller puts NUM_TOKENS_TO_REFILL tokens into the bucket every minute.
To be explicit, there is a token bucket for every `identifier`,
aka every user/IP
@@ -70,17 +104,22 @@ def token_bucket(identifier: str) -> str:
entry = get_entry_from_token_bucket(identifier)
if entry is None:
TOKEN_BUCKET[identifier] = {'tokens': MAX_CAPACITY, 'last_refilled': dt.datetime.now().timestamp()}
TOKEN_BUCKET[identifier] = {
"tokens": MAX_CAPACITY,
"last_refilled": dt.datetime.now().timestamp(),
}
else:
last_refilled = entry['last_refilled']
now = dt.datetime.now().timestamp()
last_refilled = entry["last_refilled"]
now = dt.datetime.now().timestamp()
if now >= last_refilled + REFILL_EVERY_SECONDS:
num_tokens_to_refill = int((now - last_refilled) // REFILL_EVERY_SECONDS * NUM_TOKENS_TO_REFILL)
entry['last_refilled'] = dt.datetime.now().timestamp()
entry['tokens'] = min(entry['tokens'] + num_tokens_to_refill, MAX_CAPACITY)
num_tokens_to_refill = int(
(now - last_refilled) // REFILL_EVERY_SECONDS * NUM_TOKENS_TO_REFILL
)
entry["last_refilled"] = dt.datetime.now().timestamp()
entry["tokens"] = min(entry["tokens"] + num_tokens_to_refill, MAX_CAPACITY)
left = TOKEN_BUCKET[identifier]['tokens']
left = TOKEN_BUCKET[identifier]["tokens"]
if left == 0:
raise TooManyRequests
TOKEN_BUCKET[identifier]['tokens'] -= 1
TOKEN_BUCKET[identifier]["tokens"] -= 1

View File

@@ -0,0 +1,27 @@
"""
adapted from https://pravash-techie.medium.com/python-sched-for-automating-tasks-in-python-396618864658
"""
import sched
import time
from algos import leaking_bucket_dequeue, RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS
scheduler = sched.scheduler(time.time, time.sleep)
def repeat_task(first_time=False) -> None:
scheduler.enter(
delay=RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS if not first_time else 0,
priority=1,
action=leaking_bucket_dequeue,
)
scheduler.enter(
delay=RUN_LEAKING_BUCKET_TASKS_EVERY_X_SECONDS if not first_time else 0,
priority=1,
action=repeat_task,
)
print()
repeat_task(True)
scheduler.run()

View File

@@ -40,6 +40,7 @@ def before_request():
except algos.TooManyRequests:
return f.abort(429)
@application.route('/')
@application.route("/")
def home():
return '<!doctype html><title>Hello</title><h1>Hello</h1>'
return "<!doctype html><title>Hello</title><h1>Hello</h1>"