Kafka: A Distributed Messaging System for Log Processing
Paper Overview
- Title: Kafka: a Distributed Messaging System for Log Processing
- Authors: Jay Kreps, Neha Narkhede, Jun Rao (LinkedIn)
- Published: NetDB Workshop 2011
- Context: LinkedIn needed high-throughput, low-latency log processing
TL;DR
Kafka is a distributed commit log that provides:
- High throughput through sequential disk I/O and batching
- Scalability via partitioned topics
- Durability through replication
- Simple consumer model with offset-based tracking
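To make the model concrete, here is a minimal produce/consume sketch using the third-party kafka-python client (an assumption for illustration, not part of the paper; it expects a broker at localhost:9092 and an existing "clicks" topic):
python
from kafka import KafkaProducer, KafkaConsumer

# Publish a message; key and value are plain bytes here.
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("clicks", key=b"user-42", value=b'{"page": "/home"}')
producer.flush()  # block until the broker acknowledges the batch

# Consume with offset-based tracking, starting from the earliest offset.
consumer = KafkaConsumer(
    "clicks",
    group_id="demo-readers",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
)
for record in consumer:  # blocks and streams records; Ctrl-C to stop
    print(record.partition, record.offset, record.key, record.value)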
Problem Statement
Log Processing Challenges
┌─────────────────────────────────────────────────────────────────┐
│ LinkedIn's Requirements │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Activity Data: │
│ ┌─────────────────────────────────────────────┐ │
│ │ - Page views: billions per day │ │
│ │ - User actions: clicks, searches, etc. │ │
│ │ - System metrics: CPU, memory, latency │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ Use Cases: │
│ ┌─────────────────────────────────────────────┐ │
│ │ - Real-time analytics dashboards │ │
│ │ - Offline batch processing (Hadoop) │ │
│ │ - Search indexing │ │
│ │ - Recommendation systems │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ Existing Solutions Fall Short: │
│ ┌─────────────────────────────────────────────┐ │
│ │ - Traditional MQ: Too slow, not scalable │ │
│ │ - Log files: No real-time, hard to manage │ │
│ │ - Custom solutions: Complex, fragile │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Kafka Architecture
Core Concepts
┌─────────────────────────────────────────────────────────────────┐
│ Kafka Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ TOPIC: Named feed of messages │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Topic "clicks" │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Partition 0: [M0][M1][M2][M3][M4][M5]... │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Partition 1: [M0][M1][M2][M3][M4]... │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Partition 2: [M0][M1][M2][M3][M4][M5][M6]... │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ PARTITION: Ordered, immutable sequence of messages │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Offset: 0 1 2 3 4 5 6 7 │ │
│ │ ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ │
│ │ │ M0 │ M1 │ M2 │ M3 │ M4 │ M5 │ M6 │ M7 │ │ │
│ │ └────┴────┴────┴────┴────┴────┴────┴────┘ │ │
│ │ ▲ │ │
│ │ append-only │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ BROKER: Server that stores partitions │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Broker 1 Broker 2 Broker 3 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ P0, P3 │ │ P1, P4 │ │ P2, P5 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
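A toy in-memory sketch (illustrative only, not the broker's real data structures) of the topic/partition/offset relationship shown above; offsets are positions within a single partition, not global:
python
class Topic:
    """Toy model: a topic is a set of independent append-only partitions."""

    def __init__(self, name: str, num_partitions: int):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition: int, message: bytes) -> int:
        log = self.partitions[partition]
        log.append(message)
        return len(log) - 1  # offset = position within that partition

clicks = Topic("clicks", num_partitions=3)
print(clicks.append(0, b"M0"))  # -> 0
print(clicks.append(0, b"M1"))  # -> 1
print(clicks.append(2, b"M0"))  # -> 0 (each partition numbers its own offsets)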
Message Flow
┌─────────────────────────────────────────────────────────────────┐
│ Message Flow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Producers Kafka Cluster Consumers│
│ ┌───────┐ ┌─────────────┐ ┌───────┐│
│ │ App 1 │──────────────────│ │────────────│ App A ││
│ └───────┘ │ │ └───────┘│
│ ┌───────┐ publish │ Broker │ consume ┌───────┐│
│ │ App 2 │──────────────────│ Cluster │────────────│ App B ││
│ └───────┘ │ │ └───────┘│
│ ┌───────┐ │ │ ┌───────┐│
│ │ App 3 │──────────────────│ │────────────│ App C ││
│ └───────┘ └─────────────┘ └───────┘│
│ │ │
│ │ │
│ ┌─────┴─────┐ │
│ │ ZooKeeper │ │
│ │ (metadata)│ │
│ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Log-Based Storage
Append-Only Log
python
class KafkaLog:
"""Kafka's log-based storage implementation."""
def __init__(self, log_dir: str, segment_size: int = 1024 * 1024 * 1024):
self.log_dir = log_dir
self.segment_size = segment_size # 1GB default
self.segments = [] # List of log segments
self.active_segment = None
self.next_offset = 0
def append(self, messages: list) -> list:
"""
Append messages to log.
Returns list of offsets for appended messages.
Key insight: Sequential writes are FAST on disk.
"""
offsets = []
for message in messages:
# Check if we need a new segment
if self._need_new_segment():
self._roll_segment()
# Write message to active segment
offset = self.next_offset
self.active_segment.append(offset, message)
offsets.append(offset)
self.next_offset += 1
return offsets
def read(self, start_offset: int, max_bytes: int) -> list:
"""
Read messages starting from offset.
Returns messages up to max_bytes.
"""
messages = []
bytes_read = 0
# Find segment containing start_offset
segment = self._find_segment(start_offset)
while segment and bytes_read < max_bytes:
# Read from segment
segment_messages = segment.read(
start_offset,
max_bytes - bytes_read
)
for msg in segment_messages:
messages.append(msg)
bytes_read += len(msg.value)
start_offset = msg.offset + 1
# Move to next segment
segment = self._next_segment(segment)
return messages
def _need_new_segment(self) -> bool:
"""Check if active segment is full."""
if not self.active_segment:
return True
return self.active_segment.size >= self.segment_size
def _roll_segment(self):
"""Create new segment, close old one."""
if self.active_segment:
self.active_segment.close()
self.segments.append(self.active_segment)
self.active_segment = LogSegment(
base_offset=self.next_offset,
path=f"{self.log_dir}/{self.next_offset}.log"
)
class LogSegment:
"""Individual log segment file."""
def __init__(self, base_offset: int, path: str):
self.base_offset = base_offset
self.path = path
self.file = open(path, 'ab+')
self.index = SparseIndex() # Offset -> file position
self.size = 0
def append(self, offset: int, message: bytes):
"""Append message to segment file."""
# Write message with header
position = self.file.tell()
record = self._encode_record(offset, message)
self.file.write(record)
self.size += len(record)
        # Add a sparse index entry every 4096 messages (real Kafka spaces
        # index entries by bytes, via index.interval.bytes)
        if offset % 4096 == 0:
self.index.add(offset, position)
def read(self, offset: int, max_bytes: int) -> list:
"""Read messages from offset."""
# Use index to find approximate position
position = self.index.lookup(offset)
self.file.seek(position)
messages = []
bytes_read = 0
while bytes_read < max_bytes:
record = self._read_record()
if record is None:
break
if record.offset >= offset:
messages.append(record)
bytes_read += len(record.value)
        return messages
Efficient I/O
python
class EfficientIO:
"""Kafka's I/O optimizations."""
def __init__(self):
self.page_cache = {} # OS page cache simulation
def zero_copy_send(self, socket, file_path: str,
offset: int, length: int):
"""
Zero-copy transfer from file to socket.
Traditional:
1. Read file -> kernel buffer
2. Copy kernel buffer -> user buffer
3. Copy user buffer -> socket buffer
4. Send socket buffer -> NIC
Zero-copy (sendfile):
1. Read file -> kernel buffer
2. Send kernel buffer -> NIC
Eliminates 2 copies and context switches!
"""
        import os
        # os.sendfile() performs the zero-copy transfer inside the kernel
        # (available on Linux and macOS)
        with open(file_path, 'rb') as f:
            os.sendfile(socket.fileno(), f.fileno(), offset, length)
def batched_compression(self, messages: list) -> bytes:
"""
Compress multiple messages together.
Better compression ratio than individual messages.
"""
import gzip
# Batch messages into single payload
batch = b''.join(
self._encode_message(m) for m in messages
)
# Compress entire batch
compressed = gzip.compress(batch)
return compressed
def page_cache_friendly_writes(self):
"""
Kafka leverages OS page cache.
1. Write to memory-mapped file
2. OS flushes to disk asynchronously
3. Reads served from page cache
Result: Near-memory speed for recent data
"""
        pass
Producer
Publishing Messages
python
class KafkaProducer:
"""Kafka producer implementation."""
def __init__(self, bootstrap_servers: list):
self.brokers = bootstrap_servers
self.metadata = self._fetch_metadata()
self.batch_size = 16384 # 16KB
self.linger_ms = 5 # Wait up to 5ms for more messages
self.batches = defaultdict(list) # Topic-partition -> messages
def send(self, topic: str, key: bytes, value: bytes,
partition: int = None) -> Future:
"""
Send message to topic.
Returns Future that resolves when message is acknowledged.
"""
# Determine partition
if partition is None:
partition = self._partition(topic, key)
# Create produce record
record = ProducerRecord(
topic=topic,
partition=partition,
key=key,
value=value,
timestamp=time.time_ns()
)
# Add to batch
tp = TopicPartition(topic, partition)
future = Future()
self.batches[tp].append((record, future))
# Check if batch is full
if self._batch_full(tp):
self._send_batch(tp)
return future
def _partition(self, topic: str, key: bytes) -> int:
"""
Determine partition for message.
If key is provided: hash(key) % num_partitions
If no key: round-robin
"""
num_partitions = self.metadata.partitions_for_topic(topic)
if key:
# Murmur2 hash for consistent partitioning
hash_value = murmur2(key)
return hash_value % num_partitions
else:
# Round-robin for key-less messages
return self._next_partition(topic)
def _send_batch(self, tp: TopicPartition):
"""Send accumulated batch to broker."""
batch = self.batches[tp]
if not batch:
return
# Find leader for partition
leader = self.metadata.leader_for(tp)
# Create produce request
records = [record for record, _ in batch]
request = ProduceRequest(
topic=tp.topic,
partition=tp.partition,
records=records,
acks='all', # Wait for all replicas
timeout_ms=30000
)
try:
# Send to leader
response = self._send_request(leader, request)
# Complete futures
for i, (_, future) in enumerate(batch):
offset = response.base_offset + i
future.set_result(RecordMetadata(
topic=tp.topic,
partition=tp.partition,
offset=offset
))
except Exception as e:
for _, future in batch:
future.set_exception(e)
finally:
self.batches[tp] = []
class Partitioner:
"""Custom partitioning strategies."""
def partition_by_key(self, key: bytes, num_partitions: int) -> int:
"""Hash-based partitioning for ordering by key."""
return murmur2(key) % num_partitions
def partition_round_robin(self, num_partitions: int) -> int:
"""Round-robin for load balancing."""
self.counter = getattr(self, 'counter', 0)
partition = self.counter % num_partitions
self.counter += 1
return partition
def partition_by_custom_logic(self, record, num_partitions: int) -> int:
"""Custom logic based on message content."""
# Example: partition by user region
region = record.headers.get('region')
region_to_partition = {'us': 0, 'eu': 1, 'asia': 2}
        return region_to_partition.get(region, 0)
Consumer
Consumer Groups
┌─────────────────────────────────────────────────────────────────┐
│ Consumer Groups │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Topic "orders" with 4 partitions │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ P0 P1 P2 P3 │ │
│ └──┬───────────┬───────────┬───────────┬─────────────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
│ Consumer Group A │
│ (3 consumers) │
│ ┌─────────────────────────────────────────────┐ │
│ │ Consumer A1 Consumer A2 Consumer A3 │ │
│ │ (P0, P1) (P2) (P3) │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ Consumer Group B │
│ (2 consumers) │
│ ┌────────────────────────────────────┐ │
│ │ Consumer B1 Consumer B2 │ │
│ │ (P0, P1) (P2, P3) │ │
│ └────────────────────────────────────┘ │
│ │
│ Key Points: │
│ - Each partition assigned to exactly one consumer in group │
│ - Consumer can handle multiple partitions │
│ - Different groups receive all messages independently │
│ │
└─────────────────────────────────────────────────────────────────┘
Consumer Implementation
python
class KafkaConsumer:
"""Kafka consumer implementation."""
def __init__(self, group_id: str, bootstrap_servers: list):
self.group_id = group_id
self.brokers = bootstrap_servers
self.subscriptions = set()
self.assignments = {} # TopicPartition -> offset
self.coordinator = None
def subscribe(self, topics: list):
"""Subscribe to topics."""
self.subscriptions.update(topics)
self._join_group()
def poll(self, timeout_ms: int = 1000) -> list:
"""
Poll for new messages.
Returns list of ConsumerRecords.
"""
records = []
for tp, offset in self.assignments.items():
# Fetch from broker
leader = self._leader_for(tp)
fetch_response = self._fetch(
leader, tp, offset, max_bytes=1048576
)
for record in fetch_response.records:
records.append(ConsumerRecord(
topic=tp.topic,
partition=tp.partition,
offset=record.offset,
key=record.key,
value=record.value,
timestamp=record.timestamp
))
# Update local offset
self.assignments[tp] = record.offset + 1
return records
def commit(self, offsets: dict = None):
"""
Commit offsets to Kafka.
Offsets are stored in internal __consumer_offsets topic.
"""
if offsets is None:
offsets = self.assignments.copy()
# Send commit request to coordinator
request = OffsetCommitRequest(
group_id=self.group_id,
offsets=offsets
)
self.coordinator.commit_offsets(request)
def _join_group(self):
"""Join consumer group and get partition assignments."""
# Find group coordinator
self.coordinator = self._find_coordinator()
# Join group
join_response = self.coordinator.join_group(
group_id=self.group_id,
member_id='',
protocol_type='consumer',
protocols=[('range', self.subscriptions)]
)
if join_response.is_leader:
# Leader assigns partitions
assignments = self._assign_partitions(
join_response.members,
self.subscriptions
)
self.coordinator.sync_group(assignments)
else:
# Follower receives assignments
sync_response = self.coordinator.sync_group({})
self.assignments = sync_response.assignments
class PartitionAssignor:
"""Partition assignment strategies."""
def range_assign(self, consumers: list,
partitions: list) -> dict:
"""
Range assignment: consecutive partitions to each consumer.
Good for: Joining data across topics with same partitioning
"""
assignments = defaultdict(list)
partitions = sorted(partitions)
num_consumers = len(consumers)
partitions_per_consumer = len(partitions) // num_consumers
extra = len(partitions) % num_consumers
partition_idx = 0
for i, consumer in enumerate(sorted(consumers)):
count = partitions_per_consumer + (1 if i < extra else 0)
for _ in range(count):
assignments[consumer].append(partitions[partition_idx])
partition_idx += 1
return assignments
def round_robin_assign(self, consumers: list,
partitions: list) -> dict:
"""
Round-robin: distribute partitions evenly.
Good for: Load balancing when topics have different sizes
"""
assignments = defaultdict(list)
consumers = sorted(consumers)
for i, partition in enumerate(sorted(partitions)):
consumer = consumers[i % len(consumers)]
assignments[consumer].append(partition)
        return assignments
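Applying the assignors above to the "orders" example from the Consumer Groups diagram (4 partitions; group A with 3 members, group B with 2):
python
from collections import defaultdict  # required by PartitionAssignor above

assignor = PartitionAssignor()

print(dict(assignor.range_assign(["A1", "A2", "A3"], [0, 1, 2, 3])))
# {'A1': [0, 1], 'A2': [2], 'A3': [3]}  -- matches group A in the diagram

print(dict(assignor.round_robin_assign(["B1", "B2"], [0, 1, 2, 3])))
# {'B1': [0, 2], 'B2': [1, 3]}  -- interleaved; the diagram's group B shows a range-style split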
Offset Management
python
class OffsetManager:
"""Manage consumer offsets."""
def __init__(self):
self.committed_offsets = {} # Stored in __consumer_offsets
self.pending_offsets = {}
def auto_commit(self, assignments: dict, interval_ms: int = 5000):
"""
Automatic offset commit.
        Risk: offsets may be committed before processing completes, or a crash
        may strike after processing but before the next commit, so messages can
        be lost or re-delivered
"""
while True:
time.sleep(interval_ms / 1000)
self.commit(assignments)
def manual_commit_sync(self, offsets: dict):
"""
Synchronous manual commit.
Use case: At-least-once processing
1. Poll messages
2. Process messages
3. Commit offsets
If crash between 2-3, messages reprocessed (at-least-once)
"""
self._commit_to_kafka(offsets)
def manual_commit_async(self, offsets: dict, callback):
"""
Asynchronous commit with callback.
Higher throughput but harder error handling.
"""
future = self._commit_to_kafka_async(offsets)
future.add_callback(callback)
def reset_offset(self, tp: TopicPartition, strategy: str):
"""
Reset offset when no committed offset exists.
Strategies:
- 'earliest': Start from beginning
- 'latest': Start from end (new messages only)
"""
if strategy == 'earliest':
return self._get_earliest_offset(tp)
elif strategy == 'latest':
            return self._get_latest_offset(tp)
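The manual-commit pattern above (poll, process, then commit), sketched with the third-party kafka-python client; the client choice, broker address, and topic/group names are assumptions for illustration:
python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "orders",
    group_id="billing",
    bootstrap_servers="localhost:9092",
    enable_auto_commit=False,  # we commit manually after processing
)

def process(record):
    """Hypothetical processing step, e.g. write to a database."""
    print(record.partition, record.offset, record.value)

while True:
    batch = consumer.poll(timeout_ms=1000)  # dict: TopicPartition -> [ConsumerRecord, ...]
    for records in batch.values():
        for record in records:
            process(record)
    consumer.commit()  # committing only after processing gives at-least-once delivery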
Replication
Leader-Follower Replication
┌─────────────────────────────────────────────────────────────────┐
│ Partition Replication │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Partition 0 (Replication Factor = 3) │
│ │
│ Broker 1 Broker 2 Broker 3 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ LEADER │ │ FOLLOWER │ │ FOLLOWER │ │
│ │ │ │ │ │ │ │
│ │ [0][1][2] │───────>│ [0][1][2] │ │ [0][1][2] │ │
│ │ [3][4][5] │ │ [3][4][5] │<───────│ [3][4] │ │
│ │ [6][7] │ │ [6] │ │ │ │
│ │ ▲ │ │ │ │ │ │
│ └──────┼──────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ Producers write │
│ to leader only │
│ │
│ ISR (In-Sync Replicas): {Broker 1, Broker 2} │
│ - Broker 3 is behind, not in ISR │
│ │
└─────────────────────────────────────────────────────────────────┘
Replication Protocol
python
class ReplicationManager:
"""Kafka replication management."""
def __init__(self, broker_id: int):
self.broker_id = broker_id
self.replicas = {} # partition -> ReplicaState
self.isr = {} # partition -> set of in-sync replicas
def handle_produce(self, partition: int, records: list,
acks: str) -> ProduceResponse:
"""
Handle produce request on leader.
acks options:
- '0': Fire and forget (fastest, may lose data)
- '1': Wait for leader ack (balanced)
- 'all'/-1: Wait for all ISR acks (safest)
"""
# Append to local log
local_log = self.replicas[partition].log
base_offset = local_log.append(records)
if acks == '0':
# No wait
return ProduceResponse(base_offset=base_offset)
if acks == '1':
# Just leader
return ProduceResponse(base_offset=base_offset)
if acks == 'all' or acks == '-1':
# Wait for ISR
high_watermark = self._wait_for_isr(
partition, base_offset + len(records)
)
return ProduceResponse(
base_offset=base_offset,
high_watermark=high_watermark
)
def _wait_for_isr(self, partition: int, target_offset: int) -> int:
"""Wait until all ISR replicas have caught up."""
while True:
all_caught_up = True
for replica_id in self.isr[partition]:
if replica_id == self.broker_id:
continue
replica_offset = self._get_replica_offset(
partition, replica_id
)
if replica_offset < target_offset:
all_caught_up = False
break
if all_caught_up:
return target_offset
time.sleep(0.001) # 1ms
def fetch_from_leader(self, partition: int,
fetch_offset: int) -> list:
"""Follower fetches from leader."""
leader = self._find_leader(partition)
response = leader.fetch(
partition=partition,
offset=fetch_offset,
max_bytes=1048576
)
# Append to local log
self.replicas[partition].log.append(response.records)
return response.records
class HighWatermark:
"""
    High watermark (HWM): the highest offset that every ISR replica has replicated.
Consumers can only read up to high watermark.
This ensures consumers don't read uncommitted data.
"""
def __init__(self):
self.hwm = 0 # High watermark offset
self.leo = 0 # Log end offset
def update_hwm(self, isr_offsets: dict):
"""
Update high watermark based on ISR offsets.
HWM = min(offsets of all ISR replicas)
"""
if isr_offsets:
self.hwm = min(isr_offsets.values())
def can_consumer_read(self, offset: int) -> bool:
"""Consumer can only read up to HWM."""
        return offset < self.hwm
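A quick worked example with the illustrative HighWatermark class above: with ISR log-end offsets of 8 and 7, the HWM is 7, so consumers may read offsets 0 through 6 only.
python
hwm = HighWatermark()
hwm.leo = 8                                    # leader holds offsets 0..7 locally
hwm.update_hwm({"broker1": 8, "broker2": 7})   # log-end offsets reported by the ISR
print(hwm.hwm)                   # 7
print(hwm.can_consumer_read(6))  # True  -- replicated by every ISR member
print(hwm.can_consumer_read(7))  # False -- on the leader but not yet committed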
ZooKeeper Integration
Metadata Management
python
class ZooKeeperMetadata:
"""Kafka's use of ZooKeeper (pre-KRaft)."""
def __init__(self, zk_client):
self.zk = zk_client
def get_broker_list(self) -> list:
"""Get list of live brokers."""
# /brokers/ids/[0,1,2,...]
broker_ids = self.zk.get_children('/brokers/ids')
brokers = []
for bid in broker_ids:
data = self.zk.get(f'/brokers/ids/{bid}')
brokers.append(json.loads(data))
return brokers
def get_topic_partitions(self, topic: str) -> dict:
"""Get partition info for topic."""
# /brokers/topics/{topic}
data = self.zk.get(f'/brokers/topics/{topic}')
return json.loads(data)
def get_partition_leader(self, topic: str,
partition: int) -> int:
"""Get current leader for partition."""
# /brokers/topics/{topic}/partitions/{partition}/state
path = f'/brokers/topics/{topic}/partitions/{partition}/state'
data = self.zk.get(path)
state = json.loads(data)
return state['leader']
def register_broker(self, broker_id: int, host: str, port: int):
"""Register broker on startup."""
# Ephemeral node - deleted when broker disconnects
path = f'/brokers/ids/{broker_id}'
data = json.dumps({
'host': host,
'port': port,
'timestamp': time.time()
})
self.zk.create(path, data, ephemeral=True)
def elect_controller(self, broker_id: int) -> bool:
"""
Controller election via ZooKeeper.
First broker to create /controller wins.
"""
try:
self.zk.create('/controller',
str(broker_id),
ephemeral=True)
return True
except NodeExistsError:
            return False
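For reference, the same ZooKeeper operations sketched with the third-party kazoo client (an assumption; the paper does not mandate a specific client library):
python
import json
import time

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError

zk = KazooClient(hosts="localhost:2181")
zk.start()

broker_id = 1
# Ephemeral registration: the znode vanishes if this broker's session dies.
zk.create(
    f"/brokers/ids/{broker_id}",
    json.dumps({"host": "broker1", "port": 9092, "timestamp": time.time()}).encode(),
    ephemeral=True,
    makepath=True,
)

# Controller election: the first broker to create /controller wins.
try:
    zk.create("/controller", str(broker_id).encode(), ephemeral=True)
    print("this broker is the controller")
except NodeExistsError:
    print("another broker is already the controller")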
Performance Optimizations
Batching and Compression
python
class PerformanceOptimizations:
"""Kafka performance techniques."""
def producer_batching(self):
"""
Batch multiple messages into single request.
Benefits:
- Fewer network round trips
- Better compression ratio
- More efficient disk writes
"""
batch = []
batch_size = 0
max_batch_size = 16384 # 16KB
while batch_size < max_batch_size:
msg = self.queue.get(timeout=0.005) # linger.ms
if msg:
batch.append(msg)
batch_size += len(msg)
# Send entire batch as one request
self.send_batch(batch)
def compression_comparison(self, messages: list):
"""
Compression codec comparison.
GZIP: Best ratio, highest CPU
Snappy: Fast, moderate ratio
LZ4: Fastest, good ratio
ZSTD: Best balance of ratio and speed
"""
import gzip
import snappy
import lz4.frame
import zstd
data = b''.join(messages)
results = {
'gzip': gzip.compress(data),
'snappy': snappy.compress(data),
'lz4': lz4.frame.compress(data),
'zstd': zstd.compress(data)
}
for codec, compressed in results.items():
ratio = len(compressed) / len(data)
print(f"{codec}: {ratio:.2%} of original")
def sequential_io(self):
"""
Why Kafka is fast: Sequential I/O.
Random I/O: ~100 ops/sec (seek time)
Sequential I/O: ~100 MB/sec (no seek)
Kafka only appends, never modifies.
This enables sustained high throughput.
"""
        pass
Exactly-Once Semantics
Idempotent Producer
python
class IdempotentProducer:
"""
Exactly-once producer (Kafka 0.11+).
Guarantees each message written exactly once,
even with retries.
"""
def __init__(self):
self.producer_id = None
self.sequence_numbers = {} # partition -> sequence
def initialize(self):
"""Get producer ID from broker."""
response = self.broker.init_producer_id()
self.producer_id = response.producer_id
def send_idempotent(self, topic: str, partition: int,
record: bytes) -> int:
"""
Send with idempotency.
Broker tracks (producer_id, partition, sequence).
Duplicates are detected and deduplicated.
"""
tp = TopicPartition(topic, partition)
# Get next sequence number
if tp not in self.sequence_numbers:
self.sequence_numbers[tp] = 0
sequence = self.sequence_numbers[tp]
# Send with PID and sequence
request = ProduceRequest(
producer_id=self.producer_id,
sequence=sequence,
records=[record]
)
response = self.broker.produce(request)
if response.error == 'DUPLICATE_SEQUENCE':
# Already written, safe to ignore
pass
else:
self.sequence_numbers[tp] = sequence + 1
return response.offset
class TransactionalProducer:
"""
Transactional producer for atomic writes.
Enables exactly-once across multiple partitions.
"""
def __init__(self, transactional_id: str):
self.transactional_id = transactional_id
self.producer_id = None
self.epoch = 0
def begin_transaction(self):
"""Begin new transaction."""
self.broker.begin_transaction(
transactional_id=self.transactional_id,
producer_id=self.producer_id,
epoch=self.epoch
)
def send(self, topic: str, partition: int, record: bytes):
"""Send within transaction."""
self.broker.produce_transactional(
transactional_id=self.transactional_id,
topic=topic,
partition=partition,
record=record
)
def commit_transaction(self):
"""Commit transaction atomically."""
self.broker.commit_transaction(
transactional_id=self.transactional_id,
producer_id=self.producer_id,
epoch=self.epoch
)
def abort_transaction(self):
"""Abort transaction, discard all writes."""
self.broker.abort_transaction(
transactional_id=self.transactional_id,
producer_id=self.producer_id,
epoch=self.epoch
        )
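As a usage-level counterpart, the transactional flow as exposed by the third-party confluent-kafka client (an assumption for illustration; this API post-dates the 2011 paper):
python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "checkout-service-1",  # stable ID; the epoch fences zombie instances
})

producer.init_transactions()   # registers the transactional ID, obtains producer ID + epoch
producer.begin_transaction()
producer.produce("orders", key=b"user-42", value=b"order-created")
producer.produce("payments", key=b"user-42", value=b"payment-authorized")
producer.commit_transaction()  # both messages become visible to consumers atomically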
Key Results
Production Performance
┌─────────────────────────────────────────────────────────────────┐
│ Kafka Performance │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Throughput (per broker): │
│ ┌─────────────────────────────────────────────┐ │
│ │ Producer: 200,000+ messages/sec │ │
│ │ Consumer: 400,000+ messages/sec │ │
│ │ Aggregate: 2 million+ msg/sec (cluster) │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ Latency: │
│ ┌─────────────────────────────────────────────┐ │
│ │ Produce (acks=1): 2-5ms │ │
│ │ Produce (acks=all): 5-15ms │ │
│ │ Consume: 1-2ms │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ Storage Efficiency: │
│ ┌─────────────────────────────────────────────┐ │
│ │ With compression: 5-10x reduction │ │
│ │ Sequential writes: ~600 MB/sec per disk │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ At LinkedIn (2011): │
│ ┌─────────────────────────────────────────────┐ │
│ │ 10+ billion messages per day │ │
│ │ 1+ TB of data per day │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Influence and Legacy
Impact on Industry
- Log-centric architecture: Made append-only logs mainstream
- Stream processing: Enabled Kafka Streams, ksqlDB
- Event sourcing: Foundation for event-driven systems
- Microservices: Standard for inter-service communication
Evolution
┌──────────────────────────────────────────────────────────────┐
│ Kafka Evolution │
├──────────────────────────────────────────────────────────────┤
│ │
│ 2011: Original Paper │
│ - Basic pub/sub │
│ - Simple consumer model │
│ │
│ 2015: Kafka 0.9 │
│ - New consumer API │
│ - Security (SSL, SASL) │
│ │
│ 2017: Kafka 0.11 │
│ - Exactly-once semantics │
│ - Idempotent producer │
│ - Transactions │
│ │
│ 2022: Kafka 3.3 (KRaft) │
│ - Remove ZooKeeper dependency │
│ - Self-managed metadata │
│ - Simplified operations │
│ │
└──────────────────────────────────────────────────────────────┘
Key Takeaways
- Sequential I/O is fast: Append-only enables high throughput
- Batch everything: Messages, compression, network I/O
- Simple consumer model: Offset-based tracking is elegant and efficient
- Partitioning for scale: Horizontal scaling via partitions
- Replication for durability: ISR replication prevents data loss as long as at least one in-sync replica survives
- Consumer groups for parallelism: Easy to scale consumption
- Log as truth: All data in the log, everything else derived