# Outbox Pattern
## TL;DR
The outbox pattern ensures reliable message publishing by writing messages to a database table (the outbox) in the same transaction as the business data. A separate process reads from the outbox and publishes to the message broker. This makes the database write and the recording of the outgoing message atomic, solving the dual-write problem; the broker then receives each message at least once.
## The Dual-Write Problem
### Naive Approach
```python
def create_order(order):
    # Step 1: Save to database
    db.save(order)
    # Step 2: Publish event
    message_queue.publish(OrderCreated(order))
```

### Failure Scenarios
**Scenario 1: DB succeeds, publish fails**

```
db.save(order)     ✓ (committed)
mq.publish(event)  ✗ (failed)
```

Result: the order exists, but no event was published. Downstream systems never learn about it.
**Scenario 2: Publish succeeds, DB fails**

```
db.save(order)     (pending)
mq.publish(event)  ✓ (published)
db.commit()        ✗ (rolled back)
```

Result: the event exists, but no order. Downstream systems process a phantom order.

### Why Distributed Transactions Don't Help
XA/2PC (two-phase commit) is the textbook fix, but in practice:

- Not supported by most message brokers
- Slow (participants block while waiting on the coordinator)
- Complex failure handling

We need a simpler, more reliable approach.

## The Outbox Solution
### Architecture
```
┌────────────────────────────────────────────────────┐
│                      Database                      │
│  ┌─────────────┐       ┌────────────────────────┐  │
│  │   Orders    │       │        Outbox          │  │
│  │  ┌───────┐  │       │ ┌────────────────────┐ │  │
│  │  │ Order │  │◄──────┼─│ id, payload, status│ │  │
│  │  └───────┘  │       │ └────────────────────┘ │  │
│  └─────────────┘       └────────────────────────┘  │
└────────────────────────────────────────────────────┘
        ▲                             │
        │                             │ Poll
        │                             ▼
┌───────┴───────┐          ┌─────────────────────┐    ┌────────┐
│  Application  │          │  Outbox Publisher   │───►│ Broker │
└───────────────┘          └─────────────────────┘    └────────┘
```

### How It Works
1. **The application writes the business data AND an outbox record in the same transaction.**
2. **The transaction commits atomically.** Either both the order and the outbox record exist, or neither does.
3. **A background process polls the outbox** and reads unpublished messages.
4. **The publisher sends each message to the broker**, which delivers it to the queue/topic.
5. **The publisher marks the outbox record as published**, preventing duplicate publishing on later polls.

## Implementation
### Outbox Table Schema
```sql
CREATE TABLE outbox (
    id             UUID PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id   VARCHAR(255) NOT NULL,
    event_type     VARCHAR(255) NOT NULL,
    payload        JSONB NOT NULL,
    created_at     TIMESTAMP NOT NULL DEFAULT NOW(),
    published_at   TIMESTAMP NULL
);

-- PostgreSQL does not allow inline INDEX clauses in CREATE TABLE;
-- a partial index on created_at keeps the unpublished-rows scan
-- (WHERE published_at IS NULL ORDER BY created_at) fast
CREATE INDEX idx_outbox_unpublished ON outbox (created_at)
WHERE published_at IS NULL;
```

### Writing to Outbox
```python
import json
from uuid import uuid4

def create_order(order_data):
    with db.transaction():
        # Create order
        order = Order(**order_data)
        db.add(order)

        # Write to outbox (same transaction)
        outbox_entry = OutboxEntry(
            id=uuid4(),
            aggregate_type="Order",
            aggregate_id=str(order.id),
            event_type="OrderCreated",
            payload=json.dumps({
                "order_id": str(order.id),
                "customer_id": order.customer_id,
                "total": order.total,
            }),
        )
        db.add(outbox_entry)
        # Transaction commits atomically on exit
    return order
```

### Outbox Publisher (Polling)
```python
import time

class OutboxPublisher:
    def __init__(self, db, broker):
        self.db = db
        self.broker = broker

    def run(self):
        while True:
            self.publish_pending()
            time.sleep(0.1)  # poll interval (100 ms)

    def publish_pending(self):
        # FOR UPDATE SKIP LOCKED lets several publisher instances run
        # concurrently without picking up the same rows; the row locks
        # only live inside a transaction, so the select, publish, and
        # update must share one
        with self.db.transaction():
            # Get unpublished messages
            entries = self.db.query("""
                SELECT * FROM outbox
                WHERE published_at IS NULL
                ORDER BY created_at
                LIMIT 100
                FOR UPDATE SKIP LOCKED
            """)
            for entry in entries:
                try:
                    # Publish to broker
                    self.broker.publish(
                        topic=f"{entry.aggregate_type}.{entry.event_type}",
                        message=entry.payload,
                        headers={"event_id": str(entry.id)},
                    )
                    # Mark as published
                    self.db.execute("""
                        UPDATE outbox
                        SET published_at = NOW()
                        WHERE id = %s
                    """, entry.id)
                except BrokerError:
                    # Leave published_at NULL; the row is retried on the next poll
                    log.error(f"Failed to publish {entry.id}")
```

## CDC-Based Outbox
### Using Change Data Capture
Instead of polling, read changes from the database's own log:

```
Database ──► CDC (Debezium) ──► Kafka
```

Changes to the outbox table are captured from the binlog/WAL. This gives lower latency than polling and removes the need for a separate publisher process.

### Debezium Configuration
```json
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "db.example.com",
  "database.dbname": "myapp",
  "table.include.list": "public.outbox",
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.table.field.event.type": "event_type",
  "transforms.outbox.table.field.event.payload": "payload"
}
```

### CDC Benefits
Pros:

- Lower latency (near real-time)
- No polling load on the database
- Guaranteed ordering (from the log)
- No missed messages

Cons:

- More infrastructure (Debezium, Kafka Connect)
- CDC setup complexity
- The database must support log access

## Handling Duplicates
### Why Duplicates Happen
Scenario:

1. The publisher reads a message from the outbox
2. The publisher sends it to the broker ✓
3. The publisher crashes before marking it published
4. A new publisher instance starts
5. The same message is published again

The consumer receives a duplicate message: outbox delivery is at-least-once, so consumers must deduplicate.

### Idempotent Consumers
```python
class OrderEventConsumer:
    def handle(self, event):
        event_id = event.headers["event_id"]
        # Check if already processed
        if self.is_processed(event_id):
            log.info(f"Duplicate event {event_id}, skipping")
            return
        # Process event
        self.process(event)
        # Mark as processed
        self.mark_processed(event_id)

    def is_processed(self, event_id):
        return redis.sismember("processed_events", event_id)

    def mark_processed(self, event_id):
        redis.sadd("processed_events", event_id)
        # Caveat: this expires the whole set at once and resets its TTL on
        # every add; per-event keys (SET <event_id> 1 EX 86400 NX) avoid both
        redis.expire("processed_events", 86400)  # 24h
```

### Transactional Deduplication
```python
def handle(event):
    event_id = event.headers["event_id"]
    with db.transaction():
        # Try to insert a processing record; a unique key on event_id
        # makes a second insert of the same event fail
        try:
            db.execute("""
                INSERT INTO processed_events (event_id, processed_at)
                VALUES (%s, NOW())
            """, event_id)
        except UniqueViolation:
            # Already processed
            return
        # Process event (same transaction): the side effects and the
        # dedup record commit together, or not at all
        process(event)
```

On PostgreSQL, `INSERT ... ON CONFLICT DO NOTHING` expresses the same check without relying on exception handling inside the transaction (see the transactional inbox below).

## Ordering Guarantees
### Per-Aggregate Ordering
```sql
-- Outbox entries ordered by aggregate
SELECT * FROM outbox
WHERE published_at IS NULL
ORDER BY aggregate_id, created_at
FOR UPDATE SKIP LOCKED
```

### Partition by Aggregate
```python
def publish(entry):
    broker.publish(
        topic="order-events",
        key=entry.aggregate_id,  # same aggregate → same partition
        value=entry.payload,
    )
```

### Handling Out-of-Order
If strict ordering is required:

1. Run a single publisher per aggregate type
2. Or: add sequence numbers to messages (see the sketch below)
3. Or: use a consumer-side reordering buffer
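A minimal sketch of option 2, assuming the producer stamps each message with a per-aggregate, monotonically increasing `sequence` header. The `aggregate_sequences` table (`aggregate_id` primary key, `last_seq` bigint), the error type, and the `db`/`process` helpers are illustrative, matching the pseudo-API used in the other examples:

```python
class OutOfOrderError(Exception):
    """Raised when a message arrives ahead of its predecessors."""

def handle_in_order(message):
    aggregate_id = message.headers["aggregate_id"]
    seq = int(message.headers["sequence"])

    with db.transaction():
        # Last sequence applied for this aggregate (0 if none yet);
        # FOR UPDATE serializes concurrent consumers per aggregate
        row = db.query("""
            SELECT last_seq FROM aggregate_sequences
            WHERE aggregate_id = %s
            FOR UPDATE
        """, aggregate_id)
        last_seq = row.last_seq if row else 0

        if seq <= last_seq:
            return  # duplicate or stale message: already applied
        if seq > last_seq + 1:
            # A gap: predecessors have not arrived yet.
            # Raise so the broker redelivers later, or buffer locally.
            raise OutOfOrderError(f"{aggregate_id}: expected {last_seq + 1}, got {seq}")

        process(message)  # exactly the next expected message

        db.execute("""
            INSERT INTO aggregate_sequences (aggregate_id, last_seq)
            VALUES (%s, %s)
            ON CONFLICT (aggregate_id) DO UPDATE SET last_seq = EXCLUDED.last_seq
        """, aggregate_id, seq)
```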
## Cleanup Strategies

### Delete After Publishing

```python
# Immediately delete after successful publish
db.execute("DELETE FROM outbox WHERE id = %s", entry.id)
```

### Soft Delete with Cleanup
```python
# Mark as published
db.execute("""
    UPDATE outbox SET published_at = NOW() WHERE id = %s
""", entry.id)

# Separate cleanup job
@scheduled(cron="0 * * * *")  # hourly
def cleanup_outbox():
    db.execute("""
        DELETE FROM outbox
        WHERE published_at < NOW() - INTERVAL '7 days'
    """)
```

### Archive Before Delete
```python
@scheduled(cron="0 0 * * *")  # daily
def archive_outbox():
    # One transaction for both statements: in PostgreSQL, NOW() is fixed
    # at transaction start, so the INSERT and DELETE see the same cutoff
    # and no row can be deleted without first being archived
    with db.transaction():
        # Move to archive table
        db.execute("""
            INSERT INTO outbox_archive
            SELECT * FROM outbox
            WHERE published_at < NOW() - INTERVAL '7 days'
        """)
        # Delete from main table
        db.execute("""
            DELETE FROM outbox
            WHERE published_at < NOW() - INTERVAL '7 days'
        """)
```

## Monitoring
### Key Metrics
- **Outbox lag:** count of unpublished messages; should stay low
- **Publish latency:** time from `created_at` to `published_at`; indicates processing speed
- **Publish failures:** rate of failed publish attempts; indicates broker issues
- **Outbox size:** total table size; should stay bounded

Most of these can be computed straight from the outbox table, as sketched below.
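A sketch of collecting the lag and latency metrics from the table itself; the function and metric names are illustrative, and the query assumes PostgreSQL:

```python
def collect_outbox_metrics(db):
    # Lag, average publish latency, and age of the oldest unpublished row
    row = db.query("""
        SELECT
            COUNT(*) FILTER (WHERE published_at IS NULL)       AS lag,
            EXTRACT(EPOCH FROM AVG(published_at - created_at)) AS avg_latency_s,
            EXTRACT(EPOCH FROM (NOW() - MIN(created_at)
                    FILTER (WHERE published_at IS NULL)))      AS oldest_unpublished_s
        FROM outbox
    """)
    return {
        "outbox_lag": row.lag,
        "outbox_publish_latency_seconds": row.avg_latency_s,
        "outbox_oldest_unpublished_seconds": row.oldest_unpublished_s,
    }
```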
### Alerting

```yaml
alerts:
  - name: OutboxLagHigh
    condition: count(unpublished) > 1000
    for: 5m
  - name: OutboxLatencyHigh
    condition: avg(publish_latency) > 30s
    for: 5m
  - name: OutboxPublishFailing
    condition: publish_error_rate > 0.01
    for: 5m
```

### Health Check
```python
from datetime import timedelta

def outbox_health():
    oldest_unpublished = db.query("""
        SELECT MIN(created_at)
        FROM outbox
        WHERE published_at IS NULL
    """)
    if oldest_unpublished:
        age = now() - oldest_unpublished
        if age > timedelta(minutes=5):
            return Health.DEGRADED
    return Health.HEALTHY
```

## Variations
### Inbox Pattern (Idempotent Consumer)
The mirror of the outbox, on the consumer side:

```
Message arrives → Write to inbox → Process → Mark processed
```

The inbox table (`id`, `message_id`, `payload`, `processed_at`) guarantees idempotency at the consumer. A minimal schema sketch follows.
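One way the table might look, assuming PostgreSQL 13+ (for `gen_random_uuid()`) and the same pseudo `db` API as the other examples; the unique constraint on `message_id` is what makes the handler's `ON CONFLICT` clause work, and `received_at` matches the insert below:

```python
# One-time migration: create the inbox table
db.execute("""
    CREATE TABLE inbox (
        id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        message_id   VARCHAR(255) NOT NULL UNIQUE,
        payload      JSONB,
        received_at  TIMESTAMP NOT NULL DEFAULT NOW(),
        processed_at TIMESTAMP NULL
    )
""")
```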
### Transactional Inbox

```python
def handle_message(message):
    with db.transaction():
        # Check/insert inbox record; redelivery hits the conflict
        # and inserts nothing
        result = db.execute("""
            INSERT INTO inbox (message_id, received_at)
            VALUES (%s, NOW())
            ON CONFLICT (message_id) DO NOTHING
            RETURNING id
        """, message.id)
        if not result:
            return  # already processed
        # Process in same transaction
        process(message)
```

## Key Takeaways
- **Solves the dual-write problem:** the database write and the message publish become effectively atomic
- **The same transaction is key:** business data and the outbox record are written together
- **Polling or CDC:** choose based on latency needs
- **Duplicates will happen:** consumers must be idempotent
- **Order by aggregate:** preserves per-entity ordering
- **Clean up regularly:** don't let the outbox grow unbounded
- **Monitor lag:** detect publishing problems early
- **Inbox for consumers:** the same pattern on the receive side