Outbox Pattern

Overview

The Outbox Pattern ensures reliable event publishing by writing events to a database table (OUTBOX) within the same transaction as the main operation. A background process then publishes events to RabbitMQ, guaranteeing at-least-once delivery.

Benefits:

  • ✅ At-least-once delivery guarantee
  • ✅ Events never lost (transactional persistence)
  • ✅ Eventual consistency across microservices
  • ✅ Event replay for debugging
  • ✅ Complete audit trail

Architecture:

Write Operation (POST/PUT/PATCH/DELETE)
        ↓
1. Update TB_ANAG_{DIM}00 (data table)
2. Insert OUTBOX (event table)
3. COMMIT transaction (atomic)
        ↓
Background Publisher Process
        ↓
Publish to RabbitMQ Fanout Exchange
        ↓
Subscribers (search-indexer, cache-invalidator, analytics, etc.)

Outbox Table

Schema

OUTBOX table stores all events:

CREATE TABLE OUTBOX (
    OUTBOX_ID      VARCHAR(36)  PRIMARY KEY,        -- UUID
    EVENT_TYPE     VARCHAR(100) NOT NULL,           -- ProductCreated, OrderUpdated, etc.
    AGGREGATE_ID   VARCHAR(36)  NOT NULL,           -- Record ID (PRD_ID, ORD_ID, etc.)
    AGGREGATE_TYPE VARCHAR(10)  NOT NULL,           -- Dimension (PRD, ORD, etc.)
    OPERATION      VARCHAR(10)  NOT NULL,           -- POST, PUT, PATCH, DELETE
    PAYLOAD        TEXT         NOT NULL,           -- JSON serialized data
    METADATA       TEXT,                            -- Session context (source, centro_dett, etc.)
    USER           VARCHAR(255),                    -- Who performed the operation
    CREATED_AT     VARCHAR(14)  NOT NULL,           -- Event timestamp (YmdHis)
    PUBLISHED_AT   VARCHAR(14),                     -- When published to RabbitMQ
    STATUS         VARCHAR(20)  DEFAULT 'PENDING',  -- PENDING, PUBLISHED, FAILED
    RETRY_COUNT    INT          DEFAULT 0,          -- Number of publish attempts
    ERROR_MESSAGE  TEXT                             -- Last error (if failed)
);

CREATE INDEX idx_outbox_status ON OUTBOX(STATUS);
CREATE INDEX idx_outbox_created_at ON OUTBOX(CREATED_AT);
CREATE INDEX idx_outbox_aggregate ON OUTBOX(AGGREGATE_TYPE, AGGREGATE_ID);

Event Creation

Write Operation with Outbox

Every write operation inserts an outbox event within the same transaction:

// CoreWrite DataStore.php
public function insert(string $dimension, array $data): array {
    $this->pdo->beginTransaction();

    try {
        // 1. Insert main record
        $recordId = $this->insertMainRecord($dimension, $data);

        // 2. Execute cascades
        $this->executeCascades($dimension, $recordId, $data);

        // 3. Insert outbox event (same transaction!)
        $eventId = Uuid::uuid4()->toString();
        $event = [
            'OUTBOX_ID' => $eventId,
            'EVENT_TYPE' => $this->getEventType($dimension, 'POST'), // "ProductCreated"
            'AGGREGATE_ID' => $recordId,
            'AGGREGATE_TYPE' => $dimension,
            'OPERATION' => 'POST',
            'PAYLOAD' => json_encode($data),
            'METADATA' => json_encode($this->getSessionContext()),
            'USER' => $this->getCurrentUser(),
            'CREATED_AT' => date('YmdHis'),
            'STATUS' => 'PENDING'
        ];

        $this->pdo->execute("INSERT INTO OUTBOX (...) VALUES (...)", $event);

        // 4. Commit (data + event persisted atomically)
        $this->pdo->commit();

        return ['id' => $recordId, 'status' => 'success'];
    } catch (\Exception $e) {
        $this->pdo->rollBack(); // Both data AND event rolled back
        throw $e;
    }
}

Event Types

Event type conventions:

Operation       | Event Type               | Example
----------------|--------------------------|------------------------------
POST            | {Dimension}Created       | ProductCreated, OrderCreated
PUT/PATCH       | {Dimension}Updated       | ProductUpdated, OrderUpdated
DELETE (soft)   | {Dimension}Deleted       | ProductDeleted, OrderDeleted
DELETE (force)  | {Dimension}ForceDeleted  | ProductForceDeleted
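The DataStore code in this page relies on a getEventType() helper to apply this convention. A minimal JavaScript sketch of the mapping, where the dimension-to-entity-name table is an illustrative assumption (the real names would come from dimension configuration):

```javascript
// Illustrative dimension → entity-name map (assumed, not the real config).
const DIMENSION_NAMES = { PRD: 'Product', ORD: 'Order' };

function getEventType(dimension, operation, force = false) {
  const name = DIMENSION_NAMES[dimension] || dimension;
  switch (operation) {
    case 'POST':
      return `${name}Created`;
    case 'PUT':
    case 'PATCH':
      return `${name}Updated`;
    case 'DELETE':
      return force ? `${name}ForceDeleted` : `${name}Deleted`;
    default:
      throw new Error(`Unknown operation: ${operation}`);
  }
}
```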

Event Structure

Payload Format

ProductCreated event:

{
  "OUTBOX_ID": "550e8400-e29b-41d4-a716-446655440000",
  "EVENT_TYPE": "ProductCreated",
  "AGGREGATE_ID": "123",
  "AGGREGATE_TYPE": "PRD",
  "OPERATION": "POST",
  "PAYLOAD": {
    "PRD_ID": "123",
    "XPRD01": "Widget Pro",
    "XPRD02": 99.99,
    "XPRD03": "PRD-2025-001",
    "XPRD05": "cat_electronics",
    "XPRD06": true,
    "TREC": "N"
  },
  "METADATA": {
    "source": "productManagement",
    "centro_dett": "admin",
    "microservice": "adminApp",
    "peso": "1",
    "ambiente": "production",
    "lingua": "en"
  },
  "USER": "user@example.com",
  "CREATED_AT": "20251219160000",
  "STATUS": "PENDING"
}

ProductUpdated event:

{
  "OUTBOX_ID": "660f9511-f3ab-42e5-b827-557766550001",
  "EVENT_TYPE": "ProductUpdated",
  "AGGREGATE_ID": "123",
  "AGGREGATE_TYPE": "PRD",
  "OPERATION": "PATCH",
  "PAYLOAD": {
    "PRD_ID": "123",
    "XPRD02": 89.99,      // Changed fields only
    "TREC": "M"
  },
  "PREVIOUS_PAYLOAD": {
    "XPRD02": 99.99       // Previous values (optional)
  },
  "METADATA": {
    "source": "productManagement",
    "centro_dett": "admin"
  },
  "USER": "admin@example.com",
  "CREATED_AT": "20251219170000",
  "STATUS": "PENDING"
}

Event Publishing

Background Publisher Process

Polls OUTBOX table and publishes to RabbitMQ:

// Background publisher (runs every second)
class OutboxPublisher {
    private $pdo;
    private $rabbitMQ;

    public function __construct($pdo, $rabbitMQ) {
        $this->pdo = $pdo;
        $this->rabbitMQ = $rabbitMQ;
    }

    public function publishPendingEvents(int $batchSize = 100): void {
        // 1. Fetch pending events
        $events = $this->pdo->query(
            "SELECT * FROM OUTBOX
             WHERE STATUS = 'PENDING'
             ORDER BY CREATED_AT ASC
             LIMIT :batch_size",
            ['batch_size' => $batchSize]
        )->fetchAll();

        foreach ($events as $event) {
            try {
                // 2. Publish to RabbitMQ
                $this->rabbitMQ->publish(
                    exchange: 'corewrite.fanout',
                    routingKey: $this->getRoutingKey($event),
                    message: json_encode($event),
                    properties: [
                        'content_type' => 'application/json',
                        'delivery_mode' => 2 // Persistent
                    ]
                );

                // 3. Mark as published
                $this->pdo->execute(
                    "UPDATE OUTBOX
                     SET STATUS = 'PUBLISHED',
                         PUBLISHED_AT = :published_at
                     WHERE OUTBOX_ID = :id",
                    [
                        'published_at' => date('YmdHis'),
                        'id' => $event['OUTBOX_ID']
                    ]
                );

                echo "Published event: {$event['EVENT_TYPE']} (ID: {$event['OUTBOX_ID']})\n";
            } catch (\Exception $e) {
                // 4. Mark as failed, retry later
                $this->pdo->execute(
                    "UPDATE OUTBOX
                     SET STATUS = 'FAILED',
                         ERROR_MESSAGE = :error,
                         RETRY_COUNT = RETRY_COUNT + 1
                     WHERE OUTBOX_ID = :id",
                    [
                        'error' => $e->getMessage(),
                        'id' => $event['OUTBOX_ID']
                    ]
                );

                echo "Failed to publish event: {$event['OUTBOX_ID']} - {$e->getMessage()}\n";
            }
        }
    }

    private function getRoutingKey(array $event): string {
        // Routing key format: {dimension}.{operation}
        $dimension = strtolower($event['AGGREGATE_TYPE']);
        $operation = strtolower($event['OPERATION']);
        return "{$dimension}.{$operation}";
    }
}

// Run publisher continuously (construct once, reuse connections)
$publisher = new OutboxPublisher($pdo, $rabbitMQ);
while (true) {
    $publisher->publishPendingEvents(100);
    sleep(1); // Poll every second
}

RabbitMQ Exchange

Fanout exchange broadcasts to all subscribers:

Exchange: corewrite.fanout
Type: fanout
Durable: true
Auto-delete: false

Subscribers:
├── search-indexer (updates Elasticsearch)
├── cache-invalidator (clears Redis cache)
├── analytics-writer (writes to analytics DB)
├── audit-logger (logs to audit trail)
└── notification-sender (sends email/push notifications)

Fanout benefits:

  • All subscribers receive events
  • Adding subscribers doesn't affect publisher
  • Subscribers process events independently

Event Subscribers

Search Indexer Example

// search-indexer subscriber (Node.js)
const amqp = require('amqplib');
const { Client } = require('@elastic/elasticsearch');

const elasticsearch = new Client({ node: 'http://elasticsearch:9200' });

async function startSearchIndexer() {
  const connection = await amqp.connect('amqp://rabbitmq:5672');
  const channel = await connection.createChannel();

  // Bind to fanout exchange
  const exchange = 'corewrite.fanout';
  await channel.assertExchange(exchange, 'fanout', { durable: true });

  // Create queue for this subscriber
  const queue = await channel.assertQueue('search-indexer', {
    durable: true,
    autoDelete: false
  });

  // Bind queue to exchange
  await channel.bindQueue(queue.queue, exchange, '');

  // Consume events
  channel.consume(queue.queue, async (msg) => {
    if (msg) {
      const event = JSON.parse(msg.content.toString());

      try {
        await handleEvent(event);
        channel.ack(msg); // Acknowledge successful processing
      } catch (error) {
        console.error('Failed to process event:', error);
        channel.nack(msg, false, true); // Requeue for retry
      }
    }
  });

  console.log('Search indexer started. Waiting for events...');
}

async function handleEvent(event) {
  // PAYLOAD is JSON-encoded inside the event envelope
  const payload = typeof event.PAYLOAD === 'string'
    ? JSON.parse(event.PAYLOAD)
    : event.PAYLOAD;

  switch (event.EVENT_TYPE) {
    case 'ProductCreated':
      await elasticsearch.index({
        index: 'products',
        id: event.AGGREGATE_ID,
        body: payload
      });
      console.log(`Indexed product: ${event.AGGREGATE_ID}`);
      break;

    case 'ProductUpdated':
      await elasticsearch.update({
        index: 'products',
        id: event.AGGREGATE_ID,
        body: { doc: payload }
      });
      console.log(`Updated product index: ${event.AGGREGATE_ID}`);
      break;

    case 'ProductDeleted':
      await elasticsearch.delete({
        index: 'products',
        id: event.AGGREGATE_ID
      });
      console.log(`Removed product from index: ${event.AGGREGATE_ID}`);
      break;
  }
}

startSearchIndexer();

Cache Invalidator Example

// cache-invalidator subscriber
async function handleEvent(event) {
  const cacheKeys = getCacheKeys(event);

  for (const key of cacheKeys) {
    if (key.includes('*')) {
      // DEL does not accept patterns: resolve them first
      // (KEYS is fine for small keyspaces; prefer SCAN in production)
      const matches = await redis.keys(key);
      if (matches.length > 0) await redis.del(...matches);
    } else {
      await redis.del(key);
    }
    console.log(`Invalidated cache: ${key}`);
  }
}

function getCacheKeys(event) {
  const keys = [];
  const id = event.AGGREGATE_ID;
  const dim = event.AGGREGATE_TYPE;
  const payload = typeof event.PAYLOAD === 'string'
    ? JSON.parse(event.PAYLOAD)
    : event.PAYLOAD;

  // Invalidate record cache
  keys.push(`${dim}:${id}`);

  // Invalidate list caches (pattern, resolved above)
  keys.push(`${dim}:list:*`);

  // Dimension-specific invalidation
  if (dim === 'PRD' && payload.XPRD05) {
    // Invalidate category cache
    keys.push(`CAT:${payload.XPRD05}:products`);
  }

  return keys;
}

Guarantees and Properties

At-Least-Once Delivery

Each event is delivered at least once to every subscriber:

Write Operation Success
        ↓
Event in OUTBOX (PENDING)
        ↓
Publisher attempts delivery
├─ Success → STATUS='PUBLISHED'
└─ Failure → STATUS='FAILED', RETRY_COUNT++
        ↓
Retry with exponential backoff
        ↓
Eventually succeeds or manual intervention
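The backoff schedule itself is not pinned down in this section; a common choice is a capped exponential delay driven by the RETRY_COUNT column. A sketch with illustrative base and cap values (not values taken from this system):

```javascript
// Capped exponential backoff derived from the OUTBOX row's RETRY_COUNT.
// baseMs and maxMs are illustrative defaults, not documented values.
function nextRetryDelayMs(retryCount, baseMs = 1000, maxMs = 60000) {
  return Math.min(baseMs * 2 ** retryCount, maxMs);
}
```

A publisher applying this would skip a FAILED row until at least nextRetryDelayMs(row.RETRY_COUNT) has elapsed since the last attempt.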

Subscribers must be idempotent: processing the same event multiple times must produce the same result.

Event Ordering

Events are ordered by CREATED_AT within the same aggregate:

SELECT * FROM OUTBOX
WHERE AGGREGATE_TYPE = 'PRD' AND AGGREGATE_ID = '123'
ORDER BY CREATED_AT ASC;

-- Result:
-- 1. ProductCreated (20251219160000)
-- 2. ProductUpdated (20251219170000)
-- 3. ProductDeleted (20251219180000)

Across aggregates: No ordering guarantees (eventual consistency).
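Where a subscriber does care about per-aggregate order, one common technique is to keep a watermark of the last CREATED_AT processed per aggregate and drop anything older. A minimal in-memory sketch (a real subscriber would persist this watermark; lexicographic comparison works because CREATED_AT is a fixed-width 'YmdHis' string):

```javascript
// Per-aggregate watermark keyed by "{AGGREGATE_TYPE}:{AGGREGATE_ID}".
const lastSeen = new Map();

function isStaleOrDuplicate(event) {
  const key = `${event.AGGREGATE_TYPE}:${event.AGGREGATE_ID}`;
  const prev = lastSeen.get(key);
  if (prev !== undefined && event.CREATED_AT <= prev) {
    return true; // already processed this event, or a newer one
  }
  lastSeen.set(key, event.CREATED_AT);
  return false;
}
```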

Idempotency

Subscribers must handle duplicate events:

async function handleProductCreated(event) {
  const productId = event.AGGREGATE_ID;

  // Check if already indexed
  const exists = await elasticsearch.exists({
    index: 'products',
    id: productId
  });

  if (exists) {
    console.log(`Product ${productId} already indexed. Skipping.`);
    return; // Idempotent - no-op on duplicate
  }

  // Index product
  await elasticsearch.index({
    index: 'products',
    id: productId,
    body: event.PAYLOAD
  });
}

Monitoring and Operations

Health Checks

Monitor outbox lag:

-- Pending events (should be near zero)
SELECT COUNT(*) as PENDING_COUNT
FROM OUTBOX
WHERE STATUS = 'PENDING';

-- Failed events (requires attention)
SELECT COUNT(*) as FAILED_COUNT
FROM OUTBOX
WHERE STATUS = 'FAILED';

-- Oldest pending event (lag indicator)
SELECT MIN(CREATED_AT) as OLDEST_PENDING
FROM OUTBOX
WHERE STATUS = 'PENDING';

Retry Failed Events

Manual retry command:

-- Retry failed events (reset to pending)
UPDATE OUTBOX
SET STATUS = 'PENDING',
ERROR_MESSAGE = NULL
WHERE STATUS = 'FAILED'
AND RETRY_COUNT < 5;

Cleanup Old Events

Archive or delete published events:

-- Delete events older than 30 days
-- (PUBLISHED_AT is a VARCHAR(14) 'YmdHis' string, so format the cutoff to match)
DELETE FROM OUTBOX
WHERE STATUS = 'PUBLISHED'
  AND PUBLISHED_AT < DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 30 DAY), '%Y%m%d%H%i%s');

Best Practices

✅ DO:

Trust outbox for event publishing:

// ✅ Good - CoreWrite handles event publishing
await createProduct(data);
// Event automatically published via outbox

Make subscribers idempotent:

// ✅ Good - check before processing
if (await exists(event.AGGREGATE_ID)) {
  return; // Already processed
}
await process(event);

Monitor outbox lag:

-- ✅ Good - alert if lag > 1 minute
-- (CREATED_AT is a 'YmdHis' string, so format the cutoff to match)
SELECT COUNT(*) FROM OUTBOX
WHERE STATUS = 'PENDING'
  AND CREATED_AT < DATE_FORMAT(DATE_SUB(NOW(), INTERVAL 1 MINUTE), '%Y%m%d%H%i%s');

❌ DON'T:

Don't try to publish events manually:

// ❌ Bad - outbox handles this
await createProduct(data);
await rabbitMQ.publish('ProductCreated', data); // Duplicate!

Don't assume event ordering across aggregates:

// ❌ Bad - no ordering guarantee
// Order 1 created → Order 2 created
// But Order 2 might be processed first

Don't skip idempotency checks:

// ❌ Bad - duplicate processing
async function handleEvent(event) {
  await process(event); // No idempotency check!
}

Summary

  • ✅ Outbox pattern ensures reliable event publishing
  • ✅ Events persisted in OUTBOX table (same transaction)
  • ✅ Background process publishes to RabbitMQ
  • ✅ At-least-once delivery guarantee
  • ✅ Fanout exchange broadcasts to all subscribers
  • ✅ Subscribers must be idempotent
  • ✅ Event ordering within same aggregate
  • ✅ Eventual consistency across microservices

Key Takeaways:

  1. Every write operation creates outbox event
  2. Event and data persisted atomically (ACID)
  3. Background publisher polls OUTBOX and publishes to RabbitMQ
  4. Subscribers process events independently
  5. Idempotency required (duplicate events possible)
  6. Monitor OUTBOX for lag and failures
  7. Cleanup old published events periodically

Event Flow:

Write Operation → OUTBOX (PENDING) → Publisher → RabbitMQ → Subscribers
       ↓                                              ↓
 Data persisted                               Event published
 (transactional)                              (eventually)

Next: Counter Management →