メッセージの順序保証
注記: この記事は英語版
/05-messaging/03-message-ordering.mdの日本語翻訳です。
TL;DR
メッセージの順序保証は、メッセージが送信された順序で配信されるかどうかを決定します。選択肢は順序保証なし(最高パフォーマンス)から全体順序保証(最低パフォーマンス)まであります。ほとんどのシステムはパーティションベースの順序保証を使用します。同じキーのメッセージは順序が保証され、異なるキーは入り混じる可能性があります。ビジネス要件に基づいて選択してください。真の全体順序保証はめったに必要なく、コストが高いです。
なぜ順序が重要か
問題
ユーザーアクション(この順序で送信):
1. アカウント作成
2. プロフィール更新
3. アカウント削除
順序が入れ替わって配信された場合:
削除が最初に到着 → 「アカウントが見つかりません」エラー
更新が到着 → 孤立したデータが作成される
作成が到着 → アカウントが再び存在する
結果: 状態の破損1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
順序が重要な場合
重要:
- 金融取引(入金が引き落としの前に)
- ステートマシン遷移
- ログ集約
- レプリケーション
あまり重要でない:
- アナリティクスイベント(後で並べ替え可能)
- 通知(わずかな順序入れ替えは許容)
- 独立した操作1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
順序保証のレベル
順序保証なし
メッセージは任意の順序で到着する可能性がある
プロデューサーが送信: A, B, C
コンシューマーが受信: C, A, B(任意の順列)
利点:
- 最大スループット
- 容易なスケーリング
- 調整が不要
使用する場合:
- 操作が独立している
- コンシューマーが任意の順序を処理できる1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
プロデューサー内FIFO
各プロデューサーのメッセージは順序通りに到着する
異なるプロデューサーは入り混じる可能性がある
プロデューサー1: A1, B1, C1 → 順序通りに到着
プロデューサー2: A2, B2, C2 → 順序通りに到着
ただし全体として: A1, A2, B1, C2, B2, C1(入り混じり)
使用する場合:
- 同一ソースからのイベントが順序付けされる必要がある
- 異なるソースは独立している1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
パーティション/キー内FIFO
同じキーのメッセージは順序が保証される
異なるキーは入り混じる可能性がある
Key=user1: login, update, logout → 順序保証
Key=user2: login, purchase → 順序保証
ただし: user1.login, user2.login, user1.update...(入り混じり)
最も一般的なアプローチ
Kafka、SQS FIFOがこれを使用1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
全体順序保証
すべてのメッセージが厳密なグローバル順序
送信: A, B, C, D, E
受信: A, B, C, D, E(正確に)
必要条件:
- 単一パーティション/キュー
- または分散合意
コストが高く、スループットを制限する
真に必要なケースはまれ1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
Kafkaの順序保証
パーティションベース
3パーティションのトピック:
Partition 0: [A, D, G]
Partition 1: [B, E, H]
Partition 2: [C, F, I]
パーティション内: 厳密な順序保証
パーティション間: 順序保証なし
プロデューサー:
- Key = null: パーティションへのラウンドロビン
- Key = "user123": 一貫したパーティションへのハッシュ1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
コンシューマーグループ
Consumer Group A:
Consumer 1 ← Partition 0
Consumer 2 ← Partition 1
Consumer 3 ← Partition 2
各パーティションは1つのコンシューマーが処理する
パーティション内の順序が保持される
コンシューマーが失敗した場合:
パーティションが再割り当てされる
最後にコミットされたオフセットから継続する1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
順序保証
python
# Producer: Same key = same partition = ordered
producer.send(topic='events', key='user123', value=event1)
producer.send(topic='events', key='user123', value=event2)
# event1 always before event2 for user123
# Consumer: Process in order
for message in consumer:
process(message)
consumer.commit() # Commit offset1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
SQS FIFOの順序保証
Message Group ID
python
# Messages with same group ID are ordered
sqs.send_message(
QueueUrl=queue_url,
MessageBody='{"action": "create"}',
MessageGroupId='user-123',
MessageDeduplicationId='msg-001'
)
sqs.send_message(
QueueUrl=queue_url,
MessageBody='{"action": "update"}',
MessageGroupId='user-123',
MessageDeduplicationId='msg-002'
)
# Consumer receives in order for user-1231
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
重複排除
FIFOキューは以下で重複排除します:
- MessageDeduplicationId(明示的)
- コンテンツハッシュ(コンテンツベースの重複排除が有効な場合)
ウィンドウ: 5分
ウィンドウ内の同一ID → メッセージはドロップ1
2
3
4
5
6
2
3
4
5
6
スループット制限
Standard SQS: 無制限のスループット
FIFO SQS: 300 msg/sec(バッチングで3000)
メッセージグループあたり: 最大300 msg/sec
スケールするには複数グループを使用1
2
3
4
5
2
3
4
5
順序保証の実装
シーケンス番号
python
class OrderedProducer:
def __init__(self):
self.sequence = {} # key → last sequence
def send(self, key, message):
seq = self.sequence.get(key, 0) + 1
self.sequence[key] = seq
message['_seq'] = seq
queue.send(key=key, message=message)
class OrderedConsumer:
def __init__(self):
self.expected_seq = {} # key → expected next
self.buffer = {} # key → out-of-order messages
def process(self, key, message):
seq = message['_seq']
expected = self.expected_seq.get(key, 1)
if seq == expected:
# In order - process
handle(message)
self.expected_seq[key] = seq + 1
# Check buffer for next messages
self.process_buffered(key)
elif seq > expected:
# Out of order - buffer
self.buffer.setdefault(key, {})[seq] = message
# seq < expected: Duplicate, ignore1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
リシーケンシングバッファ
受信(順序外): 3, 1, 4, 2, 5
バッファの状態:
3を受信: buffer=[3], 1を待機
1を受信: 1を処理, buffer=[3], 2を待機
4を受信: buffer=[3,4], 2を待機
2を受信: 2,3,4を処理, buffer=[], 5を待機
5を受信: 5を処理
考慮事項:
- バッファサイズの制限
- 欠落シーケンスのタイムアウト
- ギャップ検出1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
ギャップの処理
python
def handle_potential_gap(key, expected, received):
gap_start = expected
gap_end = received - 1
# Wait for gap to fill
wait_until = time.time() + GAP_TIMEOUT
while time.time() < wait_until:
if gap_filled(key, gap_start, gap_end):
return True
sleep(0.1)
# Gap timeout - decide action
if GAP_POLICY == 'skip':
log.warn(f"Skipping gap {gap_start}-{gap_end}")
return True
elif GAP_POLICY == 'fail':
raise GapError(f"Gap detected: {gap_start}-{gap_end}")1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
順序保証とスケーリング
パーティション戦略
エンティティIDによる:
user-123 → partition 0
user-456 → partition 1
user-123のすべてのイベントが順序保証 ✓
時間バケットによる:
イベント 00:00-00:05 → partition 0
イベント 00:05-00:10 → partition 1
バケット内で時間順
ハッシュによる:
hash(key) % num_partitions
均一分散1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
パーティションの増加
初期: 4パーティション
Key A → partition 1
Key B → partition 3
パーティション追加後: 8パーティション
Key A → partition 5(異なる!)
Key B → partition 3(変わる可能性あり)
問題: キー→パーティションのマッピングが変わる
解決策:
- 初めから多めにパーティション分割する(100以上)
- コンシステントハッシングを使用する
- パーティション増加をコンシューマーと調整する1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
並列処理の制限
厳密に順序付けされたキュー:
最大並列度 = キーの数
1000のユニークキー = 1000の並列操作
単一のキーに高ボリュームがある場合:
そのキーがボトルネックになる
タイムウィンドウイングまたはサブキーを検討する1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
一般的なパターン
エンティティごとの順序
python
# All events for an entity go to same partition
def get_partition_key(event):
return event.entity_id
# Examples:
# Order events → key = order_id
# User events → key = user_id
# Session events → key = session_id1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
因果関係による順序
イベントBがイベントAに依存する場合:
同じパーティションキーを使用する
ユーザーが注文を作成 → 注文イベント
両方のキー: order_id
作成が更新の前に保証される
ただし: ユーザープロフィール更新は注文の順序を必要としない
異なるパーティションキーでOK1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
ハイブリッド順序
クリティカルパス: FIFOキュー(順序保証あり、遅い)
ベストエフォート: Standardキュー(高速、順序保証なし)
Create/Update/Delete → FIFO(順序が重要)
アナリティクスイベント → Standard(順序は不要)1
2
3
4
5
2
3
4
5
トレードオフ
| 順序レベル | スループット | レイテンシ | 複雑さ |
|---|---|---|---|
| なし | 最高 | 最低 | 最低 |
| プロデューサーごと | 高 | 低 | 低 |
| キーごと | 中 | 中 | 中 |
| 全体 | 最低 | 最高 | 最高 |
決定フレームワーク
質問1: メッセージは共有状態に影響するか?
いいえ → 順序保証は不要
はい → 続ける
質問2: 状態はキーでパーティション分割されているか?
はい → キーごとの順序保証で十分
いいえ → 続ける
質問3: 全体順序保証が本当に必要か?
通常はいいえ → 設計を再検討する
はい → パフォーマンスペナルティを受け入れる1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
順序問題のデバッグ
順序外の検出
python
def detect_out_of_order(messages):
issues = []
last_seq = {}
for msg in messages:
key = msg.partition_key
seq = msg.sequence
if key in last_seq:
if seq <= last_seq[key]:
issues.append({
'key': key,
'expected': last_seq[key] + 1,
'got': seq
})
last_seq[key] = seq
return issues1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
順序のためのロギング
python
logger.info(f"Received message",
extra={
'message_id': msg.id,
'partition': msg.partition,
'offset': msg.offset,
'key': msg.key,
'sequence': msg.sequence,
'timestamp': msg.timestamp
}
)
# Enables post-hoc ordering analysis1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
Kafkaパーティション順序の詳細
順序保証のスコープ
単一パーティション内:
オフセットによる全体順序。コンシューマーは0, 1, 2, 3...と順次読み取る。
パーティション間:
順序保証なし。Partition 0のオフセット5はPartition 1のオフセット5より
新しいかもしれないし古いかもしれない。poll()は任意の順序でバッチを返す。1
2
3
4
5
6
2
3
4
5
6
パーティションキーの選択
キーは因果的に関連するメッセージをグループ化する:
注文ライフサイクル → order_id(作成、支払い、発送 → 順序保証)
ユーザーアクティビティ → user_id(ログイン、クリック、ログアウト → 順序保証)
デバイステレメトリ → device_id(時系列の読み取り値)
アンチパターン:
random_uuid → 負荷を分散するが順序が壊れる
event_type → 無関係なエンティティをグループ化する1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
ホットパーティション問題
著名ユーザーのuser_id → 1つのパーティションに100倍のトラフィック、他はアイドル。
緩和策:
1. 複合キー(user_id + session_id)— ユーザー単位からセッション単位の順序保証にトレード
2. 不均衡を受け入れる — ホットコンシューマーを垂直にスケール、パーティションラグを監視
3. アプリレベルシャーディング — 仮想サブユーザーに分割し、下流でマージ1
2
3
4
5
6
2
3
4
5
6
リバランスと順序
コンシューマーグループリバランス(コンシューマーの参加/離脱/クラッシュ):
Eagerプロトコル(2.4以前のデフォルト):
すべてのコンシューマーがリバランス中にフェッチを停止
短い処理ギャップだが、順序外配信はない
リバランス後、最後にコミットされたオフセットから再開
Cooperativeリバランシング(Kafka 2.4+):
取り消されたパーティションのみ停止、他は処理を継続
影響範囲を縮小 — 大規模コンシューマーグループに推奨1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
インフライトリクエストとリトライ
プロデューサー設定: max.in.flight.requests.per.connection
5に設定(デフォルト):
Batch 1が失敗、Batch 2が成功、Batch 1がリトライ
→ ブローカーが受信: Batch 2, Batch 1 → 順序外
1に設定:
1リクエストずつ → 正しい順序だが、スループットが低下
冪等プロデューサー(enable.idempotence=true):
ブローカーがプロデューサーのシーケンスを追跡し、順序外の書き込みを拒否
max.in.flight=5でも安全。順序重視のワークロードに推奨。1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
サービス間の順序保証
根本的な問題
単一サービス: BがE2でクラッシュ → 再起動 → オフセットからリプレイ → 問題なし
サービス間: BがE1→F1、E2→F2を処理し、Cにパブリッシュ
F2がF1よりCに早く到着(異なるトピック/パーティション)→ 順序外
独立したトピックとサービス間の順序はどのブローカーも保証しません。1
2
3
4
5
6
2
3
4
5
6
サービス間順序のためのシーケンス番号
python
# Publisher embeds monotonic sequence per aggregate
def publish(self, aggregate_id, event):
version = self.store.increment_version(aggregate_id)
event['aggregate_version'] = version
broker.send(key=aggregate_id, value=event)
# Consumer enforces version ordering
def on_event(self, event):
version = event['aggregate_version']
last = self.store.get_last_version(event['aggregate_id'])
if version == last + 1: # Expected → apply
self.apply(event)
self.store.set_last_version(event['aggregate_id'], version)
elif version <= last: # Duplicate → skip
pass
else: # Future → buffer
self.buffer(event['aggregate_id'], event)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ベクタークロックによる因果順序
イベントがエンティティ間で因果関係の依存性を持つ場合:
Event A(ユーザー作成) → clock {user_svc: 1}
Event B(注文作成) → clock {order_svc: 1, user_svc: 1}
コンシューマーがBをAより先に受信:
user_svc:1が欠落 → Bをバッファ → Aを受信 → A、次にBを処理
よりシンプルな代替 — 因果トークン:
Event AがトークンT1を生成。Event Bが依存関係を宣言: [T1]。
コンシューマーが確認: T1を見たか? いいえ → Bをバッファ。
パーティションキー順序が不十分な場合(エンティティ間チェーン)にのみ使用。1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
順序保証 vs パフォーマンスのトレードオフ
保証のスペクトラム
| 保証 | スループット | 並列度 | 使用する場合 |
|---|---|---|---|
| 順序なし(ファンアウト) | 最大 | 無制限 | 通知、アナリティクス、ログ転送 |
| パーティション順序(キーごと) | 高 | パーティション数 | 注文ライフサイクル、ユーザーアクティビティ、デバイステレメトリ |
| 全体順序(単一パーティション) | 最低 | コンシューマー1台 | 金融台帳、分散ログ、チェンジログ |
スループット(概算)
Kafka(3ブローカー、100バイトメッセージ):
順序なし: ~2M msg/sec | パーティション: ~1M | 全体: ~50K
SQS:
Standard: ~120K msg/sec | FIFOグループあたり: 300(バッチ3000)
パーティション順序と全体順序のギャップは20倍以上です。1
2
3
4
5
6
7
2
3
4
5
6
7
90%ルール
ほとんどのシステムはパーティションレベルの順序保証だけで十分です。
問いかけ: 「異なるエンティティのイベントに相対的な順序が必要か?」
ほぼ常にいいえ:
ユーザーA vs ユーザーB → 独立
注文#100 vs #101 → 独立
本当にはいの場合:
共通のアグリゲートキーの下にエンティティをマージできるか?
シングルライターパターンを使用できるか?
代替手段を尽くした後にのみ: 単一パーティションペナルティを受け入れる。1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
順序回復パターン
順序外の検出
検出シグナル:
- シーケンスジャンプ: 5を受信、3を期待(ギャップ = [3, 4])
- タイムスタンプの後退: event.ts < last_processed.ts
- バージョンスキップ: aggregate_versionが2から5にジャンプ
モニタリング:
メトリクス: ordering_gap_detected{topic, partition, consumer_group}
ギャップ率がベースラインを超えたらアラート1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
バッファリング戦略
ギャップが埋まるまで将来のイベントを保持:
last_processed=2, buffer={5: event5}
3を受信 → 処理、4を受信 → 処理、バッファに5 → 処理
リスク: イベントが本当に失われた場合、バッファが無制限に増大
緩和策: キーごとのバッファサイズを制限(例: 最大1000イベント)1
2
3
4
5
6
7
2
3
4
5
6
7
タイムアウトして続行
N秒間欠落イベントを待った後、ギャップを受け入れる:
ギャップ [3,4] を検出 → 30秒タイマーを開始
タイムアウト → ギャップをログ、5にスキップ、バッファされたイベントを処理
遅延到着(タイムアウト後に3または4):
オプションA: 無視(通り過ぎた)
オプションB: 冪等であれば遡及的に適用
オプションC: 手動レビューのためデッドレターキュー1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
ソースからの再処理
ソースがリプレイをサポートしている場合、欠落イベントをリクエスト:
Kafka: consumer.seek(partition, offset) — 前方に再消費
SQS: Visibility Timeout後にメッセージが戻る — 自動リトライ
カスタム: GET /events?aggregate_id=X&after_version=2 — 直接取得
重要な原則: コンシューマーが冪等である場合にのみリプレイは安全です。
冪等性パターンについては 04-delivery-guarantees.md を参照してください。1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
重要なポイント
- 全体順序はコストが高い - 本当に必要でない限り避ける
- キーごとの順序で通常は十分 - エンティティIDでパーティション分割する
- 同じキー → 同じパーティション → 同じコンシューマー - 順序のチェーン
- シーケンス番号で検証が可能 - ギャップと重複を検出する
- 順序外のメッセージをバッファする - ギャップにはタイムアウトを設定
- パーティション増 = 並列度増 - ただしパーティション内順序保証
- スケーリングがキーマッピングに影響 - 初めから多めにパーティション分割
- 独立したキーを設計する - 並列度を最大化する