Dynamo: Amazon's Highly Available Key-Value Store
注: この記事は英語の原文を日本語に翻訳したものです。コードブロック、Mermaidダイアグラム、論文タイトル、システム名、技術用語は原文のまま保持しています。
論文概要
著者: Giuseppe DeCandia他 (Amazon) 発表: SOSP 2007 背景: Amazonのショッピングカートおよびセッションストレージ向けに設計
TL;DR
Dynamoは、特定の障害シナリオ下で一貫性を犠牲にする高可用性キーバリューストアです。パーティショニングにConsistent Hashing、競合検出にVector Clock、チューナブルな一貫性のためのQuorum ベースのレプリケーション、可用性のためのSloppy QuorumとHinted Handoff、レプリカ同期のためのMerkle Treeを用いたAnti-Entropyを使用しています。Dynamoは、現在分散データベースで標準となっている多くの技術を先駆けました。
課題
Amazonのeコマースプラットフォームには以下が求められました:
- 常に書き込み可能 - ショッピングカートは障害時でも書き込みを受け入れる必要があります
- 低レイテンシ - 99.9パーセンタイルの応答時間が重要です(テールレイテンシ)
- 高可用性 - 障害 = 売上損失
- スケーラブル - ホリデーシーズンのトラフィックスパイクに対応
従来のRDBMSではこれらの要件を満たせませんでした:
- シングルマスターの書き込みが可用性を制限
- 強い一貫性がレイテンシを増加
- 垂直スケーリングには限界がある
設計哲学
┌─────────────────────────────────────────────────────────────────────────┐
│ Dynamo Design Principles │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ CAP Theorem Choice │ │
│ │ │ │
│ │ Consistency ──────────────── Availability │ │
│ │ │ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Partition Tolerance │ │
│ │ │ │
│ │ Dynamo chooses: AP (Availability + Partition Tolerance) │ │
│ │ Consistency is eventual, conflicts resolved at read time │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ 主要な設計判断: │
│ 1. インクリメンタルなスケーラビリティ(シームレスなノード追加) │
│ 2. 対称性(特別なノードなし) │
│ 3. 分散化(単一障害点なし) │
│ 4. 異種性(混在ハードウェアの対応) │
└─────────────────────────────────────────────────────────────────────────┘システムアーキテクチャ
Virtual Nodeを用いたConsistent Hashing
┌─────────────────────────────────────────────────────────────────────────┐
│ Consistent Hashing │
│ │
│ 基本的なConsistent Hashingの問題: │
│ - 不均一な分散(ランダムな位置) │
│ - ノードの追加/削除時にデータの移動が多すぎる │
│ │
│ 解決策: Virtual Node │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Physical Node A → Virtual Nodes: A1, A2, A3, A4, ... │ │
│ │ Physical Node B → Virtual Nodes: B1, B2, B3, B4, ... │ │
│ │ │ │
│ │ A1 │ │
│ │ ● │ │
│ │ ╱ ╲ │ │
│ │ B2 ● ● A3 │ │
│ │ ╱ ╲ │ │
│ │ ● ● │ │
│ │ B1 A2 │ │
│ │ ╲ ╱ │ │
│ │ A4 ● ● B4 │ │
│ │ ╲ ╱ │ │
│ │ ● │ │
│ │ B3 │ │
│ │ │ │
│ │ 利点: │ │
│ │ - 均一な分散(多くの位置で平均化) │ │
│ │ - 異種性(高性能マシンにより多くのVirtual Node) │ │
│ │ - より滑らかなリバランシング(Virtual Nodeを段階的に削除) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘実装
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
import hashlib
import bisect
@dataclass
class VirtualNode:
physical_node: str
token: int # Position on ring
class ConsistentHashRing:
"""
Consistent hash ring with virtual nodes.
"""
def __init__(self, replication_factor: int = 3):
self.replication_factor = replication_factor
self.ring: List[VirtualNode] = [] # Sorted by token
self.tokens: List[int] = [] # Just tokens, for binary search
self.nodes: Dict[str, int] = {} # node -> num_vnodes
def add_node(self, node: str, num_vnodes: int = 150):
"""Add physical node with virtual nodes"""
self.nodes[node] = num_vnodes
for i in range(num_vnodes):
# Hash node name + vnode index to get position
token = self._hash(f"{node}:{i}")
vnode = VirtualNode(physical_node=node, token=token)
# Insert in sorted order
pos = bisect.bisect_left(self.tokens, token)
self.ring.insert(pos, vnode)
self.tokens.insert(pos, token)
def remove_node(self, node: str):
"""Remove physical node and all its virtual nodes"""
if node not in self.nodes:
return
# Remove all vnodes for this physical node
self.ring = [vn for vn in self.ring if vn.physical_node != node]
self.tokens = [vn.token for vn in self.ring]
del self.nodes[node]
def get_preference_list(self, key: str) -> List[str]:
"""
Get preference list for a key.
Returns N distinct physical nodes responsible for key.
"""
if not self.ring:
return []
token = self._hash(key)
# Find first position >= key's token
pos = bisect.bisect_left(self.tokens, token)
if pos >= len(self.ring):
pos = 0
# Walk ring clockwise until we have N distinct physical nodes
preference_list = []
seen_nodes: Set[str] = set()
for i in range(len(self.ring)):
idx = (pos + i) % len(self.ring)
physical_node = self.ring[idx].physical_node
if physical_node not in seen_nodes:
preference_list.append(physical_node)
seen_nodes.add(physical_node)
if len(preference_list) >= self.replication_factor:
break
return preference_list
def get_coordinator(self, key: str) -> str:
"""Get the coordinator node for a key (first in preference list)"""
return self.get_preference_list(key)[0]
def _hash(self, key: str) -> int:
"""Hash key to ring position"""
return int(hashlib.md5(key.encode()).hexdigest(), 16)Vector Clock
┌─────────────────────────────────────────────────────────────────────────┐
│ Vector Clocks for Conflict Detection │
│ │
│ 問題: 異なるノードで同じキーへの並行書き込み │
│ 解決策: Vector Clockで因果関係を追跡 │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 例: ショッピングカートの更新 │ │
│ │ │ │
│ │ 初期状態: cart = [], clock = [] │ │
│ │ │ │
│ │ クライアント1がNode Aで: │ │
│ │ 商品Xを追加 → cart = [X], clock = [(A,1)] │ │
│ │ │ │
│ │ クライアント2がAから読み取り、Node Bに書き込み: │ │
│ │ 商品Yを追加 → cart = [X,Y], clock = [(A,1), (B,1)] │ │
│ │ │ │
│ │ クライアント3がAから読み取り(Bの書き込み前)、Node Cに書き込み:│ │
│ │ 商品Zを追加 → cart = [X,Z], clock = [(A,1), (C,1)] │ │
│ │ │ │
│ │ 競合を検出! どちらのClockも支配しない: │ │
│ │ [(A,1), (B,1)] vs [(A,1), (C,1)] │ │
│ │ │ │
│ │ 解決: 両方のバージョンをクライアントに返す │ │
│ │ クライアントがマージ: cart = [X,Y,Z], clock = [(A,1), (B,1), (C,1)]│ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ Clock比較ルール: │
│ - Clock AがBを支配する条件: all(A[i] >= B[i]) かつ any(A[i] > B[i]) │
│ - どちらも支配しない → 並行 → 競合 │
└─────────────────────────────────────────────────────────────────────────┘実装
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional
import copy
@dataclass
class VectorClock:
"""
Vector clock for tracking causality.
Maps node_id -> logical timestamp
"""
clock: Dict[str, int] = field(default_factory=dict)
def increment(self, node_id: str):
"""Increment clock for a node (on write)"""
self.clock[node_id] = self.clock.get(node_id, 0) + 1
def merge(self, other: 'VectorClock') -> 'VectorClock':
"""Merge two clocks (take max of each component)"""
merged = VectorClock()
all_nodes = set(self.clock.keys()) | set(other.clock.keys())
for node in all_nodes:
merged.clock[node] = max(
self.clock.get(node, 0),
other.clock.get(node, 0)
)
return merged
def dominates(self, other: 'VectorClock') -> bool:
"""Check if this clock causally dominates other"""
dominated = False
for node in set(self.clock.keys()) | set(other.clock.keys()):
self_val = self.clock.get(node, 0)
other_val = other.clock.get(node, 0)
if self_val < other_val:
return False # Other has something we don't
if self_val > other_val:
dominated = True
return dominated
def concurrent_with(self, other: 'VectorClock') -> bool:
"""Check if clocks are concurrent (neither dominates)"""
return not self.dominates(other) and not other.dominates(self)
def copy(self) -> 'VectorClock':
return VectorClock(clock=copy.copy(self.clock))
@dataclass
class VersionedValue:
"""Value with its vector clock"""
value: any
clock: VectorClock
timestamp: float # Wall clock for display/pruning
class DynamoStore:
"""
Key-value store with vector clock versioning.
"""
def __init__(self, node_id: str):
self.node_id = node_id
self.data: Dict[str, List[VersionedValue]] = {}
def get(self, key: str) -> List[VersionedValue]:
"""
Get all versions of a key.
Returns list because there may be conflicts.
"""
return self.data.get(key, [])
def put(
self,
key: str,
value: any,
context: Optional[VectorClock] = None
) -> VectorClock:
"""
Put a value with optional context (from previous get).
Returns new vector clock.
"""
if context:
new_clock = context.copy()
else:
new_clock = VectorClock()
new_clock.increment(self.node_id)
new_version = VersionedValue(
value=value,
clock=new_clock,
timestamp=time.time()
)
existing = self.data.get(key, [])
# Remove versions dominated by new version
surviving = [
v for v in existing
if not new_clock.dominates(v.clock)
]
# Add new version
surviving.append(new_version)
self.data[key] = surviving
return new_clock
def reconcile(
self,
key: str,
remote_versions: List[VersionedValue]
):
"""
Reconcile local versions with remote versions.
Used during read repair and anti-entropy.
"""
local = self.data.get(key, [])
all_versions = local + remote_versions
# Keep only versions not dominated by any other
surviving = []
for v in all_versions:
dominated = any(
other.clock.dominates(v.clock)
for other in all_versions
if other is not v
)
if not dominated:
# Check for duplicates (same clock)
is_dup = any(
s.clock.clock == v.clock.clock
for s in surviving
)
if not is_dup:
surviving.append(v)
self.data[key] = survivingQuorumシステム
┌─────────────────────────────────────────────────────────────────────────┐
│ Quorum-Based Replication │
│ │
│ N = レプリカ数 │
│ R = Read Quorum(読み取りノード数) │
│ W = Write Quorum(書き込みノード数) │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 一貫性の保証: │ │
│ │ │ │
│ │ R + W > N → 強い一貫性(読み取りが最新の書き込みを確認) │ │
│ │ │ │
│ │ 例: N=3, R=2, W=2 │ │
│ │ - 応答前に2ノードに書き込み │ │
│ │ - 2ノードから読み取り │ │
│ │ - 少なくとも1ノードが最新値を保持(オーバーラップ保証) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 一般的な構成: │ │
│ │ │ │
│ │ (N=3, R=1, W=3): 高速読み取り、低速書き込み、強い一貫性 │ │
│ │ (N=3, R=3, W=1): 低速読み取り、高速書き込み、古いデータのリスク│ │
│ │ (N=3, R=2, W=2): バランス型(Dynamoデフォルト) │ │
│ │ (N=3, R=1, W=1): 高速だが弱い一貫性 │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 書き込み操作 (W=2, N=3): │ │
│ │ │ │
│ │ Coordinator ────┬───▶ Replica A ✓ │ │
│ │ ├───▶ Replica B ✓ → 成功 (2 acks) │ │
│ │ └───▶ Replica C (非同期、失敗の可能性あり) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘実装
from dataclasses import dataclass
from typing import List, Optional, Tuple
import asyncio
@dataclass
class QuorumConfig:
n: int = 3 # Replication factor
r: int = 2 # Read quorum
w: int = 2 # Write quorum
class DynamoCoordinator:
"""
Coordinates read/write operations with quorum.
"""
def __init__(
self,
node_id: str,
ring: ConsistentHashRing,
quorum: QuorumConfig
):
self.node_id = node_id
self.ring = ring
self.quorum = quorum
self.local_store = DynamoStore(node_id)
async def get(self, key: str) -> Tuple[any, VectorClock]:
"""
Read with quorum.
Returns merged value and context for subsequent writes.
"""
preference_list = self.ring.get_preference_list(key)
# Send read requests to N nodes
tasks = []
for node in preference_list[:self.quorum.n]:
task = self._read_from_node(node, key)
tasks.append(task)
# Wait for R responses
responses = []
for coro in asyncio.as_completed(tasks):
try:
result = await coro
if result:
responses.append(result)
if len(responses) >= self.quorum.r:
break
except Exception:
continue
if len(responses) < self.quorum.r:
raise InsufficientReplicasError("Could not reach read quorum")
# Merge all versions
all_versions = []
for versions in responses:
all_versions.extend(versions)
# Reconcile (keep non-dominated versions)
reconciled = self._reconcile_versions(all_versions)
# Read repair: update stale replicas asynchronously
asyncio.create_task(
self._read_repair(key, reconciled, preference_list)
)
if len(reconciled) == 1:
return reconciled[0].value, reconciled[0].clock
else:
# Multiple versions - return all for client to resolve
return [v.value for v in reconciled], self._merge_clocks(reconciled)
async def put(
self,
key: str,
value: any,
context: Optional[VectorClock] = None
) -> bool:
"""
Write with quorum.
Context should be from a previous get (for conflict handling).
"""
preference_list = self.ring.get_preference_list(key)
# Prepare versioned value
if context:
new_clock = context.copy()
else:
new_clock = VectorClock()
new_clock.increment(self.node_id)
versioned = VersionedValue(
value=value,
clock=new_clock,
timestamp=time.time()
)
# Send write requests to N nodes
tasks = []
for node in preference_list[:self.quorum.n]:
task = self._write_to_node(node, key, versioned)
tasks.append(task)
# Wait for W acknowledgments
acks = 0
for coro in asyncio.as_completed(tasks):
try:
await coro
acks += 1
if acks >= self.quorum.w:
return True
except Exception:
continue
if acks < self.quorum.w:
raise InsufficientReplicasError("Could not reach write quorum")
return True
def _reconcile_versions(
self,
versions: List[VersionedValue]
) -> List[VersionedValue]:
"""Keep only versions not dominated by others"""
surviving = []
for v in versions:
dominated = any(
other.clock.dominates(v.clock)
for other in versions
if other is not v
)
if not dominated:
# Deduplicate by clock
is_dup = any(
s.clock.clock == v.clock.clock
for s in surviving
)
if not is_dup:
surviving.append(v)
return surviving
def _merge_clocks(
self,
versions: List[VersionedValue]
) -> VectorClock:
"""Merge clocks from multiple versions"""
merged = VectorClock()
for v in versions:
merged = merged.merge(v.clock)
return merged
async def _read_repair(
self,
key: str,
reconciled: List[VersionedValue],
preference_list: List[str]
):
"""Update stale replicas with reconciled versions"""
for node in preference_list[:self.quorum.n]:
try:
await self._write_to_node(node, key, reconciled[0])
except Exception:
pass # Best effortSloppy QuorumとHinted Handoff
┌─────────────────────────────────────────────────────────────────────────┐
│ Sloppy Quorum for High Availability │
│ │
│ 問題: Preference List内のいずれかのNノードがダウンすると │
│ Strict Quorumが失敗する │
│ │
│ 解決策: Hinted Handoffを伴うSloppy Quorum │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ キーKのPreference List: [A, B, C] │ │
│ │ ノードAがダウン │ │
│ │ │ │
│ │ Strict Quorum: W=2でAがダウンすると書き込み失敗 │ │
│ │ │ │
│ │ Sloppy Quorum: │ │
│ │ - リングをさらに歩く │ │
│ │ - 次の正常なノード(D)を見つける │ │
│ │ - B, C, Dに書き込み(DはAへの「ヒント」を保持) │ │
│ │ - Aが復旧したら、DがAにヒントを送信 │ │
│ │ - Dはヒントデータを削除 │ │
│ │ │ │
│ │ Ring │ │
│ │ A (down) │ │
│ │ ● │ │
│ │ ╱ ╲ │ │
│ │ B ● ● C Kのデータは B, C, D に書き込み │ │
│ │ ╱ ╲ Dはヒント保持:「これはAのもの」 │ │
│ │ D ●─────────────────● E │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ トレードオフ: │
│ - 可用性の向上(障害時でも書き込みが成功) │
│ - 耐久性保証の弱体化(データが「間違った」ノードに存在) │
│ - 結果整合性(ヒント配信は非同期) │
└─────────────────────────────────────────────────────────────────────────┘実装
@dataclass
class HintedValue:
"""Value stored as hint for another node"""
target_node: str
key: str
value: VersionedValue
created_at: float
class HintedHandoffManager:
"""
Manages hinted handoff for temporarily failed nodes.
"""
def __init__(self, node_id: str, max_hints_per_node: int = 10000):
self.node_id = node_id
self.hints: Dict[str, List[HintedValue]] = {} # target_node -> hints
self.max_hints = max_hints_per_node
def store_hint(
self,
target_node: str,
key: str,
value: VersionedValue
):
"""Store a hint for a failed node"""
if target_node not in self.hints:
self.hints[target_node] = []
hint = HintedValue(
target_node=target_node,
key=key,
value=value,
created_at=time.time()
)
self.hints[target_node].append(hint)
# Limit hints per node
if len(self.hints[target_node]) > self.max_hints:
self.hints[target_node] = self.hints[target_node][-self.max_hints:]
async def deliver_hints(self, target_node: str):
"""Deliver stored hints when node recovers"""
hints = self.hints.pop(target_node, [])
for hint in hints:
try:
await self._send_hint(target_node, hint)
except Exception:
# Re-queue failed hints
self.store_hint(target_node, hint.key, hint.value)
async def run_hint_delivery(self, node_monitor):
"""Background task to deliver hints to recovered nodes"""
while True:
for target_node in list(self.hints.keys()):
if await node_monitor.is_alive(target_node):
await self.deliver_hints(target_node)
await asyncio.sleep(10) # Check every 10 secondsMerkle Treeを用いたAnti-Entropy
┌─────────────────────────────────────────────────────────────────────────┐
│ Merkle Trees for Replica Sync │
│ │
│ 問題: レプリカ間の差分を効率的に見つけるには? │
│ 素朴な方法: すべてのキーを比較 → O(n)のデータ転送 │
│ 賢い方法: Merkle Tree → O(log n)で差分を検出 │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Merkle Tree構造 │ │
│ │ │ │
│ │ Root Hash │ │
│ │ │ │ │
│ │ ┌────────────┴────────────┐ │ │
│ │ │ │ │ │
│ │ Hash(L) Hash(R) │ │
│ │ │ │ │ │
│ │ ┌────┴────┐ ┌─────┴─────┐ │ │
│ │ │ │ │ │ │ │
│ │ Hash(LL) Hash(LR) Hash(RL) Hash(RR) │ │
│ │ │ │ │ │ │ │
│ │ [keys [keys [keys [keys │ │
│ │ 0-99] 100-199] 200-299] 300-399] │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ 同期プロセス: │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 1. ルートハッシュを比較 │ │
│ │ - 等しい場合: レプリカは同一、終了 │ │
│ │ - 異なる場合: 下位へ降下 │ │
│ │ │ │
│ │ 2. 子ノードのハッシュを比較 │ │
│ │ - 異なるハッシュのサブツリーのみ降下 │ │
│ │ │ │
│ │ 3. リーフで: 異なるキーを交換 │ │
│ │ │ │
│ │ 結果: O(log n)のハッシュ比較 + 差分のみ転送 │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘実装
import hashlib
from typing import Dict, List, Optional, Tuple
class MerkleTree:
"""
Merkle tree for efficient replica comparison.
Each node in tree covers a range of the key space.
"""
def __init__(self, depth: int = 10):
self.depth = depth
self.num_leaves = 2 ** depth
# Tree stored as array: index 1 = root, 2*i = left child, 2*i+1 = right
self.tree: List[Optional[bytes]] = [None] * (2 * self.num_leaves)
self.dirty_leaves: set = set()
def update(self, key: str, value_hash: bytes):
"""Update tree with a key's value hash"""
leaf_idx = self._key_to_leaf(key)
tree_idx = self.num_leaves + leaf_idx
# Update leaf
self.tree[tree_idx] = self._combine_hash(
self.tree[tree_idx] or b'',
value_hash
)
self.dirty_leaves.add(leaf_idx)
def rebuild(self):
"""Rebuild internal nodes from dirty leaves upward"""
# Find all ancestors of dirty leaves
dirty_nodes = set()
for leaf_idx in self.dirty_leaves:
idx = self.num_leaves + leaf_idx
while idx > 1:
idx //= 2
dirty_nodes.add(idx)
# Rebuild from bottom up
for idx in sorted(dirty_nodes, reverse=True):
left = self.tree[2 * idx] or b''
right = self.tree[2 * idx + 1] or b''
self.tree[idx] = self._combine_hash(left, right)
self.dirty_leaves.clear()
def get_root_hash(self) -> bytes:
"""Get root hash of tree"""
return self.tree[1] or b''
def get_node_hash(self, level: int, index: int) -> bytes:
"""Get hash of node at given level and index"""
tree_idx = (2 ** level) + index
return self.tree[tree_idx] or b''
def find_differences(
self,
other: 'MerkleTree'
) -> List[Tuple[int, int]]:
"""
Find leaf ranges with differences.
Returns list of (start_leaf, end_leaf) ranges.
"""
differences = []
self._find_diff_recursive(other, 1, differences)
return differences
def _find_diff_recursive(
self,
other: 'MerkleTree',
idx: int,
differences: List[Tuple[int, int]]
):
"""Recursively find differing subtrees"""
if self.tree[idx] == other.tree[idx]:
return # Subtree identical
if idx >= self.num_leaves:
# Leaf node - this is a difference
leaf_idx = idx - self.num_leaves
differences.append((leaf_idx, leaf_idx + 1))
else:
# Internal node - check children
self._find_diff_recursive(other, 2 * idx, differences)
self._find_diff_recursive(other, 2 * idx + 1, differences)
def _key_to_leaf(self, key: str) -> int:
"""Map key to leaf index"""
key_hash = int(hashlib.md5(key.encode()).hexdigest(), 16)
return key_hash % self.num_leaves
def _combine_hash(self, left: bytes, right: bytes) -> bytes:
"""Combine two hashes"""
return hashlib.sha256(left + right).digest()
class AntiEntropyService:
"""
Background service for replica synchronization.
"""
def __init__(self, store: DynamoStore, ring: ConsistentHashRing):
self.store = store
self.ring = ring
self.merkle_trees: Dict[str, MerkleTree] = {} # key_range -> tree
async def sync_with_replica(self, replica_node: str, key_range: str):
"""Synchronize key range with another replica"""
local_tree = self.merkle_trees.get(key_range)
if not local_tree:
return
# Get remote tree
remote_tree = await self._fetch_merkle_tree(replica_node, key_range)
# Find differences
diff_ranges = local_tree.find_differences(remote_tree)
# Exchange differing keys
for start_leaf, end_leaf in diff_ranges:
local_keys = self._get_keys_in_range(key_range, start_leaf, end_leaf)
remote_keys = await self._fetch_keys_in_range(
replica_node, key_range, start_leaf, end_leaf
)
# Exchange data
for key, versions in remote_keys.items():
self.store.reconcile(key, versions)
for key in local_keys:
versions = self.store.get(key)
await self._send_versions(replica_node, key, versions)
async def run(self):
"""Background anti-entropy loop"""
while True:
# Pick random key range and replica to sync
key_range = self._pick_random_range()
replica = self._pick_random_replica(key_range)
try:
await self.sync_with_replica(replica, key_range)
except Exception:
pass # Best effort
await asyncio.sleep(60) # Sync every minuteGossipベースの障害検出
┌─────────────────────────────────────────────────────────────────────────┐
│ Gossip-Based Failure Detection │
│ │
│ 各ノードが保持する情報: │
│ - 全ノードのリストとそのHeartbeatカウンター │
│ - 最後にHeartbeatを受信したタイムスタンプ │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Gossipプロトコル: │ │
│ │ │ │
│ │ T秒ごとに: │ │
│ │ 1. 自分のHeartbeatカウンターをインクリメント │ │
│ │ 2. ランダムなノードを選択 │ │
│ │ 3. メンバーシップリストを送信 │ │
│ │ 4. 受信したリストをマージ(最大Heartbeatを採用) │ │
│ │ │ │
│ │ T_fail秒間Heartbeatが増加しない場合: │ │
│ │ - ノードを疑わしいとマーク │ │
│ │ │ │
│ │ T_cleanup秒間疑わしい状態が続く場合: │ │
│ │ - ノードを障害とマーク │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ 特性: │
│ - 分散型(中央の監視ノードなし) │
│ - 結果整合性のあるメンバーシップビュー │
│ - O(log N)の収束時間 │
│ - ネットワーク分断に対してロバスト │
└─────────────────────────────────────────────────────────────────────────┘論文からの主要結果
パフォーマンス
| メトリクス | 99.9パーセンタイル |
|---|---|
| 読み取りレイテンシ | 10ms |
| 書き込みレイテンシ | 10ms |
| 成功率 | 99.94% |
耐久性
- データは複数のデータセンターにレプリケーションされています
- Vector Clockにより更新の損失がありません
- Anti-Entropyがあらゆる乖離を検出します
影響とレガシー
直接的な後継システム
- Apache Cassandra - Dynamoに触発されたオープンソースデータベース
- Riak - Dynamoの商用実装
- Voldemort - LinkedInのキーバリューストア
広く採用されたコンセプト
- Consistent Hashing(ほぼすべての分散システムで使用)
- Vector Clock(Conflict-freeデータ型、CRDT)
- Quorumベースのレプリケーション
- Gossipプロトコル
- Sloppy QuorumとHinted Handoff
主な学び
一貫性よりも可用性 - ショッピングカートでは、書き込みを受け入れることが完全な一貫性よりも重要です。
読み取り時の競合解決 - 書き込みをブロックせず、クライアントに競合を解決させます。
均一な分散のためのVirtual Node - ノードごとに複数のポジションを持つことでConsistent Hashingの不均衡を解決します。
Vector Clockで因果関係を追跡 - ブロックなしで並行書き込みを検出します。
チューナブルな一貫性 - N、R、Wパラメータによりアプリケーションがトレードオフを選択できます。
Sloppy Quorumで可用性を維持 - 任意のN個の正常なノードに書き込み、後でハンドオフします。
効率的な同期のためのMerkle Tree - データではなくツリーを比較して差分を検出します。
分散型メンバーシップのためのGossip - クラスタ状態に単一障害点がありません。