Rate Limiter

interview_workbook/systems /app/src/interview_workbook/systems/rate_limiter.py
View Source

Algorithm Notes

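Summary: Rate Limiter — thread-safe token-bucket and leaky-bucket limiters driven by a monotonic clock.
Time: O(1) per acquire()/offer() call — no loops over stored items, only constant arithmetic under a lock; acquire_blocking() may additionally wait until tokens refill or the timeout expires.
Space: O(1) per limiter — a numeric counter, a timestamp and a lock; no recursion.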
Tip: See the Big-O Guide for how to derive bounds and compare trade-offs.

Big-O Guide

Source

from __future__ import annotations

import threading
import time


class TokenBucket:
    """
    Token Bucket rate limiter.

    - capacity: max tokens the bucket can hold
    - rate: tokens added per second (float supported)

    acquire(n=1) returns True if n tokens are available immediately, else False.
    acquire_blocking(n=1, timeout=None) waits up to timeout seconds for tokens.

    Both methods raise ValueError for a negative n. A request for more tokens
    than capacity can never be satisfied (the balance is capped at capacity),
    so both methods return False for it without waiting.

    Thread-safe: all state is read and written while holding one lock.

    Use cases:
      - API request limiting (QPS)
      - Smooth burst handling up to capacity
    """

    def __init__(self, capacity: int, rate: float):
        """Create a full bucket.

        Raises:
            ValueError: if capacity or rate is not strictly positive.
        """
        if capacity <= 0 or rate <= 0:
            raise ValueError("capacity and rate must be > 0")
        self.capacity = float(capacity)
        self.rate = float(rate)
        self._tokens = float(capacity)  # bucket starts full
        self._last = time.monotonic()  # time of the last refill
        self._lock = threading.Lock()
        # The condition shares the lock; it is used only for timed waits that
        # release the lock while sleeping (nothing ever notifies it).
        self._cv = threading.Condition(self._lock)

    def _refill(self) -> None:
        """Credit tokens for the time elapsed since the last refill.

        Must be called with the lock held. The balance never exceeds capacity.
        """
        now = time.monotonic()
        elapsed = now - self._last
        if elapsed > 0:
            self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
            self._last = now

    def acquire(self, n: int = 1) -> bool:
        """Take n tokens if they are available right now.

        Returns:
            True if the tokens were taken, False otherwise (never blocks).

        Raises:
            ValueError: if n is negative.
        """
        if n < 0:
            # A negative request would add tokens to the bucket.
            raise ValueError("n must be >= 0")
        with self._lock:
            self._refill()
            if self._tokens >= n:
                self._tokens -= n
                return True
            return False

    def acquire_blocking(self, n: int = 1, timeout: float | None = None) -> bool:
        """Take n tokens, waiting for the bucket to refill if necessary.

        Args:
            n: number of tokens to take.
            timeout: maximum seconds to wait; None waits as long as needed.

        Returns:
            True once the tokens were taken; False if the timeout expired
            first, or immediately if n exceeds capacity.

        Raises:
            ValueError: if n is negative.
        """
        if n < 0:
            raise ValueError("n must be >= 0")
        if n > self.capacity:
            # The balance is capped at capacity, so waiting cannot help.
            return False
        end = None if timeout is None else time.monotonic() + timeout
        with self._cv:
            while True:
                self._refill()
                if self._tokens >= n:
                    self._tokens -= n
                    return True
                # Time needed to refill the deficit at the configured rate,
                # instead of polling on a fixed tick.
                wait = (n - self._tokens) / self.rate
                if end is not None:
                    remaining = end - time.monotonic()
                    if remaining <= 0:
                        return False
                    wait = min(wait, remaining)
                # wait() releases the lock while sleeping so other threads can
                # use the bucket; the loop re-checks the balance afterwards
                # because another thread may have taken the tokens meanwhile.
                self._cv.wait(timeout=min(wait, threading.TIMEOUT_MAX))


class LeakyBucket:
    """
    Leaky Bucket rate limiter.

    - capacity: largest queue level the bucket accepts
    - rate: how many items leak out per second

    offer() tries to add one item: True when it fits, False when the bucket
    is full. Before deciding, offer() runs _drip(), which lowers the queue
    level by rate * (seconds since the previous leak), never below zero.

    Behavior:
      - Bursts are absorbed until the level reaches capacity
      - The level drains at a constant rate over time

    Thread-safe: offer() does all of its work while holding a lock.
    """

    def __init__(self, capacity: int, rate: float):
        # Both limits must be strictly positive.
        if capacity <= 0 or rate <= 0:
            raise ValueError("capacity and rate must be > 0")
        self._lock = threading.Lock()
        self._last = time.monotonic()  # time of the last leak
        self._queue = 0  # current level; becomes fractional after a leak
        self.rate = rate
        self.capacity = capacity

    def _drip(self) -> None:
        """Drain the queue for the time elapsed since the last leak."""
        current = time.monotonic()
        delta = current - self._last
        if delta > 0:
            drained = delta * self.rate
            if drained > 0:
                self._last = current
                self._queue = max(0, self._queue - drained)

    def offer(self) -> bool:
        """Try to enqueue one item; return whether it was accepted."""
        with self._lock:
            self._drip()
            fits = self._queue + 1 <= self.capacity
            if not fits:
                return False
            self._queue += 1
            return True


def demo():
    """Exercise both limiters and print the results with some closing notes."""
    print("Rate Limiter Demo (TokenBucket, LeakyBucket)")
    print("=" * 50)

    # Token bucket holding 5 tokens, refilled at 2 tokens per second.
    bucket = TokenBucket(capacity=5, rate=2.0)
    print("TokenBucket test (5 capacity, 2 tokens/sec):")
    granted = 0
    for _ in range(10):
        got = bucket.acquire()
        print(f"  acquire() -> {got}")
        granted += 1 if got else 0
        time.sleep(0.2)  # let some tokens refill between attempts
    print(f"Accepted immediately: {granted}/10")
    print()

    # Blocking acquire on the same bucket.
    print("TokenBucket acquire_blocking for 3 tokens with timeout:")
    got = bucket.acquire_blocking(n=3, timeout=3.0)
    print(f"  acquire_blocking(3, timeout=3) -> {got}")
    print()

    # Leaky bucket accepting up to 5 queued items, leaking 2 per second.
    leaky = LeakyBucket(capacity=5, rate=2.0)
    print("LeakyBucket test (5 capacity, leak 2/sec):")
    outcomes = []
    for _ in range(10):
        outcomes.append(leaky.offer())
        time.sleep(0.1)
    print(f"  offer() results: {outcomes}")
    print()

    print("Notes & Interview Tips:")
    notes = (
        "  - Token bucket: limits average rate while allowing bursts up to capacity.",
        "  - Leaky bucket: enforces a fixed outflow rate by queuing/dropping excess.",
        "  - Distributed systems: combine local buckets with centralized quota for global rate limiting.",
        "  - Consider time precision, thread/process safety, and fairness across clients.",
    )
    for note in notes:
        print(note)

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    demo()