ゼロトラストアーキテクチャ
Note: This article was translated from English to Japanese. The original version is available at
10-security/05-zero-trust-architecture.md.
TL;DR
ゼロトラストは境界ベースのセキュリティ(「内部ネットワークを信頼する」)を「決して信頼せず、常に検証する」に置き換えます。すべてのリクエストはネットワークの場所に関係なく認証・認可されます。アイデンティティが新たな境界になります。
境界セキュリティの問題
従来のモデル(城と堀)
Problem: Once inside the perimeter, everything trusts everything
- Compromised laptop → access to all internal systems
- Lateral movement is trivial
- VPN = keys to the kingdom境界セキュリティが失敗する理由
- クラウド導入:リソースが複数のネットワークにまたがる
- リモートワーク:ユーザーがどこからでも接続する
- BYOD:個人デバイスが企業ネットワーク上にある
- サプライチェーン攻撃:信頼されたベンダーが侵害される
- 内部脅威:悪意のあるまたは侵害された内部関係者
- 高度な攻撃者:境界はいずれ突破される
ゼロトラストの原則
基本理念
1. Never Trust, Always Verify
- No implicit trust based on network location
- Every request is fully authenticated and authorized
2. Assume Breach
- Design as if attackers are already inside
- Minimize blast radius of any compromise
3. Least Privilege Access
- Minimum permissions needed for the task
- Just-in-time and just-enough access
4. Verify Explicitly
- Use all available data points for decisions
- Identity, device, location, behavior, data sensitivityゼロトラストアーキテクチャ
境界としてのアイデンティティ
強力なアイデンティティ検証
python
class ZeroTrustAuthenticator:
def authenticate(self, request):
# 1. Verify user identity
user_identity = self.verify_user_identity(request)
if not user_identity:
return AuthResult.DENIED, "Invalid user credentials"
# 2. Verify device identity and health
device_identity = self.verify_device(request)
if not device_identity.is_managed:
return AuthResult.STEP_UP_REQUIRED, "Unmanaged device"
if not device_identity.is_compliant:
return AuthResult.DENIED, "Device not compliant"
# 3. Check context (location, time, behavior)
context = self.evaluate_context(request, user_identity)
# 4. Calculate risk score
risk_score = self.calculate_risk_score(
user_identity,
device_identity,
context
)
# 5. Make access decision based on policy
return self.policy_engine.evaluate(
user_identity,
device_identity,
context,
risk_score,
request.resource
)デバイスの信頼
Device Trust Levels:
Level 0 - Unknown Device
├── No access to sensitive resources
├── Limited functionality
└── Prompted to enroll device
Level 1 - Known Device
├── Device registered
├── Basic security checks pass
└── Access to standard resources
Level 2 - Managed Device
├── MDM enrolled
├── Security policies enforced
├── Encryption verified
└── Access to sensitive resources
Level 3 - Compliant Device
├── All of Level 2
├── Up-to-date patches
├── No malware detected
├── Hardware attestation
└── Access to highly sensitive resourcesデバイスヘルスチェック
python
class DeviceHealthChecker:
def check_device_health(self, device):
checks = {
'os_version': self.check_os_version(device),
'patch_level': self.check_patch_level(device),
'encryption': self.check_disk_encryption(device),
'firewall': self.check_firewall_enabled(device),
'antivirus': self.check_antivirus_status(device),
'jailbreak': self.check_not_jailbroken(device),
'screen_lock': self.check_screen_lock(device),
}
# All checks must pass for compliant status
is_compliant = all(checks.values())
return DeviceHealthResult(
is_compliant=is_compliant,
checks=checks,
last_checked=datetime.utcnow()
)マイクロセグメンテーション
ネットワークセグメンテーション
Each segment has explicit allow rules, default denyサービスレベルのセグメンテーション
yaml
# Service mesh policy (e.g., Istio)
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: payment-service-policy
namespace: production
spec:
selector:
matchLabels:
app: payment-service
rules:
- from:
- source:
principals: ["cluster.local/ns/production/sa/order-service"]
to:
- operation:
methods: ["POST"]
paths: ["/api/v1/payments"]
- from:
- source:
principals: ["cluster.local/ns/production/sa/admin-service"]
to:
- operation:
methods: ["GET"]
paths: ["/api/v1/payments/*"]継続的な検証
セッションの再評価
python
class ContinuousVerification:
def __init__(self):
self.verification_interval = 300 # 5 minutes
async def monitor_session(self, session):
while session.is_active:
# Reevaluate trust factors
current_risk = await self.evaluate_current_risk(session)
if current_risk > session.allowed_risk_threshold:
# Risk increased - take action
if current_risk > CRITICAL_THRESHOLD:
await self.terminate_session(session)
elif current_risk > HIGH_THRESHOLD:
await self.require_step_up_auth(session)
else:
await self.reduce_permissions(session)
await asyncio.sleep(self.verification_interval)
async def evaluate_current_risk(self, session):
factors = {
'location_change': await self.check_location_anomaly(session),
'behavior_anomaly': await self.check_behavior_anomaly(session),
'device_health': await self.check_device_health(session.device),
'threat_intel': await self.check_threat_intelligence(session),
'time_anomaly': self.check_time_anomaly(session),
}
return self.calculate_composite_risk(factors)行動分析
python
class UserBehaviorAnalytics:
def analyze_request(self, user, request):
baseline = self.get_user_baseline(user)
anomalies = []
# Location analysis
if not self.is_typical_location(user, request.ip):
anomalies.append(AnomalyType.UNUSUAL_LOCATION)
# Time analysis
if not self.is_typical_time(user, request.timestamp):
anomalies.append(AnomalyType.UNUSUAL_TIME)
# Access pattern analysis
if self.is_unusual_resource_access(user, request.resource):
anomalies.append(AnomalyType.UNUSUAL_RESOURCE)
# Volume analysis
if self.is_unusual_volume(user, request.timestamp):
anomalies.append(AnomalyType.UNUSUAL_VOLUME)
# Velocity analysis (impossible travel)
if self.is_impossible_travel(user, request):
anomalies.append(AnomalyType.IMPOSSIBLE_TRAVEL)
return RiskAssessment(anomalies=anomalies)
def is_impossible_travel(self, user, request):
last_location = self.get_last_location(user)
if not last_location:
return False
current_location = self.geolocate(request.ip)
distance = self.calculate_distance(last_location, current_location)
time_diff = request.timestamp - last_location.timestamp
# Speed > 1000 km/h is physically impossible
speed = distance / (time_diff.total_seconds() / 3600)
return speed > 1000BeyondCorpモデル(Googleの実装)
アーキテクチャ
主要コンポーネント
1. Device Inventory
- Every device has unique certificate
- Device properties tracked centrally
- Health status continuously updated
2. User/Group Database
- SSO integration
- Group memberships
- Job functions and access levels
3. Access Proxy
- All access goes through proxy
- Terminates TLS
- Enforces authentication
- Makes policy decisions
4. Access Control Engine
- Combines all trust signals
- Evaluates against policies
- Returns allow/deny decisions
5. Trust Inference Pipeline
- Continuously calculates trust levels
- Incorporates threat intelligence
- Updates in near-real-time実装戦略
フェーズ1:識別とカタログ化
1. Identify all resources
- Applications (internal and SaaS)
- Data stores
- Infrastructure
- APIs
2. Catalog users and devices
- User inventory
- Device inventory
- Service accounts
3. Map access patterns
- Who accesses what
- From where
- How often
4. Classify data sensitivity
- Public
- Internal
- Confidential
- Restrictedフェーズ2:アイデンティティの強化
1. Implement strong authentication
- MFA everywhere
- Passwordless where possible
- Hardware security keys for privileged users
2. Deploy device trust
- Device certificates
- MDM/endpoint management
- Device health attestation
3. Establish identity source of truth
- Single identity provider
- Unified directory
- Automated provisioning/deprovisioningフェーズ3:マイクロセグメンテーション
1. Segment networks
- Define security zones
- Implement network policies
- Deploy next-gen firewalls
2. Implement service mesh
- mTLS between services
- Service-to-service authorization
- Traffic encryption
3. Deploy application-level controls
- Web application firewall
- API gateway with auth
- Database access controlsフェーズ4:継続的な監視
1. Deploy SIEM
- Aggregate security logs
- Correlation rules
- Alerting
2. Implement UEBA
- Baseline normal behavior
- Detect anomalies
- Risk scoring
3. Automate response
- Automated containment
- Session termination
- Access revocationAPIのためのゼロトラスト
ポリシー適用ポイントとしてのAPIゲートウェイ
python
class ZeroTrustAPIGateway:
async def handle_request(self, request):
# 1. Authenticate caller (user or service)
identity = await self.authenticate(request)
if not identity:
return Response(401, "Authentication required")
# 2. Validate device/client
client_trust = await self.evaluate_client_trust(request)
if client_trust.level < MINIMUM_TRUST_LEVEL:
return Response(403, "Client trust level insufficient")
# 3. Evaluate context
context = await self.build_context(request, identity, client_trust)
# 4. Check authorization
authz_decision = await self.policy_engine.authorize(
identity,
request.resource,
request.action,
context
)
if not authz_decision.allowed:
return Response(403, authz_decision.reason)
# 5. Log for audit
await self.audit_log.record(request, identity, authz_decision)
# 6. Forward to backend
response = await self.forward_to_backend(request, identity)
# 7. Inspect response (DLP)
await self.inspect_response(response, identity, context)
return responseサービス間認証
python
# Using SPIFFE/SPIRE for workload identity
class ServiceIdentity:
def __init__(self, spire_client):
self.spire = spire_client
async def get_identity(self):
# Workload gets identity from SPIRE agent
svid = await self.spire.fetch_x509_svid()
return svid
async def call_service(self, target_service, request):
# Get our identity
svid = await self.get_identity()
# Create mTLS connection
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain(
certfile=svid.cert_chain,
keyfile=svid.private_key
)
# Make request with mTLS
async with aiohttp.ClientSession() as session:
async with session.post(
f"https://{target_service}/api",
json=request,
ssl=ssl_context
) as response:
return await response.json()課題とトレードオフ
パフォーマンスへの影響
Challenge: Every request requires multiple checks
- Identity verification
- Device health check
- Policy evaluation
- Context analysis
Mitigation:
- Cache trust decisions (with short TTL)
- Use efficient policy engines (e.g., OPA)
- Distribute policy enforcement points
- Asynchronous verification where acceptableユーザーエクスペリエンス
Challenge: Additional authentication friction
Mitigation:
- Risk-based authentication (step-up when needed)
- SSO reduces authentication prompts
- Passwordless authentication
- Transparent device authentication
- Remember trusted devices (within policy)レガシーシステム
Challenge: Older systems don't support modern auth
Mitigation:
- Place proxy/gateway in front
- Implement identity bridging
- Gradual migration plan
- Segment legacy systems more strictly複雑さ
Challenge: Significant increase in system complexity
Mitigation:
- Incremental implementation
- Strong automation
- Comprehensive monitoring
- Clear documentation
- Training for ops teamsメトリクスと監視
主要メトリクス
python
class ZeroTrustMetrics:
def __init__(self):
self.metrics = {
# Authentication metrics
'auth_attempts': Counter(),
'auth_failures': Counter(),
'mfa_challenges': Counter(),
# Authorization metrics
'access_granted': Counter(),
'access_denied': Counter(),
'policy_violations': Counter(),
# Device metrics
'compliant_devices': Gauge(),
'non_compliant_devices': Gauge(),
'unknown_devices': Gauge(),
# Risk metrics
'high_risk_sessions': Gauge(),
'anomaly_detections': Counter(),
'session_terminations': Counter(),
# Performance
'policy_eval_latency': Histogram(),
'auth_latency': Histogram(),
}ダッシュボード
Zero Trust Dashboard:
Access Overview (Last 24h)
Granted: 45,231 (↑ 5%) | Denied: 1,247 (↓ 12%) | Step-up: 892 (↑ 23%)
Device Compliance: 85% Compliant
Risk Score Distribution: Low 75% | Medium 18% | High 7%