Backpressure
TL;DR
Backpressure is a flow control mechanism where downstream components signal upstream components to slow down when they cannot keep up with incoming data. Instead of buffering indefinitely (leading to memory exhaustion) or dropping data silently, backpressure creates a feedback loop that propagates load information throughout the system, allowing producers to adjust their rate.
Why Backpressure?
Without backpressure:
┌─────────────────────────────────────────────────────────────────────┐
│ Unbounded Queue Problem │
│ │
│ Producer Queue Consumer │
│ (fast: 1000/s) (unbounded) (slow: 100/s) │
│ │
│ [████] ──────► [████████████...] ──────► [██] │
│ │ │
│ ▼ │
│ Memory grows │
│ without bound │
│ │ │
│ ▼ │
│ OutOfMemoryError! │
│ System crashes │
└─────────────────────────────────────────────────────────────────────┘

With backpressure:
┌─────────────────────────────────────────────────────────────────────┐
│ Backpressure Flow Control │
│ │
│ Producer Queue Consumer │
│ (adjusts to (bounded) (100/s) │
│ 100/s) │
│ │
│ [██] ──────► [████████] ──────► [██] │
│ ▲ │ │ │
│ │ │ │ │
│ └───────────────┘ │ │
│ "Slow down!" │ │
│ └─────────────────────────────────┘ │
│ Propagate back │
│ │
│ Memory stays bounded, system stays stable │
└─────────────────────────────────────────────────────────────────────┘
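Before looking at individual strategies, here is a minimal sketch of that feedback loop using Python's asyncio (the sleep stands in for real downstream work, and the sizes are arbitrary): because the queue is bounded, `await queue.put()` suspends the fast producer whenever the slow consumer falls behind.

import asyncio

async def producer(queue: asyncio.Queue) -> None:
    for i in range(200):
        # Suspends here whenever the queue is full, so the producer is
        # automatically slowed to the consumer's pace
        await queue.put(i)

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        await asyncio.sleep(0.01)  # slow downstream work (placeholder)
        queue.task_done()

async def main() -> None:
    queue = asyncio.Queue(maxsize=10)  # bounded: this is the backpressure point
    worker = asyncio.create_task(consumer(queue))
    await producer(queue)
    await queue.join()  # wait for in-flight items to finish
    worker.cancel()

asyncio.run(main())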
Backpressure Strategies

1. Blocking (Synchronous)
import queue
import threading
from typing import TypeVar, Generic
T = TypeVar('T')
class BlockingQueue(Generic[T]):
"""
Bounded queue that blocks producers when full.
Simplest form of backpressure.
"""
def __init__(self, max_size: int):
self.queue = queue.Queue(maxsize=max_size)
def put(self, item: T, timeout: float = None) -> bool:
"""
Block until space available.
Returns False if timeout expires.
"""
try:
self.queue.put(item, block=True, timeout=timeout)
return True
except queue.Full:
return False
def get(self, timeout: float = None) -> T:
"""Block until item available."""
return self.queue.get(block=True, timeout=timeout)
@property
def size(self) -> int:
return self.queue.qsize()
@property
def is_full(self) -> bool:
return self.queue.full()
# Usage
buffer = BlockingQueue[dict](max_size=1000)
def producer():
while True:
data = fetch_data()
# Producer blocks here when queue is full
# This is backpressure in action
buffer.put(data)
def consumer():
while True:
data = buffer.get()
        process_slowly(data)  # Takes time

Blocking Backpressure Timeline:
Time ─────────────────────────────────────────────────────────►
Producer: [produce][produce][produce][BLOCKED........][produce]
▲
│
Queue: [--][███][█████████████████████████][████████]
│
Queue full!
│
Consumer: [---][consume][consume][consume][consume][consume]

2. Dropping (Lossy)
import random
from enum import Enum
from typing import Optional
from collections import deque
class DropPolicy(Enum):
DROP_OLDEST = "drop_oldest" # Drop front of queue
DROP_NEWEST = "drop_newest" # Drop incoming item
DROP_RANDOM = "drop_random" # Drop random item
class DroppingQueue(Generic[T]):
"""
Bounded queue that drops items when full.
Good when recent data is more important.
"""
def __init__(self, max_size: int, policy: DropPolicy = DropPolicy.DROP_OLDEST):
self.max_size = max_size
self.policy = policy
self.queue = deque(maxlen=max_size if policy == DropPolicy.DROP_OLDEST else None)
self.dropped_count = 0
self.lock = threading.Lock()
def put(self, item: T) -> Optional[T]:
"""
Add item to queue.
Returns dropped item if any.
"""
with self.lock:
if self.policy == DropPolicy.DROP_OLDEST:
if len(self.queue) >= self.max_size:
dropped = self.queue.popleft()
self.dropped_count += 1
self.queue.append(item)
return dropped
self.queue.append(item)
return None
            elif self.policy == DropPolicy.DROP_NEWEST:
                if len(self.queue) >= self.max_size:
                    self.dropped_count += 1
                    return item  # Drop incoming
                self.queue.append(item)
                return None
            else:  # DropPolicy.DROP_RANDOM
                dropped = None
                if len(self.queue) >= self.max_size:
                    # Evict a random victim to make room for the new item
                    victim = random.randrange(len(self.queue))
                    dropped = self.queue[victim]
                    del self.queue[victim]
                    self.dropped_count += 1
                self.queue.append(item)
                return dropped
def get(self) -> Optional[T]:
with self.lock:
if self.queue:
return self.queue.popleft()
return None
# Example: Live metrics - old data less valuable
metrics_queue = DroppingQueue[dict](
max_size=10000,
policy=DropPolicy.DROP_OLDEST
)
def emit_metric(metric: dict):
dropped = metrics_queue.put(metric)
if dropped:
# Optionally log dropped data
        logger.debug(f"Dropped old metric: {dropped['timestamp']}")

3. Sampling
import random
import time
from dataclasses import dataclass
from typing import Callable
@dataclass
class SamplingConfig:
base_rate: float = 1.0 # 100% at low load
min_rate: float = 0.01 # Never below 1%
load_threshold: float = 0.8 # Start sampling above 80% load
class AdaptiveSampler:
"""
Reduce sampling rate based on load.
Good for metrics, logs, or analytics.
"""
def __init__(self, config: SamplingConfig):
self.config = config
self.current_rate = config.base_rate
self.queue_size = 0
self.max_queue_size = 10000
def update_load(self, queue_size: int, max_size: int):
"""Adjust sampling rate based on queue fill level"""
self.queue_size = queue_size
self.max_queue_size = max_size
load = queue_size / max_size
if load < self.config.load_threshold:
self.current_rate = self.config.base_rate
else:
# Linear reduction from base_rate to min_rate
excess_load = (load - self.config.load_threshold) / (1 - self.config.load_threshold)
self.current_rate = max(
self.config.min_rate,
self.config.base_rate * (1 - excess_load)
)
def should_sample(self) -> bool:
"""Probabilistically decide whether to sample this item"""
return random.random() < self.current_rate
def sample(self, item: T, process: Callable[[T], None]):
if self.should_sample():
process(item)
# Usage: Trace sampling under load
sampler = AdaptiveSampler(SamplingConfig(
base_rate=1.0, # Sample 100% normally
min_rate=0.001, # Sample 0.1% under extreme load
load_threshold=0.7
))
def process_trace(trace: dict):
if sampler.should_sample():
        send_to_tracing_backend(trace)

Adaptive Sampling:
Queue Load: 0% 50% 70% 90% 100%
│ │ │ │ │
Sample Rate: 100% 100% 50% 10% 1%
│ │ │ │ │
└────────┴────────┴────────┴────────┘
Normal │ Degraded Operation
Operation      │

4. Reactive Streams (Pull-Based)
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
import asyncio
T = TypeVar('T')
class Publisher(ABC, Generic[T]):
"""Reactive streams publisher"""
@abstractmethod
def subscribe(self, subscriber: 'Subscriber[T]'):
pass
class Subscriber(ABC, Generic[T]):
"""Reactive streams subscriber"""
@abstractmethod
def on_subscribe(self, subscription: 'Subscription'):
pass
@abstractmethod
def on_next(self, item: T):
pass
@abstractmethod
def on_error(self, error: Exception):
pass
@abstractmethod
def on_complete(self):
pass
class Subscription(ABC):
"""Reactive streams subscription"""
@abstractmethod
def request(self, n: int):
"""Request n more items (pull-based backpressure)"""
pass
@abstractmethod
def cancel(self):
pass
# Implementation
class BufferedPublisher(Publisher[T]):
def __init__(self, source: asyncio.Queue):
self.source = source
self.subscribers = []
def subscribe(self, subscriber: Subscriber[T]):
subscription = BufferedSubscription(self.source, subscriber)
self.subscribers.append(subscription)
subscriber.on_subscribe(subscription)
class BufferedSubscription(Subscription):
def __init__(self, source: asyncio.Queue, subscriber: Subscriber):
self.source = source
self.subscriber = subscriber
self.requested = 0
self.cancelled = False
self._task = None
def request(self, n: int):
"""Subscriber requests n more items"""
self.requested += n
if self._task is None:
self._task = asyncio.create_task(self._emit_loop())
async def _emit_loop(self):
while not self.cancelled and self.requested > 0:
try:
# Only fetch when there's demand
item = await asyncio.wait_for(
self.source.get(),
timeout=1.0
)
self.requested -= 1
self.subscriber.on_next(item)
except asyncio.TimeoutError:
continue
except Exception as e:
self.subscriber.on_error(e)
break
self._task = None
def cancel(self):
self.cancelled = True
# Usage
class ProcessingSubscriber(Subscriber[dict]):
def __init__(self, batch_size: int = 10):
self.batch_size = batch_size
self.subscription = None
self.buffer = []
def on_subscribe(self, subscription: Subscription):
self.subscription = subscription
# Request initial batch
subscription.request(self.batch_size)
def on_next(self, item: dict):
self.buffer.append(item)
if len(self.buffer) >= self.batch_size:
self._process_batch()
# Request more only after processing
self.subscription.request(self.batch_size)
    def on_error(self, error: Exception):
        # Minimal handling for this example: just surface the failure
        raise error

    def on_complete(self):
        # Flush any partial batch when the stream ends
        if self.buffer:
            self._process_batch()

    def _process_batch(self):
        # Process items
        for item in self.buffer:
            process(item)
        self.buffer.clear()

Pull-Based Backpressure:
Subscriber: "Give me 10 items"
│
▼
Publisher: [1][2][3][4][5][6][7][8][9][10] ──────► Subscriber
│
│ Subscriber processes...
│
Subscriber: "Give me 10 more"
│
▼
Publisher: [11][12][13]... ──────► Subscriber
Key: Publisher only sends what subscriber requests
No buffer overflow possible
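A rough wiring sketch for the classes above (the `run_stream` driver and the dict payloads are assumptions; the `process` call inside the subscriber is still the placeholder from the example):

# Hypothetical wiring of BufferedPublisher and ProcessingSubscriber
async def run_stream():
    source: asyncio.Queue = asyncio.Queue(maxsize=100)
    publisher = BufferedPublisher(source)
    publisher.subscribe(ProcessingSubscriber(batch_size=10))
    # Upstream fills `source`; put() blocks when it is full, while the
    # subscriber's request(n) calls decide how fast items are drained
    for i in range(100):
        await source.put({"id": i})
    await asyncio.sleep(1)  # demo only: give the emit loop time to drain

asyncio.run(run_stream())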
Backpressure in Message Queues

Kafka Consumer Backpressure
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
class BackpressuredKafkaConsumer:
"""
Kafka consumer with backpressure handling.
"""
def __init__(
self,
topic: str,
group_id: str,
max_poll_records: int = 500,
max_poll_interval_ms: int = 300000,
processing_threshold: float = 0.8
):
self.consumer = KafkaConsumer(
topic,
group_id=group_id,
# Limit records per poll (built-in backpressure)
max_poll_records=max_poll_records,
# Allow time for slow processing
max_poll_interval_ms=max_poll_interval_ms,
enable_auto_commit=False
)
self.processing_threshold = processing_threshold
self.current_batch_size = max_poll_records
self.min_batch_size = 10
def consume_with_backpressure(self, process_batch: callable):
while True:
start_time = time.time()
# Poll with current batch size
records = self.consumer.poll(
timeout_ms=1000,
max_records=self.current_batch_size
)
if not records:
continue
# Process records
for tp, messages in records.items():
for message in messages:
process_batch(message)
# Commit after processing
self.consumer.commit()
# Adjust batch size based on processing time
processing_time = time.time() - start_time
            # len(records) counts partitions, so count the actual messages
            record_count = sum(len(msgs) for msgs in records.values())
            self._adjust_batch_size(processing_time, record_count)
def _adjust_batch_size(self, processing_time: float, record_count: int):
"""Adaptive batch sizing based on processing speed"""
# Target: process within 80% of max_poll_interval
target_time = (self.consumer.config['max_poll_interval_ms'] / 1000) * self.processing_threshold
if processing_time > target_time:
# Too slow - reduce batch size
self.current_batch_size = max(
self.min_batch_size,
int(self.current_batch_size * 0.8)
)
print(f"Reducing batch size to {self.current_batch_size}")
elif processing_time < target_time * 0.5 and record_count == self.current_batch_size:
# Fast processing - can increase batch size
self.current_batch_size = min(
self.consumer.config['max_poll_records'],
int(self.current_batch_size * 1.2)
                )
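A hypothetical usage sketch; the topic, group id, and handle_message handler below are placeholders rather than part of the example above:

# Hypothetical wiring for the consumer above
def handle_message(message):
    ...  # process a single Kafka record

consumer = BackpressuredKafkaConsumer(
    topic="events",
    group_id="analytics",
    max_poll_records=500,
)
consumer.consume_with_backpressure(handle_message)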
RabbitMQ Prefetch

import pika
from typing import Callable
class BackpressuredRabbitConsumer:
"""
RabbitMQ consumer using prefetch for backpressure.
"""
def __init__(
self,
queue_name: str,
prefetch_count: int = 10
):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
self.queue_name = queue_name
# Prefetch limits unacked messages
# This is RabbitMQ's backpressure mechanism
self.channel.basic_qos(prefetch_count=prefetch_count)
def consume(self, callback: Callable):
"""
Consume with backpressure.
Only receives prefetch_count messages at a time.
Must ack before receiving more.
"""
def on_message(channel, method, properties, body):
try:
callback(body)
# Ack releases slot for next message
channel.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Nack and requeue on failure
channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True
)
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=on_message
)
self.channel.start_consuming()
# Visualization
"""
Prefetch = 3:
RabbitMQ Queue: [1][2][3][4][5][6][7][8][9][10]...
│ │ │
└──┴──┴──► Consumer (processing 1,2,3)
Consumer acks message 1:
RabbitMQ Queue: [4][5][6][7][8][9][10]...
│ │ │
└──┴──┴──► Consumer (now has 2,3,4)
At most prefetch_count messages are in flight at any time
"""HTTP Backpressure
HTTP Backpressure

Server-Side Throttling
from flask import Flask, request, jsonify
from functools import wraps
import time
import threading
app = Flask(__name__)
class LoadBasedThrottler:
"""Throttle requests based on server load"""
def __init__(
self,
max_concurrent: int = 100,
queue_size: int = 500,
load_shed_threshold: float = 0.9
):
self.max_concurrent = max_concurrent
self.queue_size = queue_size
self.load_shed_threshold = load_shed_threshold
self.active_requests = 0
self.queued_requests = 0
self.lock = threading.Lock()
    def acquire(self, timeout: float = 30.0) -> bool:
        """Try to acquire a request slot"""
        deadline = time.time() + timeout
        queued = False
        while time.time() < deadline:
            with self.lock:
                # Shed load: near capacity with other requests already waiting
                load = self.active_requests / self.max_concurrent
                if not queued and load >= self.load_shed_threshold and self.queued_requests > 0:
                    return False  # Shed load
                if self.active_requests < self.max_concurrent:
                    self.active_requests += 1
                    if queued:
                        self.queued_requests -= 1
                    return True
                if not queued:
                    if self.queued_requests >= self.queue_size:
                        return False  # Queue full
                    self.queued_requests += 1
                    queued = True
            time.sleep(0.01)  # Wait for a slot
        # Timed out while waiting
        with self.lock:
            if queued:
                self.queued_requests -= 1
        return False
def release(self):
"""Release a request slot"""
with self.lock:
self.active_requests -= 1
def get_stats(self) -> dict:
with self.lock:
return {
'active': self.active_requests,
'queued': self.queued_requests,
'max_concurrent': self.max_concurrent,
'load': self.active_requests / self.max_concurrent
}
throttler = LoadBasedThrottler()
def with_backpressure(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not throttler.acquire(timeout=30.0):
return jsonify({
'error': 'Service overloaded',
'retry_after': 5
}), 503, {'Retry-After': '5'}
try:
return f(*args, **kwargs)
finally:
throttler.release()
return wrapper
@app.route('/api/heavy-operation', methods=['POST'])
@with_backpressure
def heavy_operation():
# Simulate heavy processing
time.sleep(2)
return jsonify({'status': 'success'})
@app.route('/health/load')
def load_health():
stats = throttler.get_stats()
status_code = 200 if stats['load'] < 0.8 else 503
    return jsonify(stats), status_code

Client-Side Response
import requests
import time
from typing import Optional, Callable
import random

class MaxRetriesExceeded(Exception):
    """Raised when the server keeps signalling backpressure past max_retries"""

class BackpressureAwareClient:
"""HTTP client that respects server backpressure signals"""
def __init__(
self,
base_url: str,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0
):
self.base_url = base_url
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.session = requests.Session()
def request(
self,
method: str,
path: str,
**kwargs
) -> requests.Response:
"""Make request with exponential backoff on 429/503"""
for attempt in range(self.max_retries):
try:
response = self.session.request(
method,
f"{self.base_url}{path}",
**kwargs
)
if response.status_code == 429 or response.status_code == 503:
# Server is telling us to back off
retry_after = self._get_retry_after(response)
delay = retry_after or self._calculate_backoff(attempt)
print(f"Server backpressure: waiting {delay:.1f}s (attempt {attempt + 1})")
time.sleep(delay)
continue
return response
except requests.exceptions.ConnectionError:
# Server might be overloaded
delay = self._calculate_backoff(attempt)
time.sleep(delay)
raise MaxRetriesExceeded(f"Failed after {self.max_retries} attempts")
def _get_retry_after(self, response: requests.Response) -> Optional[float]:
"""Parse Retry-After header"""
retry_after = response.headers.get('Retry-After')
if retry_after:
try:
return float(retry_after)
except ValueError:
pass
return None
def _calculate_backoff(self, attempt: int) -> float:
"""Exponential backoff with jitter"""
delay = self.base_delay * (2 ** attempt)
delay = min(delay, self.max_delay)
# Add jitter to prevent thundering herd
jitter = random.uniform(0, delay * 0.1)
return delay + jitter
# Usage
client = BackpressureAwareClient("https://api.example.com")
response = client.request("POST", "/api/heavy-operation", json={"data": "..."})

Streaming Backpressure
gRPC Streaming
import grpc
from concurrent import futures
import queue
import threading
import time
# StreamServiceServicer and the Request message used below are assumed to come
# from gRPC code generated by protoc for this service (pb2 / pb2_grpc modules)
class BackpressuredStreamService(StreamServiceServicer):
"""gRPC service with streaming backpressure"""
def BidirectionalStream(self, request_iterator, context):
"""
gRPC streaming with flow control.
Client controls consumption rate.
"""
buffer = queue.Queue(maxsize=100)
producer_done = threading.Event()
def produce():
try:
for request in request_iterator:
# Block if buffer is full (backpressure)
buffer.put(process_request(request), block=True)
finally:
producer_done.set()
# Start producer thread
producer = threading.Thread(target=produce)
producer.start()
# Yield responses as buffer has items
while not (producer_done.is_set() and buffer.empty()):
try:
response = buffer.get(timeout=0.1)
yield response
except queue.Empty:
continue
producer.join()
# Client-side flow control
def streaming_client_with_backpressure(stub):
"""Client that controls its consumption rate"""
def request_generator():
for i in range(1000):
yield Request(id=i)
# Client-side rate limiting
time.sleep(0.1) # Don't overwhelm server
# Process responses at our own pace
for response in stub.BidirectionalStream(request_generator()):
process_slowly(response) # Taking time is OK
    # gRPC handles backpressure via HTTP/2 flow control

Async Generators
import asyncio
from typing import AsyncIterator, TypeVar
T = TypeVar('T')
class BackpressuredPipeline:
"""Async pipeline with built-in backpressure"""
def __init__(self, buffer_size: int = 10):
self.buffer_size = buffer_size
async def produce(self) -> AsyncIterator[dict]:
"""Producer yields items, respecting consumer pace"""
for i in range(1000):
data = await fetch_data(i)
yield data
# Yielding allows consumer to control pace
async def transform(
self,
source: AsyncIterator[T],
buffer_size: int = None
) -> AsyncIterator[T]:
"""Transform stage with bounded buffer"""
buffer_size = buffer_size or self.buffer_size
buffer = asyncio.Queue(maxsize=buffer_size)
async def fill_buffer():
async for item in source:
# Blocks when buffer full (backpressure)
await buffer.put(item)
await buffer.put(None) # Sentinel
# Start filling buffer
filler = asyncio.create_task(fill_buffer())
while True:
item = await buffer.get()
if item is None:
break
transformed = await process(item)
yield transformed
await filler
async def consume(self, source: AsyncIterator[dict]):
"""Consumer pulls at its own pace"""
async for item in source:
await slow_operation(item)
# Usage
async def main():
pipeline = BackpressuredPipeline(buffer_size=10)
# Compose pipeline
source = pipeline.produce()
transformed = pipeline.transform(source)
# Consumer controls the pace
    await pipeline.consume(transformed)

Backpressure Metrics
from dataclasses import dataclass
from prometheus_client import Gauge, Counter, Histogram
import time
import asyncio
@dataclass
class BackpressureMetrics:
queue_size: Gauge
queue_capacity: Gauge
items_dropped: Counter
wait_time: Histogram
throughput: Gauge
class MonitoredQueue:
"""Queue with backpressure monitoring"""
def __init__(self, name: str, capacity: int):
self.name = name
self.capacity = capacity
self.queue = asyncio.Queue(maxsize=capacity)
# Metrics
self.metrics = BackpressureMetrics(
queue_size=Gauge(
f'{name}_queue_size',
'Current queue size'
),
queue_capacity=Gauge(
f'{name}_queue_capacity',
'Queue capacity'
),
items_dropped=Counter(
f'{name}_items_dropped_total',
'Items dropped due to backpressure'
),
wait_time=Histogram(
f'{name}_put_wait_seconds',
'Time waiting to put items',
buckets=[0.001, 0.01, 0.1, 1, 10, 60]
),
throughput=Gauge(
f'{name}_throughput_per_second',
'Current throughput'
)
)
self.metrics.queue_capacity.set(capacity)
        self._consumed_since_update = 0
self._last_time = time.time()
async def put(self, item, timeout: float = None) -> bool:
start = time.time()
try:
if timeout:
await asyncio.wait_for(
self.queue.put(item),
timeout=timeout
)
else:
await self.queue.put(item)
wait_time = time.time() - start
self.metrics.wait_time.observe(wait_time)
self.metrics.queue_size.set(self.queue.qsize())
return True
except asyncio.TimeoutError:
self.metrics.items_dropped.inc()
return False
async def get(self):
item = await self.queue.get()
self.metrics.queue_size.set(self.queue.qsize())
self._update_throughput()
return item
    def _update_throughput(self):
        self._consumed_since_update += 1
        now = time.time()
        elapsed = now - self._last_time
        if elapsed >= 1.0:
            # Items consumed per second since the last update
            self.metrics.throughput.set(self._consumed_since_update / elapsed)
            self._consumed_since_update = 0
            self._last_time = now
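A hypothetical usage sketch (the queue name and the fetch_event/handle_event coroutines are placeholders): producers treat a rejected put as a drop, while the exported gauges make queue depth visible to dashboards and alerts.

# Hypothetical wiring; fetch_event and handle_event are placeholders
events = MonitoredQueue("events", capacity=1000)

async def producer():
    while True:
        accepted = await events.put(await fetch_event(), timeout=5.0)
        if not accepted:
            pass  # rejected puts are counted in events_items_dropped_total

async def consumer():
    while True:
        await handle_event(await events.get())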
Key Takeaways

Bounded queues are essential: Unbounded queues hide problems until OOM; bounded queues make backpressure explicit
Choose the right strategy: Blocking for correctness, dropping for availability, sampling for observability
Propagate backpressure: Don't absorb pressure at one layer—propagate it back to the source (see the sketch after this list)
Monitor queue depths: Queue size is your early warning signal for capacity issues
Prefer pull over push: Pull-based systems (reactive streams) make backpressure natural
Client cooperation: Clients should respect 429/503 responses and implement exponential backoff
Adaptive behavior: Adjust batch sizes, sampling rates, or concurrency based on observed load
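The "propagate backpressure" point can be made concrete with a small sketch (all names here are illustrative): two bounded queues chained through a middle stage. Because put() blocks on a full queue, slowness at the final consumer backs up through the transformer all the way to the producer.

import queue
import threading
import time

# Two bounded queues chained together
stage1 = queue.Queue(maxsize=10)   # producer -> transformer
stage2 = queue.Queue(maxsize=10)   # transformer -> consumer

def producer():
    for i in range(100):
        stage1.put(i)              # blocks once stage1 fills up
        print(f"produced {i}")

def transformer():
    while True:
        stage2.put(stage1.get() * 2)   # blocks on stage2, which keeps stage1 full

def consumer():
    while True:
        stage2.get()
        time.sleep(0.05)           # the slow stage everything upstream adapts to

for fn in (consumer, transformer, producer):
    threading.Thread(target=fn, daemon=True).start()
time.sleep(10)                     # let the demo run; the producer settles at ~20 items/s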