Backpressure
TL;DR
Backpressure is a flow-control mechanism by which a downstream component signals upstream components to slow down when it cannot keep up with incoming data. Instead of buffering without bound (which leads to memory exhaustion) or silently dropping data, backpressure creates a feedback loop that propagates load information through the system so producers can adjust their rate.
Why Do We Need Backpressure?
Without backpressure: a fast producer keeps pushing into a queue that a slower consumer cannot drain. The queue grows without bound, memory climbs until the process is killed or starts dropping data unpredictably, and latency grows along the way.

With backpressure: the queue between producer and consumer is bounded, so once it fills the producer is forced to slow down (or an explicit drop/sampling policy takes over). Memory stays within limits and the system remains stable. A minimal sketch of the difference follows.
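To make the contrast concrete, here is a minimal sketch (not from the original article) using Python's standard queue and threading modules: with an unbounded queue the producer would race ahead and memory would grow, while the bounded queue below makes put() block until the slow consumer catches up. The item counts and sleep durations are illustrative.
python
import queue
import threading
import time

# A bounded buffer: maxsize turns "queue full" into backpressure on the producer.
# (queue.Queue() with no maxsize would grow without limit instead.)
buffer: "queue.Queue[dict]" = queue.Queue(maxsize=1000)

def producer():
    for i in range(10_000):
        # Blocks here whenever the buffer is full -- this wait is the backpressure
        buffer.put({"seq": i})

def consumer():
    while True:
        item = buffer.get()
        time.sleep(0.01)  # Simulate slow downstream processing
        buffer.task_done()

threading.Thread(target=consumer, daemon=True).start()
producer()  # Finishes at the consumer's pace instead of exhausting memory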
Backpressure Strategies
1. Blocking (Synchronous)
python
import queue
import threading
from typing import TypeVar, Generic, Optional
T = TypeVar('T')
class BlockingQueue(Generic[T]):
"""
Bounded queue that blocks producers when full.
Simplest form of backpressure.
"""
def __init__(self, max_size: int):
self.queue = queue.Queue(maxsize=max_size)
    def put(self, item: T, timeout: Optional[float] = None) -> bool:
"""
Block until space available.
Returns False if timeout expires.
"""
try:
self.queue.put(item, block=True, timeout=timeout)
return True
except queue.Full:
return False
    def get(self, timeout: Optional[float] = None) -> T:
"""Block until item available."""
return self.queue.get(block=True, timeout=timeout)
@property
def size(self) -> int:
return self.queue.qsize()
@property
def is_full(self) -> bool:
return self.queue.full()
# Usage
buffer = BlockingQueue[dict](max_size=1000)
def producer():
while True:
data = fetch_data()
# Producer blocks here when queue is full
# This is backpressure in action
buffer.put(data)
def consumer():
while True:
data = buffer.get()
        process_slowly(data)  # Takes time
Blocking Backpressure Timeline:
Time ─────────────────────────────────────────────────────────►
Producer: [produce][produce][produce][BLOCKED........][produce]
▲
│
Queue: [--][███][█████████████████████████][████████]
│
Queue full!
│
Consumer: [---][consume][consume][consume][consume][consume]
2. Dropping (Loss-Tolerant)
python
import random
import threading
from enum import Enum
from typing import Generic, Optional, TypeVar
from collections import deque

T = TypeVar('T')
class DropPolicy(Enum):
DROP_OLDEST = "drop_oldest" # Drop front of queue
DROP_NEWEST = "drop_newest" # Drop incoming item
DROP_RANDOM = "drop_random" # Drop random item
class DroppingQueue(Generic[T]):
"""
Bounded queue that drops items when full.
Good when recent data is more important.
"""
def __init__(self, max_size: int, policy: DropPolicy = DropPolicy.DROP_OLDEST):
self.max_size = max_size
self.policy = policy
self.queue = deque(maxlen=max_size if policy == DropPolicy.DROP_OLDEST else None)
self.dropped_count = 0
self.lock = threading.Lock()
def put(self, item: T) -> Optional[T]:
"""
Add item to queue.
Returns dropped item if any.
"""
with self.lock:
if self.policy == DropPolicy.DROP_OLDEST:
if len(self.queue) >= self.max_size:
dropped = self.queue.popleft()
self.dropped_count += 1
self.queue.append(item)
return dropped
self.queue.append(item)
return None
            elif self.policy == DropPolicy.DROP_NEWEST:
                if len(self.queue) >= self.max_size:
                    self.dropped_count += 1
                    return item  # Drop the incoming item
                self.queue.append(item)
                return None
            else:  # DropPolicy.DROP_RANDOM
                if len(self.queue) >= self.max_size:
                    victim = random.randrange(len(self.queue))
                    dropped = self.queue[victim]
                    del self.queue[victim]
                    self.dropped_count += 1
                    self.queue.append(item)
                    return dropped
                self.queue.append(item)
                return None
def get(self) -> Optional[T]:
with self.lock:
if self.queue:
return self.queue.popleft()
return None
# Example: Live metrics - old data less valuable
metrics_queue = DroppingQueue[dict](
max_size=10000,
policy=DropPolicy.DROP_OLDEST
)
def emit_metric(metric: dict):
dropped = metrics_queue.put(metric)
if dropped:
# Optionally log dropped data
logger.debug(f"Dropped old metric: {dropped['timestamp']}")3. サンプリング
python
import random
import time
from dataclasses import dataclass
from typing import Callable, TypeVar

T = TypeVar('T')
@dataclass
class SamplingConfig:
base_rate: float = 1.0 # 100% at low load
min_rate: float = 0.01 # Never below 1%
load_threshold: float = 0.8 # Start sampling above 80% load
class AdaptiveSampler:
"""
Reduce sampling rate based on load.
Good for metrics, logs, or analytics.
"""
def __init__(self, config: SamplingConfig):
self.config = config
self.current_rate = config.base_rate
self.queue_size = 0
self.max_queue_size = 10000
def update_load(self, queue_size: int, max_size: int):
"""Adjust sampling rate based on queue fill level"""
self.queue_size = queue_size
self.max_queue_size = max_size
load = queue_size / max_size
if load < self.config.load_threshold:
self.current_rate = self.config.base_rate
else:
# Linear reduction from base_rate to min_rate
excess_load = (load - self.config.load_threshold) / (1 - self.config.load_threshold)
self.current_rate = max(
self.config.min_rate,
self.config.base_rate * (1 - excess_load)
)
def should_sample(self) -> bool:
"""Probabilistically decide whether to sample this item"""
return random.random() < self.current_rate
def sample(self, item: T, process: Callable[[T], None]):
if self.should_sample():
process(item)
# Usage: Trace sampling under load
sampler = AdaptiveSampler(SamplingConfig(
base_rate=1.0, # Sample 100% normally
min_rate=0.001, # Sample 0.1% under extreme load
load_threshold=0.7
))
def process_trace(trace: dict):
if sampler.should_sample():
        send_to_tracing_backend(trace)

# Note: call sampler.update_load(queue_size, max_size) periodically (e.g. from
# the consumer loop) so the sampling rate actually adapts to queue depth.

Adaptive Sampling:
Queue Load: 0% 50% 70% 90% 100%
│ │ │ │ │
Sample Rate: 100% 100% 50% 10% 1%
│ │ │ │ │
└────────┴────────┴────────┴────────┘
Normal │ Degraded Operation
Operation │
4. Reactive Streams (Pull-Based)
python
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
import asyncio
T = TypeVar('T')
class Publisher(ABC, Generic[T]):
"""Reactive streams publisher"""
@abstractmethod
def subscribe(self, subscriber: 'Subscriber[T]'):
pass
class Subscriber(ABC, Generic[T]):
"""Reactive streams subscriber"""
@abstractmethod
def on_subscribe(self, subscription: 'Subscription'):
pass
@abstractmethod
def on_next(self, item: T):
pass
@abstractmethod
def on_error(self, error: Exception):
pass
@abstractmethod
def on_complete(self):
pass
class Subscription(ABC):
"""Reactive streams subscription"""
@abstractmethod
def request(self, n: int):
"""Request n more items (pull-based backpressure)"""
pass
@abstractmethod
def cancel(self):
pass
# Implementation
class BufferedPublisher(Publisher[T]):
def __init__(self, source: asyncio.Queue):
self.source = source
self.subscribers = []
def subscribe(self, subscriber: Subscriber[T]):
subscription = BufferedSubscription(self.source, subscriber)
self.subscribers.append(subscription)
subscriber.on_subscribe(subscription)
class BufferedSubscription(Subscription):
def __init__(self, source: asyncio.Queue, subscriber: Subscriber):
self.source = source
self.subscriber = subscriber
self.requested = 0
self.cancelled = False
self._task = None
def request(self, n: int):
"""Subscriber requests n more items"""
self.requested += n
if self._task is None:
self._task = asyncio.create_task(self._emit_loop())
async def _emit_loop(self):
while not self.cancelled and self.requested > 0:
try:
# Only fetch when there's demand
item = await asyncio.wait_for(
self.source.get(),
timeout=1.0
)
self.requested -= 1
self.subscriber.on_next(item)
except asyncio.TimeoutError:
continue
except Exception as e:
self.subscriber.on_error(e)
break
self._task = None
def cancel(self):
self.cancelled = True
# Usage
class ProcessingSubscriber(Subscriber[dict]):
def __init__(self, batch_size: int = 10):
self.batch_size = batch_size
self.subscription = None
self.buffer = []
def on_subscribe(self, subscription: Subscription):
self.subscription = subscription
# Request initial batch
subscription.request(self.batch_size)
def on_next(self, item: dict):
self.buffer.append(item)
if len(self.buffer) >= self.batch_size:
self._process_batch()
# Request more only after processing
self.subscription.request(self.batch_size)
def _process_batch(self):
# Process items
for item in self.buffer:
process(item)
        self.buffer.clear()

    def on_error(self, error: Exception):
        # A real subscriber might retry, alert, or cancel the subscription here
        print(f"Stream error: {error}")

    def on_complete(self):
        # Flush any remaining buffered items
        if self.buffer:
            self._process_batch()
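A short wiring sketch, my own addition rather than part of the original: it connects BufferedPublisher and ProcessingSubscriber over an asyncio.Queue, defines a stand-in for the process() placeholder used above, and shows that the publisher only emits as fast as the subscriber requests.
python
import asyncio

def process(item: dict) -> None:
    # Stand-in for the process() placeholder used by ProcessingSubscriber above
    print("processed", item["id"])

async def demo():
    source: asyncio.Queue = asyncio.Queue()
    for i in range(100):
        await source.put({"id": i})

    publisher = BufferedPublisher(source)
    subscriber = ProcessingSubscriber(batch_size=10)
    # on_subscribe() requests the first 10 items; further demand is signalled
    # only after each batch is processed, so emission never outruns processing.
    publisher.subscribe(subscriber)

    await asyncio.sleep(2)            # Give the emit loop time to drain the queue
    subscriber.subscription.cancel()  # Stop the emit loop
    await asyncio.sleep(1.1)          # Let the loop observe the cancel and exit

asyncio.run(demo())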
Backpressure in Message Queues
Kafka Consumer Backpressure
python
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
class BackpressuredKafkaConsumer:
"""
Kafka consumer with backpressure handling.
"""
def __init__(
self,
topic: str,
group_id: str,
max_poll_records: int = 500,
max_poll_interval_ms: int = 300000,
processing_threshold: float = 0.8
):
self.consumer = KafkaConsumer(
topic,
group_id=group_id,
# Limit records per poll (built-in backpressure)
max_poll_records=max_poll_records,
# Allow time for slow processing
max_poll_interval_ms=max_poll_interval_ms,
enable_auto_commit=False
)
self.processing_threshold = processing_threshold
self.current_batch_size = max_poll_records
self.min_batch_size = 10
def consume_with_backpressure(self, process_batch: callable):
while True:
start_time = time.time()
# Poll with current batch size
records = self.consumer.poll(
timeout_ms=1000,
max_records=self.current_batch_size
)
if not records:
continue
# Process records
for tp, messages in records.items():
for message in messages:
process_batch(message)
# Commit after processing
self.consumer.commit()
            # Adjust batch size based on processing time
            processing_time = time.time() - start_time
            record_count = sum(len(msgs) for msgs in records.values())
            self._adjust_batch_size(processing_time, record_count)
def _adjust_batch_size(self, processing_time: float, record_count: int):
"""Adaptive batch sizing based on processing speed"""
# Target: process within 80% of max_poll_interval
target_time = (self.consumer.config['max_poll_interval_ms'] / 1000) * self.processing_threshold
if processing_time > target_time:
# Too slow - reduce batch size
self.current_batch_size = max(
self.min_batch_size,
int(self.current_batch_size * 0.8)
)
print(f"Reducing batch size to {self.current_batch_size}")
elif processing_time < target_time * 0.5 and record_count == self.current_batch_size:
# Fast processing - can increase batch size
self.current_batch_size = min(
self.consumer.config['max_poll_records'],
int(self.current_batch_size * 1.2)
            )
RabbitMQ Prefetch
python
import pika
from typing import Callable
class BackpressuredRabbitConsumer:
"""
RabbitMQ consumer using prefetch for backpressure.
"""
def __init__(
self,
queue_name: str,
prefetch_count: int = 10
):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
self.queue_name = queue_name
# Prefetch limits unacked messages
# This is RabbitMQ's backpressure mechanism
self.channel.basic_qos(prefetch_count=prefetch_count)
def consume(self, callback: Callable):
"""
Consume with backpressure.
Only receives prefetch_count messages at a time.
Must ack before receiving more.
"""
def on_message(channel, method, properties, body):
try:
callback(body)
# Ack releases slot for next message
channel.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# Nack and requeue on failure
channel.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True
)
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=on_message
)
self.channel.start_consuming()
# Visualization
"""
Prefetch = 3:
RabbitMQ Queue: [1][2][3][4][5][6][7][8][9][10]...
│ │ │
└──┴──┴──► Consumer (processing 1,2,3)
Consumer acks message 1:
RabbitMQ Queue: [4][5][6][7][8][9][10]...
│ │ │
└──┴──┴──► Consumer (now has 2,3,4)
Always exactly prefetch_count in-flight
"""HTTPバックプレッシャー
Server-Side Throttling
python
from flask import Flask, request, jsonify
from functools import wraps
import time
import threading
app = Flask(__name__)
class LoadBasedThrottler:
"""Throttle requests based on server load"""
def __init__(
self,
max_concurrent: int = 100,
queue_size: int = 500,
load_shed_threshold: float = 0.9
):
self.max_concurrent = max_concurrent
self.queue_size = queue_size
self.load_shed_threshold = load_shed_threshold
self.active_requests = 0
self.queued_requests = 0
self.lock = threading.Lock()
    def acquire(self, timeout: float = 30.0) -> bool:
        """Try to acquire a request slot, queueing while waiting."""
        deadline = time.time() + timeout
        with self.lock:
            if self.queued_requests >= self.queue_size:
                return False  # Queue full - reject immediately
            self.queued_requests += 1
        try:
            while time.time() < deadline:
                with self.lock:
                    # Shed load if we are near capacity and others are already waiting
                    load = self.active_requests / self.max_concurrent
                    if load >= self.load_shed_threshold and self.queued_requests > 1:
                        return False
                    if self.active_requests < self.max_concurrent:
                        self.active_requests += 1
                        return True
                time.sleep(0.01)  # Wait for a slot to free up
            return False  # Timed out waiting
        finally:
            with self.lock:
                self.queued_requests -= 1
def release(self):
"""Release a request slot"""
with self.lock:
self.active_requests -= 1
def get_stats(self) -> dict:
with self.lock:
return {
'active': self.active_requests,
'queued': self.queued_requests,
'max_concurrent': self.max_concurrent,
'load': self.active_requests / self.max_concurrent
}
throttler = LoadBasedThrottler()
def with_backpressure(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not throttler.acquire(timeout=30.0):
return jsonify({
'error': 'Service overloaded',
'retry_after': 5
}), 503, {'Retry-After': '5'}
try:
return f(*args, **kwargs)
finally:
throttler.release()
return wrapper
@app.route('/api/heavy-operation', methods=['POST'])
@with_backpressure
def heavy_operation():
# Simulate heavy processing
time.sleep(2)
return jsonify({'status': 'success'})
@app.route('/health/load')
def load_health():
stats = throttler.get_stats()
status_code = 200 if stats['load'] < 0.8 else 503
    return jsonify(stats), status_code
Client-Side Response Handling
python
import requests
import time
from typing import Optional, Callable
import random

class MaxRetriesExceeded(Exception):
    """Raised when the server keeps signalling overload past max_retries."""

class BackpressureAwareClient:
"""HTTP client that respects server backpressure signals"""
def __init__(
self,
base_url: str,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0
):
self.base_url = base_url
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.session = requests.Session()
def request(
self,
method: str,
path: str,
**kwargs
) -> requests.Response:
"""Make request with exponential backoff on 429/503"""
for attempt in range(self.max_retries):
try:
response = self.session.request(
method,
f"{self.base_url}{path}",
**kwargs
)
if response.status_code == 429 or response.status_code == 503:
# Server is telling us to back off
retry_after = self._get_retry_after(response)
delay = retry_after or self._calculate_backoff(attempt)
print(f"Server backpressure: waiting {delay:.1f}s (attempt {attempt + 1})")
time.sleep(delay)
continue
return response
except requests.exceptions.ConnectionError:
# Server might be overloaded
delay = self._calculate_backoff(attempt)
time.sleep(delay)
raise MaxRetriesExceeded(f"Failed after {self.max_retries} attempts")
def _get_retry_after(self, response: requests.Response) -> Optional[float]:
"""Parse Retry-After header"""
retry_after = response.headers.get('Retry-After')
if retry_after:
try:
return float(retry_after)
except ValueError:
pass
return None
def _calculate_backoff(self, attempt: int) -> float:
"""Exponential backoff with jitter"""
delay = self.base_delay * (2 ** attempt)
delay = min(delay, self.max_delay)
# Add jitter to prevent thundering herd
jitter = random.uniform(0, delay * 0.1)
return delay + jitter
# Usage
client = BackpressureAwareClient("https://api.example.com")
response = client.request("POST", "/api/heavy-operation", json={"data": "..."})
Streaming Backpressure
gRPC Streaming
python
import queue
import threading
import time
from concurrent import futures

import grpc

# StreamServiceServicer and Request are assumed to come from generated gRPC
# stubs (e.g. stream_pb2 / stream_pb2_grpc); process_request and process_slowly
# are placeholders for application logic.
class BackpressuredStreamService(StreamServiceServicer):
"""gRPC service with streaming backpressure"""
def BidirectionalStream(self, request_iterator, context):
"""
gRPC streaming with flow control.
Client controls consumption rate.
"""
buffer = queue.Queue(maxsize=100)
producer_done = threading.Event()
def produce():
try:
for request in request_iterator:
# Block if buffer is full (backpressure)
buffer.put(process_request(request), block=True)
finally:
producer_done.set()
# Start producer thread
producer = threading.Thread(target=produce)
producer.start()
# Yield responses as buffer has items
while not (producer_done.is_set() and buffer.empty()):
try:
response = buffer.get(timeout=0.1)
yield response
except queue.Empty:
continue
producer.join()
# Client-side flow control
def streaming_client_with_backpressure(stub):
"""Client that controls its consumption rate"""
def request_generator():
for i in range(1000):
yield Request(id=i)
# Client-side rate limiting
time.sleep(0.1) # Don't overwhelm server
# Process responses at our own pace
for response in stub.BidirectionalStream(request_generator()):
process_slowly(response) # Taking time is OK
# gRPC handles backpressure via HTTP/2 flow control
Async Generators
python
import asyncio
from typing import AsyncIterator, Optional, TypeVar
T = TypeVar('T')
class BackpressuredPipeline:
"""Async pipeline with built-in backpressure"""
def __init__(self, buffer_size: int = 10):
self.buffer_size = buffer_size
async def produce(self) -> AsyncIterator[dict]:
"""Producer yields items, respecting consumer pace"""
for i in range(1000):
data = await fetch_data(i)
yield data
# Yielding allows consumer to control pace
async def transform(
self,
source: AsyncIterator[T],
        buffer_size: Optional[int] = None
) -> AsyncIterator[T]:
"""Transform stage with bounded buffer"""
buffer_size = buffer_size or self.buffer_size
buffer = asyncio.Queue(maxsize=buffer_size)
async def fill_buffer():
async for item in source:
# Blocks when buffer full (backpressure)
await buffer.put(item)
await buffer.put(None) # Sentinel
# Start filling buffer
filler = asyncio.create_task(fill_buffer())
while True:
item = await buffer.get()
if item is None:
break
transformed = await process(item)
yield transformed
await filler
async def consume(self, source: AsyncIterator[dict]):
"""Consumer pulls at its own pace"""
async for item in source:
await slow_operation(item)
# Usage
async def main():
pipeline = BackpressuredPipeline(buffer_size=10)
# Compose pipeline
source = pipeline.produce()
transformed = pipeline.transform(source)
# Consumer controls the pace
    await pipeline.consume(transformed)

asyncio.run(main())
Backpressure Metrics
python
import asyncio
import time
from dataclasses import dataclass

from prometheus_client import Counter, Gauge, Histogram
@dataclass
class BackpressureMetrics:
queue_size: Gauge
queue_capacity: Gauge
items_dropped: Counter
wait_time: Histogram
throughput: Gauge
class MonitoredQueue:
"""Queue with backpressure monitoring"""
def __init__(self, name: str, capacity: int):
self.name = name
self.capacity = capacity
self.queue = asyncio.Queue(maxsize=capacity)
# Metrics
self.metrics = BackpressureMetrics(
queue_size=Gauge(
f'{name}_queue_size',
'Current queue size'
),
queue_capacity=Gauge(
f'{name}_queue_capacity',
'Queue capacity'
),
items_dropped=Counter(
f'{name}_items_dropped_total',
'Items dropped due to backpressure'
),
wait_time=Histogram(
f'{name}_put_wait_seconds',
'Time waiting to put items',
buckets=[0.001, 0.01, 0.1, 1, 10, 60]
),
throughput=Gauge(
f'{name}_throughput_per_second',
'Current throughput'
)
)
self.metrics.queue_capacity.set(capacity)
        self._consumed_count = 0  # Items retrieved via get()
        self._last_count = 0      # Consumed count at last throughput update
        self._last_time = time.time()
async def put(self, item, timeout: float = None) -> bool:
start = time.time()
try:
if timeout:
await asyncio.wait_for(
self.queue.put(item),
timeout=timeout
)
else:
await self.queue.put(item)
wait_time = time.time() - start
self.metrics.wait_time.observe(wait_time)
self.metrics.queue_size.set(self.queue.qsize())
return True
except asyncio.TimeoutError:
self.metrics.items_dropped.inc()
return False
    async def get(self):
        item = await self.queue.get()
        self._consumed_count += 1
        self.metrics.queue_size.set(self.queue.qsize())
        self._update_throughput()
        return item
    def _update_throughput(self):
        now = time.time()
        elapsed = now - self._last_time
        if elapsed >= 1.0:
            # Rate of items consumed since the last update (not net queue growth)
            throughput = (self._consumed_count - self._last_count) / elapsed
            self.metrics.throughput.set(throughput)
            self._last_count = self._consumed_count
            self._last_time = now
Key Takeaways
Bounded queues are essential: unbounded queues hide the problem until you hit OOM; bounded queues make backpressure explicit
Choose the right strategy: block for correctness, drop for availability, sample for observability
Propagate backpressure: don't absorb the pressure in a single layer; push it all the way back to the source (see the sketch below)
Monitor queue depth: queue size is an early-warning signal for capacity problems
Prefer pull over push: pull-based systems (Reactive Streams) make backpressure natural
Client cooperation: clients should honor 429/503 responses and implement exponential backoff
Adapt to load: adjust batch sizes, sampling rates, and concurrency based on observed load
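To tie several of these points together, here is a small end-to-end sketch (my own illustration; the /ingest route, queue size, and worker delay are invented): a bounded worker queue is the source of pressure, the HTTP layer converts "queue full" into 429 plus Retry-After instead of buffering, and clients such as the BackpressureAwareClient above are expected to back off and retry.
python
import queue
import threading
import time

from flask import Flask, jsonify, request

app = Flask(__name__)
work_queue: "queue.Queue[dict]" = queue.Queue(maxsize=1000)  # Bounded on purpose

def worker():
    while True:
        job = work_queue.get()
        time.sleep(0.05)  # Simulate slow downstream processing
        work_queue.task_done()

threading.Thread(target=worker, daemon=True).start()

@app.route('/ingest', methods=['POST'])
def ingest():
    try:
        # Never block the request thread: if the worker is behind, say so immediately
        work_queue.put_nowait(request.get_json(force=True))
    except queue.Full:
        # Propagate the pressure to the caller instead of buffering without bound
        return jsonify({'error': 'overloaded'}), 429, {'Retry-After': '2'}
    return jsonify({'status': 'accepted', 'queued': work_queue.qsize()}), 202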