So I've been working on this project where different parts of the system needed to talk to each other. You know, the usual stuff - user signs up, order gets updated, notifications need to be sent... The typical advice? "Just use RabbitMQ or Redis!" But hmm... that felt like bringing a tank to a knife fight 🤷‍♂️. Time to market mattered and I was the only dev, so I wanted to be able to move fast.
I needed something simple:
Here's what I came up with:
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List
from dataclasses import dataclass


@dataclass
class Event:
    event_type: str


class SimpleEventBus:
    def __init__(self):
        # Maps each event type to the handlers subscribed to it
        self._handlers: Dict[str, List[Callable]] = {}
        # Shared thread pool, so publish() never waits on a handler
        self._executor = ThreadPoolExecutor(max_workers=4)

    def publish(self, event: Event) -> None:
        if event.event_type in self._handlers:
            for handler in self._handlers[event.event_type]:
                # Fire-and-forget execution
                self._executor.submit(handler, event)

    def subscribe(self, event_type: str, handler: Callable) -> None:
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)
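To make the flow concrete, here's a rough usage sketch. The handler names (`send_welcome_email`, `update_analytics`) and the event names are made up for illustration, and the bus is repeated in compact form only so the snippet runs on its own. Reaching into `_executor` to wait for the handlers is a demo shortcut, not something the bus above exposes.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Event:
    event_type: str


class SimpleEventBus:
    # Compact copy of the bus above, repeated so this snippet is runnable.
    def __init__(self):
        self._handlers: Dict[str, List[Callable]] = {}
        self._executor = ThreadPoolExecutor(max_workers=4)

    def subscribe(self, event_type: str, handler: Callable) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def publish(self, event: Event) -> None:
        for handler in self._handlers.get(event.event_type, []):
            self._executor.submit(handler, event)


received = []  # list.append is atomic in CPython, so handlers can share it


def send_welcome_email(event: Event) -> None:  # hypothetical handler
    received.append(f"email:{event.event_type}")


def update_analytics(event: Event) -> None:  # hypothetical handler
    received.append(f"analytics:{event.event_type}")


bus = SimpleEventBus()
bus.subscribe("user_signed_up", send_welcome_email)
bus.subscribe("user_signed_up", update_analytics)

bus.publish(Event("user_signed_up"))  # both handlers get queued
bus.publish(Event("order_updated"))   # no subscribers, silently ignored

# Demo only: wait for the queued handlers to finish before reading results.
bus._executor.shutdown(wait=True)
print(sorted(received))
```

One thing to keep in mind with fire-and-forget: if a handler raises, the exception is stored on the `Future` returned by `submit()`, and since nobody looks at that future here, the error goes unnoticed.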
This part is crucial for understanding what's happening here.
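First thing you need to know - Python (CPython, to be precise) has this thing called the GIL, the Global Interpreter Lock. It's basically a lock that lets only one thread execute Python bytecode at a time, which is how the interpreter keeps its memory management safe in a multi-threaded environment. Here's how it actually works: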
A small detail that many don't know: when you're doing CPU work (like calculations), you're stuck with one thread running at a time. But for I/O work (which is what most event handlers do), a thread releases the GIL while it waits on the network or disk, so other threads can run in the meantime and the waits overlap. This is perfect for our event bus since most handlers are doing I/O anyway!
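You can see the I/O side of this with a quick timing sketch. It assumes `time.sleep` is a fair stand-in for a blocking network or disk call (it releases the GIL while waiting, like real blocking I/O does), and `fake_io_handler` is just a made-up name for the demo.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_handler(n: int) -> int:
    # time.sleep stands in for a blocking I/O call; the GIL is released
    # while this thread waits, so the other threads can run.
    time.sleep(0.2)
    return n


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    # Four handlers, four workers: the waits overlap instead of adding up.
    results = list(executor.map(fake_io_handler, range(4)))
elapsed = time.perf_counter() - start

print(results)
print(f"4 handlers x 0.2s each took {elapsed:.2f}s in total")
```

Run one after another, the four calls would need about 0.8s. With the pool, the total should land close to the length of a single wait, because the threads spend that time waiting rather than holding the GIL.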