Outbox Pattern
Overview
The Outbox Pattern ensures reliable event publishing by writing events to a database table (OUTBOX) within the same transaction as the main operation. A background process then publishes events to RabbitMQ, guaranteeing at-least-once delivery.
Benefits:
- ✅ At-least-once delivery guarantee
- ✅ Events never lost (transactional persistence)
- ✅ Eventual consistency across microservices
- ✅ Event replay for debugging
- ✅ Complete audit trail
Architecture:
Write Operation (POST/PUT/PATCH/DELETE)
↓
1. Update TB_ANAG_{DIM}00 (data table)
2. Insert OUTBOX (event table)
3. COMMIT transaction (atomic)
↓
Background Publisher Process
↓
Publish to RabbitMQ Fanout Exchange
↓
Subscribers (search-indexer, cache-invalidator, analytics, etc.)
Outbox Table
Schema
OUTBOX table stores all events:
CREATE TABLE OUTBOX (
OUTBOX_ID VARCHAR(36) PRIMARY KEY, -- UUID
EVENT_TYPE VARCHAR(100) NOT NULL, -- ProductCreated, OrderUpdated, etc.
AGGREGATE_ID VARCHAR(36) NOT NULL, -- Record ID (PRD_ID, ORD_ID, etc.)
AGGREGATE_TYPE VARCHAR(10) NOT NULL, -- Dimension (PRD, ORD, etc.)
OPERATION VARCHAR(10) NOT NULL, -- POST, PUT, PATCH, DELETE
PAYLOAD TEXT NOT NULL, -- JSON serialized data
METADATA TEXT, -- Session context (source, centro_dett, etc.)
USER VARCHAR(255), -- Who performed the operation
CREATED_AT VARCHAR(14) NOT NULL, -- Event timestamp
PUBLISHED_AT VARCHAR(14), -- When published to RabbitMQ
STATUS VARCHAR(20) DEFAULT 'PENDING', -- PENDING, PUBLISHED, FAILED
RETRY_COUNT INT DEFAULT 0, -- Number of publish attempts
ERROR_MESSAGE TEXT -- Last error (if failed)
);
CREATE INDEX idx_outbox_status ON OUTBOX(STATUS);
CREATE INDEX idx_outbox_created_at ON OUTBOX(CREATED_AT);
CREATE INDEX idx_outbox_aggregate ON OUTBOX(AGGREGATE_TYPE, AGGREGATE_ID);
Event Creation
Write Operation with Outbox
Every write operation inserts event:
// CoreWrite DataStore.php
public function insert(string $dimension, array $data): array {
$this->pdo->beginTransaction();
try {
// 1. Insert main record
$recordId = $this->insertMainRecord($dimension, $data);
// 2. Execute cascades
$this->executeCascades($dimension, $recordId, $data);
// 3. Insert outbox event (same transaction!)
$eventId = Uuid::uuid4();
$event = [
'OUTBOX_ID' => $eventId,
'EVENT_TYPE' => $this->getEventType($dimension, 'POST'), // "ProductCreated"
'AGGREGATE_ID' => $recordId,
'AGGREGATE_TYPE' => $dimension,
'OPERATION' => 'POST',
'PAYLOAD' => json_encode($data),
'METADATA' => json_encode($this->getSessionContext()),
'USER' => $this->getCurrentUser(),
'CREATED_AT' => date('YmdHis'),
'STATUS' => 'PENDING'
];
$this->pdo->execute("INSERT INTO OUTBOX (...) VALUES (...)", $event);
// 4. Commit (data + event persisted atomically)
$this->pdo->commit();
return ['id' => $recordId, 'status' => 'success'];
} catch (\Exception $e) {
$this->pdo->rollBack(); // Both data AND event rolled back
throw $e;
}
}
Event Types
Event type conventions:
| Operation | Event Type | Example |
|---|---|---|
| POST | {Dimension}Created | ProductCreated, OrderCreated |
| PUT/PATCH | {Dimension}Updated | ProductUpdated, OrderUpdated |
| DELETE (soft) | {Dimension}Deleted | ProductDeleted, OrderDeleted |
| DELETE (force) | {Dimension}ForceDeleted | ProductForceDeleted |
Event Structure
Payload Format
ProductCreated event:
{
"OUTBOX_ID": "550e8400-e29b-41d4-a716-446655440000",
"EVENT_TYPE": "ProductCreated",
"AGGREGATE_ID": "123",
"AGGREGATE_TYPE": "PRD",
"OPERATION": "POST",
"PAYLOAD": {
"PRD_ID": "123",
"XPRD01": "Widget Pro",
"XPRD02": 99.99,
"XPRD03": "PRD-2025-001",
"XPRD05": "cat_electronics",
"XPRD06": true,
"TREC": "N"
},
"METADATA": {
"source": "productManagement",
"centro_dett": "admin",
"microservice": "adminApp",
"peso": "1",
"ambiente": "production",
"lingua": "en"
},
"USER": "user@example.com",
"CREATED_AT": "20251219160000",
"STATUS": "PENDING"
}
ProductUpdated event:
{
"OUTBOX_ID": "660f9511-f3a-42e5-b827-557766550001",
"EVENT_TYPE": "ProductUpdated",
"AGGREGATE_ID": "123",
"AGGREGATE_TYPE": "PRD",
"OPERATION": "PATCH",
"PAYLOAD": {
"PRD_ID": "123",
"XPRD02": 89.99, // Changed fields only
"TREC": "M"
},
"PREVIOUS_PAYLOAD": {
"XPRD02": 99.99 // Previous values (optional)
},
"METADATA": {
"source": "productManagement",
"centro_dett": "admin"
},
"USER": "admin@example.com",
"CREATED_AT": "20251219170000",
"STATUS": "PENDING"
}
Event Publishing
Background Publisher Process
Polls OUTBOX table and publishes to RabbitMQ:
// Background publisher (runs every second)
class OutboxPublisher {
private $pdo;
private $rabbitMQ;
public function publishPendingEvents(int $batchSize = 100): void {
// 1. Fetch pending events
$events = $this->pdo->query(
"SELECT * FROM OUTBOX
WHERE STATUS = 'PENDING'
ORDER BY CREATED_AT ASC
LIMIT :batch_size",
['batch_size' => $batchSize]
)->fetchAll();
foreach ($events as $event) {
try {
// 2. Publish to RabbitMQ
$this->rabbitMQ->publish(
exchange: 'corewrite.fanout',
routingKey: $this->getRoutingKey($event),
message: json_encode($event),
properties: [
'content_type' => 'application/json',
'delivery_mode' => 2 // Persistent
]
);
// 3. Mark as published
$this->pdo->execute(
"UPDATE OUTBOX
SET STATUS = 'PUBLISHED',
PUBLISHED_AT = :published_at
WHERE OUTBOX_ID = :id",
[
'published_at' => date('YmdHis'),
'id' => $event['OUTBOX_ID']
]
);
echo "Published event: {$event['EVENT_TYPE']} (ID: {$event['OUTBOX_ID']})\n";
} catch (\Exception $e) {
// 4. Mark as failed, retry later
$this->pdo->execute(
"UPDATE OUTBOX
SET STATUS = 'FAILED',
ERROR_MESSAGE = :error,
RETRY_COUNT = RETRY_COUNT + 1
WHERE OUTBOX_ID = :id",
[
'error' => $e->getMessage(),
'id' => $event['OUTBOX_ID']
]
);
echo "Failed to publish event: {$event['OUTBOX_ID']} - {$e->getMessage()}\n";
}
}
}
private function getRoutingKey(array $event): string {
// Routing key format: {dimension}.{operation}
$dimension = strtolower($event['AGGREGATE_TYPE']);
$operation = strtolower($event['OPERATION']);
return "{$dimension}.{$operation}";
}
}
// Run publisher continuously
while (true) {
$publisher = new OutboxPublisher($pdo, $rabbitMQ);
$publisher->publishPendingEvents(100);
sleep(1); // Poll every second
}
RabbitMQ Exchange
Fanout exchange broadcasts to all subscribers:
Exchange: corewrite.fanout
Type: fanout
Durable: true
Auto-delete: false
Subscribers:
├── search-indexer (updates Elasticsearch)
├── cache-invalidator (clears Redis cache)
├── analytics-writer (writes to analytics DB)
├── audit-logger (logs to audit trail)
└── notification-sender (sends email/push notifications)
Fanout benefits:
- All subscribers receive events
- Adding subscribers doesn't affect publisher
- Subscribers process events independently
Event Subscribers
Search Indexer Example
// search-indexer subscriber (Node.js)
const amqp = require('amqplib');
async function startSearchIndexer() {
const connection = await amqp.connect('amqp://rabbitmq:5672');
const channel = await connection.createChannel();
// Bind to fanout exchange
const exchange = 'corewrite.fanout';
await channel.assertExchange(exchange, 'fanout', { durable: true });
// Create queue for this subscriber
const queue = await channel.assertQueue('search-indexer', {
durable: true,
autoDelete: false
});
// Bind queue to exchange
await channel.bindQueue(queue.queue, exchange, '');
// Consume events
channel.consume(queue.queue, async (msg) => {
if (msg) {
const event = JSON.parse(msg.content.toString());
try {
await handleEvent(event);
channel.ack(msg); // Acknowledge successful processing
} catch (error) {
console.error('Failed to process event:', error);
channel.nack(msg, false, true); // Requeue for retry
}
}
});
console.log('Search indexer started. Waiting for events...');
}
async function handleEvent(event) {
switch (event.EVENT_TYPE) {
case 'ProductCreated':
await elasticsearch.index({
index: 'products',
id: event.AGGREGATE_ID,
body: event.PAYLOAD
});
console.log(`Indexed product: ${event.AGGREGATE_ID}`);
break;
case 'ProductUpdated':
await elasticsearch.update({
index: 'products',
id: event.AGGREGATE_ID,
body: { doc: event.PAYLOAD }
});
console.log(`Updated product index: ${event.AGGREGATE_ID}`);
break;
case 'ProductDeleted':
await elasticsearch.delete({
index: 'products',
id: event.AGGREGATE_ID
});
console.log(`Removed product from index: ${event.AGGREGATE_ID}`);
break;
}
}
startSearchIndexer();
Cache Invalidator Example
// cache-invalidator subscriber
async function handleEvent(event) {
const cacheKeys = getCacheKeys(event);
for (const key of cacheKeys) {
await redis.del(key);
console.log(`Invalidated cache: ${key}`);
}
}
function getCacheKeys(event) {
const keys = [];
const id = event.AGGREGATE_ID;
const dim = event.AGGREGATE_TYPE;
// Invalidate record cache
keys.push(`${dim}:${id}`);
// Invalidate list caches
keys.push(`${dim}:list:*`);
// Dimension-specific invalidation
if (dim === 'PRD' && event.PAYLOAD.XPRD05) {
// Invalidate category cache
keys.push(`CAT:${event.PAYLOAD.XPRD05}:products`);
}
return keys;
}
Guarantees and Properties
At-Least-Once Delivery
Event will be delivered at least once to each subscriber:
Write Operation Success
↓
Event in OUTBOX (PENDING)
↓
Publisher attempts delivery
├─ Success → STATUS='PUBLISHED'
└─ Failure → STATUS='FAILED', RETRY_COUNT++
↓
Retry with exponential backoff
↓
Eventually succeeds or manual intervention
Subscribers must be idempotent: Same event processed multiple times should have same result.
Event Ordering
Events ordered by CREATED_AT within same aggregate:
SELECT * FROM OUTBOX
WHERE AGGREGATE_TYPE = 'PRD' AND AGGREGATE_ID = '123'
ORDER BY CREATED_AT ASC;
-- Result:
-- 1. ProductCreated (20251219160000)
-- 2. ProductUpdated (20251219170000)
-- 3. ProductDeleted (20251219180000)
Across aggregates: No ordering guarantees (eventual consistency).
Idempotency
Subscribers must handle duplicate events:
async function handleProductCreated(event) {
const productId = event.AGGREGATE_ID;
// Check if already indexed
const exists = await elasticsearch.exists({
index: 'products',
id: productId
});
if (exists) {
console.log(`Product ${productId} already indexed. Skipping.`);
return; // Idempotent - no-op on duplicate
}
// Index product
await elasticsearch.index({
index: 'products',
id: productId,
body: event.PAYLOAD
});
}
Monitoring and Operations
Health Checks
Monitor outbox lag:
-- Pending events (should be near zero)
SELECT COUNT(*) as PENDING_COUNT
FROM OUTBOX
WHERE STATUS = 'PENDING';
-- Failed events (requires attention)
SELECT COUNT(*) as FAILED_COUNT
FROM OUTBOX
WHERE STATUS = 'FAILED';
-- Oldest pending event (lag indicator)
SELECT MIN(CREATED_AT) as OLDEST_PENDING
FROM OUTBOX
WHERE STATUS = 'PENDING';
Retry Failed Events
Manual retry command:
-- Retry failed events (reset to pending)
UPDATE OUTBOX
SET STATUS = 'PENDING',
ERROR_MESSAGE = NULL
WHERE STATUS = 'FAILED'
AND RETRY_COUNT < 5;
Cleanup Old Events
Archive or delete published events:
-- Delete events older than 30 days
DELETE FROM OUTBOX
WHERE STATUS = 'PUBLISHED'
AND PUBLISHED_AT < DATE_SUB(NOW(), INTERVAL 30 DAY);
Best Practices
✅ DO:
Trust outbox for event publishing:
// ✅ Good - CoreWrite handles event publishing
await createProduct(data);
// Event automatically published via outbox
Make subscribers idempotent:
// ✅ Good - check before processing
if (await exists(event.AGGREGATE_ID)) {
return; // Already processed
}
await process(event);
Monitor outbox lag:
-- ✅ Good - alert if lag > 1 minute
SELECT COUNT(*) FROM OUTBOX
WHERE STATUS = 'PENDING'
AND CREATED_AT < DATE_SUB(NOW(), INTERVAL 1 MINUTE);
❌ DON'T:
Don't try to publish events manually:
// ❌ Bad - outbox handles this
await createProduct(data);
await rabbitMQ.publish('ProductCreated', data); // Duplicate!
Don't assume event ordering across aggregates:
// ❌ Bad - no ordering guarantee
// Order 1 created → Order 2 created
// But Order 2 might be processed first
Don't skip idempotency checks:
// ❌ Bad - duplicate processing
async function handleEvent(event) {
await process(event); // No idempotency check!
}
Summary
- ✅ Outbox pattern ensures reliable event publishing
- ✅ Events persisted in OUTBOX table (same transaction)
- ✅ Background process publishes to RabbitMQ
- ✅ At-least-once delivery guarantee
- ✅ Fanout exchange broadcasts to all subscribers
- ✅ Subscribers must be idempotent
- ✅ Event ordering within same aggregate
- ✅ Eventual consistency across microservices
Key Takeaways:
- Every write operation creates outbox event
- Event and data persisted atomically (ACID)
- Background publisher polls OUTBOX and publishes to RabbitMQ
- Subscribers process events independently
- Idempotency required (duplicate events possible)
- Monitor OUTBOX for lag and failures
- Cleanup old published events periodically
Event Flow:
Write Operation → OUTBOX (PENDING) → Publisher → RabbitMQ → Subscribers
↓ ↓
Data persisted Event published
(transactional) (eventually)
Next: Counter Management →
Related Concepts
- Transactions - ACID guarantees
- Create Operations - Event creation
- Update Operations - Update events
- Delete Operations - Delete events