Intro

/app/src/interview_workbook/concurrency/intro.py

Algorithm Notes

Summary: Intro to concurrency and parallelism in Python; detailed notes not yet curated.
Time: Estimate via loops/recurrences; common classes: O(1), O(log n), O(n), O(n log n), O(n^2)
Space: Count auxiliary structures and recursion depth.
Tip: See the Big-O Guide for how to derive bounds and compare trade-offs.

Source

"""
Concurrency & Parallelism in Python - Practical Overview

Covers:
- Threading (and GIL limitations)
- Producer-Consumer pattern with threads
- Multiprocessing (true parallelism for CPU-bound work)
- asyncio (async/await patterns for IO-bound concurrency)
- concurrent.futures (unified high-level API)

Guidance:
- CPU-bound: prefer multiprocessing or native extensions (e.g., NumPy) due to the GIL.
- IO-bound (network/file/db): threading or asyncio are effective.
- Avoid oversubscription: too many threads/processes can degrade performance.
"""

import asyncio
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from queue import Queue


# ------------------------
# Threading Basics (IO-Bound)
# ------------------------
def fetch_io_simulated(task_id: int, delay: float = 0.3) -> tuple[int, float]:
    """Simulate an IO-bound task (e.g., an HTTP request) with time.sleep."""
    time.sleep(delay)
    return task_id, delay


def threading_demo(
    num_tasks: int = 10, delay: float = 0.3, max_workers: int = 4
) -> list[tuple[int, float]]:
    """
    Use ThreadPoolExecutor for IO-bound tasks. Demonstrates GIL-friendly concurrency.
    """
    start = time.perf_counter()
    results: list[tuple[int, float]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as tp:
        futures = [tp.submit(fetch_io_simulated, i, delay) for i in range(num_tasks)]
        for fut in as_completed(futures):
            results.append(fut.result())
    elapsed = time.perf_counter() - start
    print(
        f"[threading] Completed {num_tasks} I/O tasks in {elapsed:.2f}s with {max_workers} threads"
    )
    return results
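

# A compact variant sketch using Executor.map, which returns results in input
# order (as_completed above yields them in completion order). Hypothetical
# helper using the same simulated workload; not part of the original demos.
def threading_map_sketch(
    num_tasks: int = 10, delay: float = 0.3, max_workers: int = 4
) -> list[tuple[int, float]]:
    with ThreadPoolExecutor(max_workers=max_workers) as tp:
        # map() zips the iterables into per-task arguments and preserves order.
        return list(tp.map(fetch_io_simulated, range(num_tasks), [delay] * num_tasks))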


# ------------------------
# Producer-Consumer (Threads + Queue)
# ------------------------
def producer_consumer_demo(num_items: int = 20, num_workers: int = 3) -> None:
    """
    Classic producer-consumer using Queue for thread-safe communication.
    """
    q: Queue[int | None] = Queue(maxsize=50)

    def producer():
        for i in range(num_items):
            q.put(i)
            # Simulate production time
            time.sleep(0.01)
        # Signal shutdown: one None sentinel per worker
        for _ in range(num_workers):
            q.put(None)

    def worker(wid: int):
        while True:
            item = q.get()
            if item is None:
                q.task_done()
                print(f"[worker-{wid}] shutting down")
                break
            # Simulate processing
            time.sleep(0.02)
            print(f"[worker-{wid}] processed {item}")
            q.task_done()

    threads = [threading.Thread(target=worker, args=(w,)) for w in range(num_workers)]
    for t in threads:
        t.start()

    prod = threading.Thread(target=producer)
    prod.start()

    q.join()
    prod.join()
    for t in threads:
        t.join()
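

# Variant sketch: if None could be a legitimate queue item, a unique module-level
# sentinel compared by identity avoids ambiguity. Hypothetical names; the shutdown
# protocol (one sentinel per worker) matches producer_consumer_demo above.
_STOP = object()


def worker_with_sentinel(q: Queue, wid: int) -> None:
    # Same worker loop as above, but stopping on the unique sentinel object.
    while True:
        item = q.get()
        if item is _STOP:
            q.task_done()
            break
        time.sleep(0.02)  # simulate processing
        q.task_done()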


# ------------------------
# Multiprocessing (CPU-Bound)
# ------------------------
def cpu_bound(n: int) -> int:
    """
    CPU-bound task: sum of squares 0..n-1 (toy example).
    Real CPU tasks might be image processing, ML, crypto, etc.
    """
    total = 0
    for i in range(n):
        total += i * i
    return total


def multiprocessing_demo(work_items: list[int], max_workers: int = 4) -> list[int]:
    """
    Use ProcessPoolExecutor for true parallelism on CPU-bound tasks.
    """
    start = time.perf_counter()
    results: list[int] = []
    with ProcessPoolExecutor(max_workers=max_workers) as pp:
        futures = [pp.submit(cpu_bound, n) for n in work_items]
        for fut in as_completed(futures):
            results.append(fut.result())
    elapsed = time.perf_counter() - start
    print(
        f"[multiprocessing] Completed {len(work_items)} CPU tasks in {elapsed:.2f}s with {max_workers} processes"
    )
    return results
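

# Variant sketch: ProcessPoolExecutor.map with chunksize batches task submission,
# reducing inter-process (pickling/IPC) overhead when there are many small tasks.
# Results come back in input order. As with any process pool on spawn-based
# platforms (Windows, macOS), call this only under `if __name__ == "__main__":`.
def multiprocessing_map_sketch(work_items: list[int], max_workers: int = 4) -> list[int]:
    with ProcessPoolExecutor(max_workers=max_workers) as pp:
        return list(pp.map(cpu_bound, work_items, chunksize=2))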


# ------------------------
# asyncio (IO-Bound, High Concurrency)
# ------------------------
async def async_fetch_io_simulated(task_id: int, delay: float = 0.3) -> tuple[int, float]:
    """Async variant of fetch_io_simulated; asyncio.sleep yields to the event loop."""
    await asyncio.sleep(delay)
    return task_id, delay


async def asyncio_demo(
    num_tasks: int = 20, delay: float = 0.1, concurrency: int = 10
) -> list[tuple[int, float]]:
    """
    Run many IO-bound tasks concurrently with asyncio & semaphore (rate limiting).
    """
    sem = asyncio.Semaphore(concurrency)
    results: list[tuple[int, float]] = []

    async def bounded_task(i: int):
        async with sem:
            res = await async_fetch_io_simulated(i, delay)
            results.append(res)

    start = time.perf_counter()
    await asyncio.gather(*(bounded_task(i) for i in range(num_tasks)))
    elapsed = time.perf_counter() - start
    print(
        f"[asyncio] Completed {num_tasks} async I/O tasks in {elapsed:.2f}s with concurrency={concurrency}"
    )
    return results
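

# Variant sketch using asyncio.TaskGroup (Python 3.11+): unlike bare gather(),
# a TaskGroup cancels the remaining tasks and re-raises if any task fails.
# Hypothetical addition; semantics otherwise match asyncio_demo above.
async def asyncio_taskgroup_sketch(
    num_tasks: int = 20, delay: float = 0.1
) -> list[tuple[int, float]]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(async_fetch_io_simulated(i, delay)) for i in range(num_tasks)]
    # The context manager exits only after every task has completed.
    return [t.result() for t in tasks]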


def demo():
    print("Concurrency & Parallelism Demo")
    print("=" * 40)

    # Threading demo (I/O-bound)
    threading_demo(num_tasks=12, delay=0.2, max_workers=4)
    print()

    # Producer-Consumer
    print("Producer-Consumer (threads + Queue):")
    producer_consumer_demo(num_items=12, num_workers=3)
    print()

    # Multiprocessing demo (CPU-bound)
    work = [500_000, 600_000, 700_000, 800_000]  # adjust workload to your machine
    multiprocessing_demo(work_items=work, max_workers=4)
    print()

    # asyncio demo (I/O-bound, high concurrency)
    print("asyncio high-concurrency demo:")
    asyncio.run(asyncio_demo(num_tasks=25, delay=0.05, concurrency=8))
    print()

    print("Notes & Interview Tips:")
    print(
        "  - GIL: only one thread executes Python bytecode at a time; use threads for I/O, processes for CPU."
    )
    print(
        "  - asyncio: single-threaded concurrency via event loop; great for sockets/HTTP/db with async drivers."
    )
    print("  - concurrent.futures: unified interface; ThreadPoolExecutor vs ProcessPoolExecutor.")
    print("  - Producer-Consumer: Queue provides safe cross-thread communication and backpressure.")
    print("  - Be mindful of context switching overhead, and use appropriate pool sizes.")


if __name__ == "__main__":
    demo()