Kafka: A Distributed Messaging System for Log Processing
注: この記事は英語の原文を日本語に翻訳したものです。コードブロック、Mermaidダイアグラム、論文タイトル、システム名、技術用語は原文のまま保持しています。
論文概要
- タイトル: Kafka: a Distributed Messaging System for Log Processing
- 著者: Jay Kreps, Neha Narkhede, Jun Rao (LinkedIn)
- 発表: NetDB Workshop 2011
- 背景: LinkedInは高スループット・低レイテンシのログ処理を必要としていました
TL;DR
Kafkaは分散コミットログで、以下を提供します:
- シーケンシャルディスクI/Oとバッチングによる高スループット
- パーティション化されたトピックによるスケーラビリティ
- レプリケーションによる耐久性
- オフセットベースの追跡によるシンプルなコンシューマモデル
課題
ログ処理の課題
┌─────────────────────────────────────────────────────────────────┐
│ LinkedInの要件 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ アクティビティデータ: │
│ ┌─────────────────────────────────────────────┐ │
│ │ - ページビュー: 1日あたり数十億 │ │
│ │ - ユーザーアクション: クリック、検索など │ │
│ │ - システムメトリクス: CPU、メモリ、レイテンシ│ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ユースケース: │
│ ┌─────────────────────────────────────────────┐ │
│ │ - リアルタイム分析ダッシュボード │ │
│ │ - オフラインバッチ処理(Hadoop) │ │
│ │ - 検索インデックス作成 │ │
│ │ - レコメンデーションシステム │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 既存ソリューションの限界: │
│ ┌─────────────────────────────────────────────┐ │
│ │ - 従来のMQ: 遅すぎる、スケーラブルでない │ │
│ │ - ログファイル: リアルタイム性なし、管理困難│ │
│ │ - カスタムソリューション: 複雑で脆弱 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘Kafkaアーキテクチャ
コアコンセプト
┌─────────────────────────────────────────────────────────────────┐
│ Kafkaアーキテクチャ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ TOPIC: メッセージの名前付きフィード │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Topic "clicks" │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Partition 0: [M0][M1][M2][M3][M4][M5]... │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Partition 1: [M0][M1][M2][M3][M4]... │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Partition 2: [M0][M1][M2][M3][M4][M5][M6]... │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ PARTITION: 順序付けされた不変のメッセージシーケンス │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Offset: 0 1 2 3 4 5 6 7 │ │
│ │ ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ │
│ │ │ M0 │ M1 │ M2 │ M3 │ M4 │ M5 │ M6 │ M7 │ │ │
│ │ └────┴────┴────┴────┴────┴────┴────┴────┘ │ │
│ │ ▲ │ │
│ │ 追記のみ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ BROKER: パーティションを格納するサーバー │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Broker 1 Broker 2 Broker 3 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ P0, P3 │ │ P1, P4 │ │ P2, P5 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘メッセージフロー
ログベースストレージ
追記専用ログ
properties
# server.properties — ログベースストレージのブローカー主要設定
#
# Kafkaのストレージはディスク上の追記専用コミットログです。
# 重要な知見: シーケンシャル書き込みは高速です(ディスクあたり~600 MB/s)。
# ── セグメントとリテンション ──────────────────────────────────────────
# 各パーティションはこのサイズのセグメントファイルに分割されます(デフォルト1 GB)。
# アクティブセグメントがこの制限に達すると、Kafkaは新しいセグメントに切り替えます。
log.segment.bytes=1073741824
# 削除前のデータ保持期間(デフォルト7日)。
log.retention.hours=168
# 代替: パーティションの合計サイズがこれを超えたときにセグメントを削除。
# log.retention.bytes=-1 # -1 = 無制限
# ログクリーナーが削除対象のセグメントをチェックする頻度。
log.retention.check.interval.ms=300000
# ── フラッシュポリシー(通常はOSページキャッシュに委任) ────────────────
# ディスクへの強制フラッシュ前のメッセージ数。
# デフォルト: 最良のスループットのためOSページキャッシュに依存。
# log.flush.interval.messages=10000
# log.flush.interval.ms=1000
# ── ディレクトリ ──────────────────────────────────────────────────
# ログデータのカンマ区切りディレクトリリスト。
# 複数ディスクに分散するとスループットが向上します。
log.dirs=/var/kafka-logsshell
# パーティションのディスク上のログ構造を検査
ls -l /var/kafka-logs/clicks-0/
# 出力例:
# 00000000000000000000.index <- スパースオフセットインデックス
# 00000000000000000000.log <- セグメントファイル(メッセージ)
# 00000000000000000000.timeindex
# 00000000000052428800.index <- 次のセグメント(オフセット52428800でロール)
# 00000000000052428800.log
# セグメントファイルからメッセージをダンプ
kafka-dump-log.sh \
--files /var/kafka-logs/clicks-0/00000000000000000000.log \
--print-data-log効率的なI/O
Kafka I/O最適化
ゼロコピー転送(sendfileシステムコール):
従来のパス: ゼロコピーパス:
1. ファイル → カーネルバッファ 1. ファイル → カーネルバッファ
2. カーネルバッファ → ユーザーバッファ 2. カーネルバッファ → NIC(直接)
3. ユーザーバッファ → ソケットバッファ
4. ソケットバッファ → NIC 2回のコピーと
2回のコンテキストスイッチを排除!
バッチ圧縮:
メッセージは個別ではなくバッチで圧縮されます。
類似メッセージが冗長なバイトパターンを共有するため、
はるかに良い圧縮率が得られます。
ページキャッシュフレンドリーな書き込み:
1. プロデューサーがメモリマップされたセグメントファイルに追記。
2. OSページキャッシュが書き込みを吸収し、非同期でフラッシュ。
3. コンシューマーが最新データをページキャッシュから直接読み取り。
結果: 最新(末尾)データに対してメモリに近い速度。プロデューサー
メッセージの発行
shell
# ── トピック作成 ────────────────────────────────────────────────
kafka-topics.sh --bootstrap-server localhost:9092 \
--create \
--topic clicks \
--partitions 6 \
--replication-factor 3
# トピック設定の確認
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic clicks
# ── メッセージの生成 ────────────────────────────────────────────
# インタラクティブプロデューサー(キーセパレータ付きkey:value)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic clicks \
--property parse.key=true \
--property key.separator=:
# > user-123:{"page":"/home","ts":1700000000}
# > user-456:{"page":"/cart","ts":1700000001}
# ファイルからの生成
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic clicks < clickstream.jsonl
# ── パーティショニング動作 ───────────────────────────────────────
# キーが提供された場合:
# partition = murmur2(key) % num_partitions
# → 同じキーのメッセージは常に同じパーティションに格納
# (キー単位の順序を保証)。
#
# キーが提供されない場合:
# デフォルトパーティショナーはスティッキーラウンドロビン(バッチ対応)
# を使用して均等な負荷分散を行います。
#
# カスタムパーティショニングはJavaプロデューサーで設定:
# props.put("partitioner.class",
# "com.example.RegionPartitioner");コンシューマー
コンシューマーグループ
┌─────────────────────────────────────────────────────────────────┐
│ コンシューマーグループ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Topic "orders"(4パーティション) │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ P0 P1 P2 P3 │ │
│ └──┬───────────┬───────────┬───────────┬─────────────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
│ Consumer Group A │
│ (3コンシューマー) │
│ ┌─────────────────────────────────────────────┐ │
│ │ Consumer A1 Consumer A2 Consumer A3 │ │
│ │ (P0, P1) (P2) (P3) │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ Consumer Group B │
│ (2コンシューマー) │
│ ┌────────────────────────────────────┐ │
│ │ Consumer B1 Consumer B2 │ │
│ │ (P0, P1) (P2, P3) │ │
│ └────────────────────────────────────┘ │
│ │
│ 重要なポイント: │
│ - 各パーティションはグループ内の1つのコンシューマーに割り当て │
│ - コンシューマーは複数パーティションを処理可能 │
│ - 異なるグループは全メッセージを独立して受信 │
│ │
└─────────────────────────────────────────────────────────────────┘コンシューマー実装
java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
/**
* Kafka consumer with manual offset commit (at-least-once).
*
* Partition assignment strategies (set via partition.assignment.strategy):
* - RangeAssignor: consecutive partitions per consumer
* (good for co-partitioned joins)
* - RoundRobinAssignor: even spread across consumers
* - CooperativeStickyAssignor: incremental rebalance, minimal partition moves
*/
public class ClickConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "click-analytics");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use cooperative rebalancing to avoid stop-the-world pauses
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(List.of("clicks"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partition=%d offset=%d key=%s value=%s%n",
record.partition(), record.offset(),
record.key(), record.value());
// ... process record ...
}
// Manual synchronous commit after processing
// If crash between process and commit → messages reprocessed (at-least-once)
consumer.commitSync();
}
}
}
}shell
# ── コンソールコンシューマーでの簡易消費 ──────────────────────
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic clicks \
--group click-analytics \
--from-beginning \
--property print.key=true \
--property print.timestamp=true
# ── コンシューマーグループ管理 ────────────────────────────────
# 全コンシューマーグループの一覧
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# グループの詳細: パーティション割り当て、ラグ、現在のオフセットを確認
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group click-analytics
# オフセットを最初にリセット(グループは非アクティブである必要あり)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group click-analytics \
--topic clicks \
--reset-offsets --to-earliest --execute
# 特定のタイムスタンプにオフセットをリセット
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group click-analytics \
--topic clicks \
--reset-offsets --to-datetime 2024-01-01T00:00:00.000 --executeオフセット管理
java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
/**
* Offset management strategies.
*
* Offsets are stored in the internal __consumer_offsets topic.
*/
public class OffsetManagementExamples {
/** Auto-commit: simplest but may lose messages on crash. */
static Properties autoCommitConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
// Risk: crash between poll and next auto-commit → message loss
return props;
}
/** Manual sync commit: at-least-once guarantee. */
static void manualSyncCommit(KafkaConsumer<String, String> consumer) {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000)); // 1. poll
for (ConsumerRecord<String, String> r : records) {
process(r); // 2. process
}
consumer.commitSync(); // 3. commit
// If crash between 2 and 3 → messages reprocessed (at-least-once)
}
}
/** Manual async commit: higher throughput, harder error handling. */
static void manualAsyncCommit(KafkaConsumer<String, String> consumer) {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> r : records) {
process(r);
}
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed: " + exception.getMessage());
}
});
}
}
/**
* auto.offset.reset strategies (when no committed offset exists):
* "earliest" — start from the beginning of the partition
* "latest" — start from the end (new messages only)
*/
static void resetOffsetConfig(Properties props, String strategy) {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, strategy);
}
private static void process(ConsumerRecord<String, String> r) { /* ... */ }
}レプリケーション
Leader-Followerレプリケーション
┌─────────────────────────────────────────────────────────────────┐
│ パーティションレプリケーション │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Partition 0(レプリケーションファクター = 3) │
│ │
│ Broker 1 Broker 2 Broker 3 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ LEADER │ │ FOLLOWER │ │ FOLLOWER │ │
│ │ │ │ │ │ │ │
│ │ [0][1][2] │───────>│ [0][1][2] │ │ [0][1][2] │ │
│ │ [3][4][5] │ │ [3][4][5] │<───────│ [3][4] │ │
│ │ [6][7] │ │ [6] │ │ │ │
│ │ ▲ │ │ │ │ │ │
│ └──────┼──────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ プロデューサーは │
│ Leaderにのみ書き込み │
│ │
│ ISR(同期レプリカ): {Broker 1, Broker 2} │
│ - Broker 3は遅れており、ISRに含まれない │
│ │
└─────────────────────────────────────────────────────────────────┘レプリケーションプロトコル
properties
# server.properties — レプリケーションと耐久性の設定
#
# acks動作(プロデューサー側で設定):
# acks=0 Fire and forget(最速、データ損失の可能性あり)
# acks=1 Leaderの書き込みのみを待機(バランス型)
# acks=all 全同期レプリカの応答を待機(最も安全)
# ── レプリケーションファクター(トピックレベルのデフォルト)─────────────────────
default.replication.factor=3
# ── 同期レプリカ(ISR)制御 ──────────────────────────────────
# acks=allでの生成が成功する前に応答が必要な最小レプリカ数。
# replication.factor=3で2に設定すると、書き込みをブロックせずに
# 1つのブローカー障害を許容できます。
min.insync.replicas=2
# FollowerがISRから除外されるまでの許容遅延。
replica.lag.time.max.ms=30000
# ── アンクリーンリーダー選出 ─────────────────────────────────
# trueの場合、同期していないレプリカがLeaderになれます(データ損失のリスク)。
# 強い耐久性のためにはfalseを維持。
unclean.leader.election.enable=falseHigh Watermark (HWM)
Leader Follower-1 Follower-2
[0–7] [0–7] [0–5] ← Log End Offset (LEO)
▲
HWM = min(全ISRレプリカのLEO) = 5
- コンシューマーはHWMまでしか読み取れません(offset < 5)。
- これにより、コンシューマーがコミットされていないデータを読まないことが保証されます。
- Follower-2は遅れており、replica.lag.time.max.msを超えると
ISRから除外されます。shell
# トピックのISRとLeader割り当てを検査
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic clicks
# 出力例:
# Topic: clicks Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2
# トピックレベルでmin.insync.replicasを変更
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name clicks \
--add-config min.insync.replicas=2ZooKeeper連携
メタデータ管理
shell
# ── ZooKeeperベースのメタデータ(KRaft以前、Kafka < 3.3)───────────
# 登録済みブローカーの一覧
zookeeper-shell.sh localhost:2181 ls /brokers/ids
# ブローカー詳細の取得
zookeeper-shell.sh localhost:2181 get /brokers/ids/0
# パーティション状態の取得(Leader、ISR)
zookeeper-shell.sh localhost:2181 \
get /brokers/topics/clicks/partitions/0/state
# 現在のコントローラーの取得
zookeeper-shell.sh localhost:2181 get /controller
# ── KRaftモード(Kafka 3.3+、ZooKeeper不要)────────────────────
# クラスターIDの生成
kafka-storage.sh random-uuid
# KRaft用のストレージディレクトリをフォーマット
kafka-storage.sh format \
--config server.properties \
--cluster-id <generated-uuid>
# クラスターメタデータの確認(KRaft)
kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log \
--cluster-id <cluster-id>
# KRaftモードでのブローカー一覧
kafka-broker-api-versions.sh --bootstrap-server localhost:9092パフォーマンス最適化
バッチングと圧縮
shell
# ── プロデューサーバッチングと圧縮のチューニング ───────────────────
# プロデューサープロパティとして設定(またはコマンドラインでオーバーライド)。
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic clicks \
--producer-property batch.size=16384 \
--producer-property linger.ms=5 \
--producer-property compression.type=lz4
# バッチングの利点:
# - ネットワークラウンドトリップの削減
# - より良い圧縮率(類似メッセージがパターンを共有)
# - より効率的なシーケンシャルディスク書き込み
# ── 圧縮コーデック比較 ─────────────────────────────────────────
# コーデック| 圧縮率 | CPU負荷 | メモ
# ---------|-----------|----------|------------------------------
# gzip | 最良 | 最高 | コールド/アーカイブデータに最適
# snappy | 中程度 | 低 | 汎用デフォルトとして良好
# lz4 | 良好 | 最低 | レイテンシ重視に最適
# zstd | 非常に良好| 中程度 | 圧縮率と速度の最良バランス
# ── Kafkaが高速な理由: シーケンシャルI/O ────────────────────────
# ランダムI/O: ~100 ops/sec(ディスクシーク時間が支配的)
# シーケンシャルI/O: ~600 MB/sec(シークなし、フルディスク帯域幅)
#
# Kafkaは追記のみ — 既存データを変更しません。
# これにより、コモディティハードウェアでの持続的な高スループットが可能になります。Exactly-Onceセマンティクス
冪等プロデューサー
java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* Idempotent producer (Kafka 0.11+).
*
* The broker tracks (producer_id, partition, sequence_number).
* Duplicate records from retries are detected and deduplicated automatically.
* Setting enable.idempotence=true is the only change needed.
*/
public class IdempotentProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
// Enable idempotent writes (implied by enable.idempotence=true):
// acks=all, retries=Integer.MAX_VALUE, max.in.flight.requests.per.connection<=5
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record =
new ProducerRecord<>("clicks", "user-123", "{\"page\":\"/home\"}");
// The broker deduplicates any retried send with the same sequence number.
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Sent to partition=%d offset=%d%n",
metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
}
}
}java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;
import java.time.Duration;
import java.util.*;
/**
* Transactional producer for atomic writes across multiple partitions.
*
* Enables exactly-once semantics in consume-transform-produce patterns.
*/
public class TransactionalProducerExample {
public static void main(String[] args) {
// ── Producer with transactions ──────────────────────────
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-1");
// enable.idempotence is automatically true when transactional.id is set
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
// ── Consumer (read-committed isolation) ─────────────────
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("orders"));
// ── Consume-transform-produce loop ──────────────────────
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
if (records.isEmpty()) continue;
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> r : records) {
// Transform and produce to output topic
String enriched = enrich(r.value());
producer.send(new ProducerRecord<>(
"enriched-orders", r.key(), enriched));
}
// Commit consumer offsets within the same transaction
producer.sendOffsetsToTransaction(
currentOffsets(records), consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
}
}
private static String enrich(String value) { return value; /* ... */ }
private static Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata>
currentOffsets(ConsumerRecords<String, String> records) {
Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
records.partitions().forEach(tp ->
offsets.put(tp, new OffsetAndMetadata(
records.records(tp).get(records.records(tp).size() - 1).offset() + 1)));
return offsets;
}
}主要な結果
本番パフォーマンス
┌─────────────────────────────────────────────────────────────────┐
│ Kafkaパフォーマンス │
├─────────────────────────────────────────────────────────────────┤
│ │
│ スループット(ブローカーあたり): │
│ ┌─────────────────────────────────────────────┐ │
│ │ プロデューサー: 200,000+メッセージ/秒 │ │
│ │ コンシューマー: 400,000+メッセージ/秒 │ │
│ │ 合計: 2,000,000+メッセージ/秒(クラスタ) │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ レイテンシ: │
│ ┌─────────────────────────────────────────────┐ │
│ │ 生成(acks=1): 2-5ms │ │
│ │ 生成(acks=all): 5-15ms │ │
│ │ 消費: 1-2ms │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ ストレージ効率: │
│ ┌─────────────────────────────────────────────┐ │
│ │ 圧縮あり: 5-10倍の削減 │ │
│ │ シーケンシャル書き込み: ディスクあたり~600 MB/秒│ │
│ └─────────────────────────────────────────────┘ │
│ │
│ LinkedInでの実績(2011年): │
│ ┌─────────────────────────────────────────────┐ │
│ │ 1日あたり100億以上のメッセージ │ │
│ │ 1日あたり1TB以上のデータ │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘影響とレガシー
業界への影響
- ログ中心アーキテクチャ: 追記専用ログを主流にしました
- ストリーム処理: Kafka Streams、ksqlDBを実現しました
- イベントソーシング: イベント駆動システムの基盤です
- マイクロサービス: サービス間通信の標準です
進化
┌──────────────────────────────────────────────────────────────┐
│ Kafkaの進化 │
├──────────────────────────────────────────────────────────────┤
│ │
│ 2011: 初期論文 │
│ - 基本的なpub/sub │
│ - シンプルなコンシューマモデル │
│ │
│ 2015: Kafka 0.9 │
│ - 新コンシューマAPI │
│ - セキュリティ(SSL、SASL) │
│ │
│ 2017: Kafka 0.11 │
│ - Exactly-onceセマンティクス │
│ - 冪等プロデューサー │
│ - トランザクション │
│ │
│ 2022: Kafka 3.3 (KRaft) │
│ - ZooKeeper依存性の除去 │
│ - 自己管理メタデータ │
│ - 運用の簡素化 │
│ │
└──────────────────────────────────────────────────────────────┘重要なポイント
- シーケンシャルI/Oは高速: 追記専用が高スループットを実現します
- すべてをバッチ化: メッセージ、圧縮、ネットワークI/O
- シンプルなコンシューマモデル: オフセットベースはエレガントで効率的です
- スケールのためのパーティショニング: パーティションによる水平スケーリング
- 耐久性のためのレプリケーション: ISRがデータ損失を防止します
- 並列性のためのコンシューマーグループ: 消費のスケーリングが容易です
- ログが真実: すべてのデータはログにあり、その他はすべて派生物です