Outbox Subscribers
Overview
Outbox subscribers are microservices that consume events from the RabbitMQ fanout exchange, enabling event-driven architecture and eventual consistency across the Q01 platform.
Common Subscribers:
- search-indexer - Updates Elasticsearch indices
- cache-invalidator - Clears Redis cache
- analytics-writer - Writes to data warehouse
- notification-sender - Sends emails/push notifications
- webhook-dispatcher - Calls external webhooks
- audit-logger - Writes to audit log
Benefits:
- ✅ Decoupled microservices
- ✅ Asynchronous processing
- ✅ Scalable event processing
- ✅ At-least-once delivery
- ✅ Independent failure handling
Event Flow
Write Operation (POST/PUT/PATCH/DELETE)
↓
1. Update TB_ANAG_{DIM}00
2. Insert OUTBOX event
3. COMMIT transaction
↓
Background Publisher (polls OUTBOX)
↓
RabbitMQ Fanout Exchange (corewrite.fanout)
↓
Subscribers (all receive same event):
├── search-indexer
├── cache-invalidator
├── analytics-writer
├── notification-sender
└── webhook-dispatcher
Subscriber Implementation
Node.js Subscriber
const amqp = require('amqplib');
class OutboxSubscriber {
constructor(queueName, exchangeName = 'corewrite.fanout') {
this.queueName = queueName;
this.exchangeName = exchangeName;
this.connection = null;
this.channel = null;
}
async connect() {
// Connect to RabbitMQ
this.connection = await amqp.connect(process.env.RABBITMQ_URL);
this.channel = await this.connection.createChannel();
// Assert exchange
await this.channel.assertExchange(this.exchangeName, 'fanout', {
durable: true
});
// Create queue
const queue = await this.channel.assertQueue(this.queueName, {
durable: true,
autoDelete: false
});
// Bind queue to exchange
await this.channel.bindQueue(queue.queue, this.exchangeName, '');
// Set prefetch (process one message at a time)
await this.channel.prefetch(1);
console.log(`[${this.queueName}] Waiting for events...`);
}
async subscribe(handler) {
await this.channel.consume(this.queueName, async (msg) => {
if (!msg) return;
try {
const event = JSON.parse(msg.content.toString());
console.log(`[${this.queueName}] Processing event:`, event.EVENT_TYPE);
// Handle event (idempotent)
await handler(event);
// Acknowledge message
this.channel.ack(msg);
console.log(`[${this.queueName}] Event processed successfully`);
} catch (error) {
console.error(`[${this.queueName}] Error processing event:`, error);
// Reject and requeue (will retry)
this.channel.nack(msg, false, true);
}
});
}
async close() {
await this.channel.close();
await this.connection.close();
}
}
// Usage
const subscriber = new OutboxSubscriber('search-indexer');
await subscriber.connect();
await subscriber.subscribe(async (event) => {
await handleSearchIndexEvent(event);
});
Search Indexer Example
const { Client } = require('@elastic/elasticsearch');
class SearchIndexer {
constructor() {
this.elasticsearch = new Client({
node: process.env.ELASTICSEARCH_URL
});
}
async handleEvent(event) {
const { EVENT_TYPE, AGGREGATE_TYPE, AGGREGATE_ID, PAYLOAD } = event;
// Check idempotency (already processed?)
if (await this.isProcessed(event.OUTBOX_ID)) {
console.log('Event already processed, skipping');
return;
}
switch (EVENT_TYPE) {
case 'ProductCreated':
await this.indexProduct(PAYLOAD);
break;
case 'ProductUpdated':
await this.updateProduct(AGGREGATE_ID, PAYLOAD);
break;
case 'ProductDeleted':
await this.deleteProduct(AGGREGATE_ID);
break;
default:
console.log(`Unknown event type: ${EVENT_TYPE}`);
}
// Mark as processed
await this.markProcessed(event.OUTBOX_ID);
}
async indexProduct(product) {
await this.elasticsearch.index({
index: 'products',
id: product.PRD_ID,
body: {
name: product.XPRD01,
price: product.XPRD02,
code: product.XPRD03,
category: product.XPRD05,
active: product.XPRD06,
created_at: product.CREATED_AT,
updated_at: product.UPDATED_AT
}
});
console.log(`Indexed product: ${product.PRD_ID}`);
}
async updateProduct(productId, updates) {
await this.elasticsearch.update({
index: 'products',
id: productId,
body: {
doc: {
name: updates.XPRD01,
price: updates.XPRD02,
updated_at: updates.UPDATED_AT
}
}
});
console.log(`Updated product: ${productId}`);
}
async deleteProduct(productId) {
await this.elasticsearch.delete({
index: 'products',
id: productId
});
console.log(`Deleted product: ${productId}`);
}
async isProcessed(outboxId) {
// Check Redis for processed events
const redis = require('redis').createClient();
const exists = await redis.exists(`processed:${outboxId}`);
return exists === 1;
}
async markProcessed(outboxId) {
// Store in Redis with 30-day TTL
const redis = require('redis').createClient();
await redis.set(`processed:${outboxId}`, '1', 'EX', 2592000);
}
}
// Start subscriber
const indexer = new SearchIndexer();
const subscriber = new OutboxSubscriber('search-indexer');
await subscriber.connect();
await subscriber.subscribe((event) => indexer.handleEvent(event));
Cache Invalidator Example
const Redis = require('ioredis');
class CacheInvalidator {
constructor() {
this.redis = new Redis(process.env.REDIS_URL);
}
async handleEvent(event) {
const { EVENT_TYPE, AGGREGATE_TYPE, AGGREGATE_ID, PAYLOAD } = event;
const keys = this.getCacheKeys(AGGREGATE_TYPE, AGGREGATE_ID, PAYLOAD);
for (const key of keys) {
await this.redis.del(key);
console.log(`Invalidated cache: ${key}`);
}
}
getCacheKeys(dimension, recordId, payload) {
const keys = [];
// Invalidate specific record cache
keys.push(`${dimension}:${recordId}`);
// Invalidate list caches
keys.push(`${dimension}:list:*`);
// Dimension-specific invalidation
if (dimension === 'PRD' && payload.XPRD05) {
// Invalidate category cache
keys.push(`CAT:${payload.XPRD05}:products`);
}
if (dimension === 'ORD' && payload.XORD_CUSTOMER_ID) {
// Invalidate customer orders cache
keys.push(`CUST:${payload.XORD_CUSTOMER_ID}:orders`);
}
return keys;
}
}
// Start subscriber
const invalidator = new CacheInvalidator();
const subscriber = new OutboxSubscriber('cache-invalidator');
await subscriber.connect();
await subscriber.subscribe((event) => invalidator.handleEvent(event));
Idempotency
Why Idempotency Matters
At-least-once delivery means events may be processed multiple times:
- RabbitMQ redelivery on subscriber crash
- Network failures
- Manual replay
Idempotent processing ensures:
- ✅ Same result regardless of repetition
- ✅ No duplicate data
- ✅ Safe reprocessing
Idempotency Strategies
1. Check if already processed:
async function handleEvent(event) {
// Check if event already processed
if (await isProcessed(event.OUTBOX_ID)) {
console.log('Event already processed, skipping');
return;
}
// Process event
await processEvent(event);
// Mark as processed
await markProcessed(event.OUTBOX_ID);
}
async function isProcessed(outboxId) {
const redis = new Redis();
return await redis.exists(`processed:${outboxId}`) === 1;
}
async function markProcessed(outboxId) {
const redis = new Redis();
await redis.set(`processed:${outboxId}`, Date.now(), 'EX', 2592000); // 30 days
}
2. Use unique constraints:
async function indexProduct(product) {
// Elasticsearch uses document ID as unique constraint
await elasticsearch.index({
index: 'products',
id: product.PRD_ID, // Unique ID
body: product
});
// Multiple calls with same ID → same result (idempotent)
}
3. Check-and-set patterns:
async function updateCounter(event) {
const key = `counter:${event.AGGREGATE_ID}`;
// Get current value
const current = await redis.get(key) || 0;
// Only increment if not already incremented for this event
if (current < event.PAYLOAD.counter_value) {
await redis.set(key, event.PAYLOAD.counter_value);
}
}
Error Handling
Retry with Exponential Backoff
class ResilientSubscriber extends OutboxSubscriber {
async subscribe(handler, maxRetries = 3) {
await this.channel.consume(this.queueName, async (msg) => {
if (!msg) return;
let retryCount = 0;
let success = false;
while (retryCount < maxRetries && !success) {
try {
const event = JSON.parse(msg.content.toString());
await handler(event);
this.channel.ack(msg);
success = true;
} catch (error) {
retryCount++;
console.error(
`[${this.queueName}] Error (attempt ${retryCount}/${maxRetries}):`,
error.message
);
if (retryCount >= maxRetries) {
// Max retries exceeded - move to dead letter queue
await this.sendToDeadLetterQueue(msg);
this.channel.ack(msg);
} else {
// Wait before retry (exponential backoff)
await this.sleep(Math.pow(2, retryCount) * 1000);
}
}
}
});
}
async sendToDeadLetterQueue(msg) {
const dlqExchange = 'corewrite.dlq';
const event = JSON.parse(msg.content.toString());
await this.channel.assertExchange(dlqExchange, 'direct', { durable: true });
await this.channel.publish(
dlqExchange,
this.queueName,
msg.content,
{
persistent: true,
headers: {
'x-original-queue': this.queueName,
'x-failed-at': new Date().toISOString(),
'x-error': 'Max retries exceeded'
}
}
);
console.log(`[${this.queueName}] Sent to DLQ:`, event.OUTBOX_ID);
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
Dead Letter Queue Monitoring
class DeadLetterQueueMonitor {
constructor() {
this.subscriber = new OutboxSubscriber('corewrite.dlq');
}
async start() {
await this.subscriber.connect();
await this.subscriber.subscribe(async (event) => {
// Alert on DLQ messages
await this.sendAlert({
title: 'Event Processing Failed',
event_id: event.OUTBOX_ID,
event_type: event.EVENT_TYPE,
aggregate: `${event.AGGREGATE_TYPE}/${event.AGGREGATE_ID}`,
timestamp: new Date().toISOString()
});
// Log to monitoring system
console.error('[DLQ] Failed event:', event);
});
}
async sendAlert(data) {
// Send to Slack, email, PagerDuty, etc.
await fetch(process.env.SLACK_WEBHOOK_URL, {
method: 'POST',
body: JSON.stringify({
text: `⚠️ ${data.title}\nEvent: ${data.event_type}\nID: ${data.event_id}`
})
});
}
}
Scaling Subscribers
Multiple Instances
# docker-compose.yml
version: '3'
services:
search-indexer-1:
image: search-indexer:latest
environment:
RABBITMQ_URL: amqp://rabbitmq:5672
ELASTICSEARCH_URL: http://elasticsearch:9200
restart: always
search-indexer-2:
image: search-indexer:latest
environment:
RABBITMQ_URL: amqp://rabbitmq:5672
ELASTICSEARCH_URL: http://elasticsearch:9200
restart: always
search-indexer-3:
image: search-indexer:latest
environment:
RABBITMQ_URL: amqp://rabbitmq:5672
ELASTICSEARCH_URL: http://elasticsearch:9200
restart: always
Benefits:
- ✅ Parallel processing (faster throughput)
- ✅ High availability (if one crashes, others continue)
- ✅ Load balancing (RabbitMQ distributes messages)
Consumer Acknowledgment
// Acknowledge only after successful processing
await this.channel.consume(queueName, async (msg) => {
try {
await processEvent(msg);
this.channel.ack(msg); // ✅ Success
} catch (error) {
this.channel.nack(msg, false, true); // ❌ Reject and requeue
}
});
Monitoring and Metrics
Prometheus Metrics
const promClient = require('prom-client');
// Metrics
const eventsProcessed = new promClient.Counter({
name: 'events_processed_total',
help: 'Total events processed',
labelNames: ['event_type', 'status']
});
const processingDuration = new promClient.Histogram({
name: 'event_processing_duration_seconds',
help: 'Event processing duration',
labelNames: ['event_type']
});
// Track metrics
async function handleEvent(event) {
const timer = processingDuration.startTimer({ event_type: event.EVENT_TYPE });
try {
await processEvent(event);
eventsProcessed.inc({ event_type: event.EVENT_TYPE, status: 'success' });
timer();
} catch (error) {
eventsProcessed.inc({ event_type: event.EVENT_TYPE, status: 'error' });
timer();
throw error;
}
}
// Expose metrics endpoint
const express = require('express');
const app = express();
app.get('/metrics', async (req, res) => {
res.set('Content-Type', promClient.register.contentType);
res.end(await promClient.register.metrics());
});
app.listen(9090);
Best Practices
✅ DO:
Implement idempotency:
// ✅ Good - check if already processed
if (await isProcessed(event.OUTBOX_ID)) {
return;
}
await process(event);
await markProcessed(event.OUTBOX_ID);
Handle errors gracefully:
// ✅ Good - retry with backoff
try {
await process(event);
} catch (error) {
if (retryCount < maxRetries) {
await sleep(Math.pow(2, retryCount) * 1000);
return retry();
} else {
await sendToDLQ(event);
}
}
Monitor subscriber health:
// ✅ Good - expose health endpoint
app.get('/health', (req, res) => {
res.json({ status: 'healthy', queue: queueName });
});
❌ DON'T:
Don't assume exactly-once delivery:
// ❌ Bad - not idempotent
await redis.incr(`counter:${event.AGGREGATE_ID}`);
// ✅ Good - idempotent
await redis.set(`counter:${event.AGGREGATE_ID}`, event.PAYLOAD.count);
Don't block on external calls:
// ❌ Bad - synchronous external call
await externalAPI.notify(event); // Blocks queue
// ✅ Good - async with timeout
Promise.race([
externalAPI.notify(event),
timeout(5000)
]).catch(error => log(error));
Summary
- ✅ Subscribers consume events from RabbitMQ fanout exchange
- ✅ Common patterns: search indexing, cache invalidation, notifications
- ✅ Idempotency required (at-least-once delivery)
- ✅ Error handling with retry and dead letter queue
- ✅ Scale with multiple subscriber instances
- ✅ Monitor with Prometheus metrics
Key Takeaways:
- Implement idempotent event handlers
- Use retry with exponential backoff
- Move failed events to dead letter queue
- Scale horizontally with multiple instances
- Monitor processing metrics
- Acknowledge only after successful processing
- Handle failures gracefully
Related Concepts
- Outbox Pattern - Event sourcing
- Advanced Topics - Overview
- Webhook Integration - External notifications