Outbox Subscribers

Overview

Outbox subscribers are microservices that consume events from the RabbitMQ fanout exchange, enabling event-driven architecture and eventual consistency across the Q01 platform.

Common Subscribers:

  • search-indexer - Updates Elasticsearch indices
  • cache-invalidator - Clears Redis cache
  • analytics-writer - Writes to data warehouse
  • notification-sender - Sends emails/push notifications
  • webhook-dispatcher - Calls external webhooks
  • audit-logger - Writes to audit log

Benefits:

  • ✅ Decoupled microservices
  • ✅ Asynchronous processing
  • ✅ Scalable event processing
  • ✅ At-least-once delivery
  • ✅ Independent failure handling

Event Flow

Write Operation (POST/PUT/PATCH/DELETE)
    ↓
1. Update TB_ANAG_{DIM}00
2. Insert OUTBOX event
3. COMMIT transaction
    ↓
Background Publisher (polls OUTBOX)
    ↓
RabbitMQ Fanout Exchange (corewrite.fanout)
    ↓
Subscribers (all receive same event):
├── search-indexer
├── cache-invalidator
├── analytics-writer
├── notification-sender
└── webhook-dispatcher
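
The fanout step can be illustrated without a broker. The sketch below is an in-memory stand-in, not RabbitMQ itself: the class name FakeFanoutExchange and the sample event values are hypothetical and exist only to show that every bound queue receives its own copy of each published event.

```javascript
// In-memory stand-in for a fanout exchange (hypothetical, for illustration only).
class FakeFanoutExchange {
  constructor() {
    this.queues = new Map(); // queue name -> array of delivered events
  }

  // Binding a queue means it receives a copy of every published event.
  bindQueue(queueName) {
    if (!this.queues.has(queueName)) {
      this.queues.set(queueName, []);
    }
  }

  // A fanout exchange ignores routing keys: each bound queue gets its own copy.
  publish(event) {
    for (const messages of this.queues.values()) {
      messages.push({ ...event });
    }
  }
}

const exchange = new FakeFanoutExchange();
['search-indexer', 'cache-invalidator', 'audit-logger'].forEach((name) => {
  exchange.bindQueue(name);
});

// Sample event with made-up values
exchange.publish({ OUTBOX_ID: 1, EVENT_TYPE: 'ProductCreated', AGGREGATE_ID: 'PRD-1' });

// One delivery per bound queue
const deliveredCounts = [...exchange.queues.values()].map((messages) => messages.length);
console.log(deliveredCounts);
```

Because each subscriber owns its queue, a slow or failing subscriber does not delay the others.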

Subscriber Implementation

Node.js Subscriber

const amqp = require('amqplib');

class OutboxSubscriber {
  constructor(queueName, exchangeName = 'corewrite.fanout') {
    this.queueName = queueName;
    this.exchangeName = exchangeName;
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    // Connect to RabbitMQ
    this.connection = await amqp.connect(process.env.RABBITMQ_URL);
    this.channel = await this.connection.createChannel();

    // Assert exchange
    await this.channel.assertExchange(this.exchangeName, 'fanout', {
      durable: true
    });

    // Create queue
    const queue = await this.channel.assertQueue(this.queueName, {
      durable: true,
      autoDelete: false
    });

    // Bind queue to exchange
    await this.channel.bindQueue(queue.queue, this.exchangeName, '');

    // Set prefetch (process one message at a time)
    await this.channel.prefetch(1);

    console.log(`[${this.queueName}] Waiting for events...`);
  }

  async subscribe(handler) {
    await this.channel.consume(this.queueName, async (msg) => {
      if (!msg) return; // null means the broker cancelled this consumer

      let event;
      try {
        event = JSON.parse(msg.content.toString());
      } catch (error) {
        // Malformed JSON will never parse, so reject without requeueing
        // (dropped, or dead-lettered if the queue has a dead-letter exchange)
        console.error(`[${this.queueName}] Discarding malformed message:`, error.message);
        this.channel.nack(msg, false, false);
        return;
      }

      try {
        console.log(`[${this.queueName}] Processing event:`, event.EVENT_TYPE);

        // Handle event (handler must be idempotent)
        await handler(event);

        // Acknowledge message
        this.channel.ack(msg);

        console.log(`[${this.queueName}] Event processed successfully`);
      } catch (error) {
        console.error(`[${this.queueName}] Error processing event:`, error);

        // Reject and requeue (RabbitMQ redelivers it, possibly immediately)
        this.channel.nack(msg, false, true);
      }
    });
  }

  async close() {
    await this.channel.close();
    await this.connection.close();
  }
}

// Usage (wrapped in a function: CommonJS modules have no top-level await)
async function main() {
  const subscriber = new OutboxSubscriber('search-indexer');
  await subscriber.connect();

  await subscriber.subscribe(async (event) => {
    await handleSearchIndexEvent(event);
  });
}

main().catch((error) => {
  console.error('Subscriber failed to start:', error);
  process.exit(1);
});
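
Handlers in this document read OUTBOX_ID, EVENT_TYPE, AGGREGATE_TYPE, AGGREGATE_ID and PAYLOAD from each event. A subscriber may want to validate that envelope before calling its handler. The following is a minimal sketch under assumptions: the function name parseOutboxEvent and the choice of which fields are mandatory are illustrative, not part of the platform contract.

```javascript
// Fields treated as mandatory here (an assumption made for this sketch).
const REQUIRED_FIELDS = ['OUTBOX_ID', 'EVENT_TYPE', 'AGGREGATE_TYPE', 'AGGREGATE_ID'];

// Parses a message body (Buffer or string) and checks the event envelope.
// Returns { ok: true, event } or { ok: false, reason } instead of throwing,
// so the caller can decide whether to reject the message.
function parseOutboxEvent(content) {
  let event;
  try {
    event = JSON.parse(content.toString());
  } catch (error) {
    return { ok: false, reason: 'invalid JSON' };
  }

  if (event === null || typeof event !== 'object') {
    return { ok: false, reason: 'not an object' };
  }

  const missing = REQUIRED_FIELDS.filter(
    (field) => event[field] === undefined || event[field] === null
  );
  if (missing.length > 0) {
    return { ok: false, reason: `missing fields: ${missing.join(', ')}` };
  }

  return { ok: true, event };
}

// Sample message with made-up values
const sample = Buffer.from(JSON.stringify({
  OUTBOX_ID: 1,
  EVENT_TYPE: 'ProductCreated',
  AGGREGATE_TYPE: 'PRD',
  AGGREGATE_ID: 'PRD-1',
  PAYLOAD: {}
}));
console.log(parseOutboxEvent(sample).ok);
```

A message that fails this check cannot succeed on retry, so it is a candidate for rejection without requeue rather than redelivery.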

Search Indexer Example

const { Client } = require('@elastic/elasticsearch');
const Redis = require('ioredis');

class SearchIndexer {
  constructor() {
    this.elasticsearch = new Client({
      node: process.env.ELASTICSEARCH_URL
    });

    // One shared connection for idempotency bookkeeping
    this.redis = new Redis(process.env.REDIS_URL);
  }

  async handleEvent(event) {
    const { EVENT_TYPE, AGGREGATE_TYPE, AGGREGATE_ID, PAYLOAD } = event;

    // Check idempotency (already processed?)
    if (await this.isProcessed(event.OUTBOX_ID)) {
      console.log('Event already processed, skipping');
      return;
    }

    switch (EVENT_TYPE) {
      case 'ProductCreated':
        await this.indexProduct(PAYLOAD);
        break;

      case 'ProductUpdated':
        await this.updateProduct(AGGREGATE_ID, PAYLOAD);
        break;

      case 'ProductDeleted':
        await this.deleteProduct(AGGREGATE_ID);
        break;

      default:
        console.log(`Unknown event type: ${EVENT_TYPE}`);
    }

    // Mark as processed
    await this.markProcessed(event.OUTBOX_ID);
  }

  async indexProduct(product) {
    await this.elasticsearch.index({
      index: 'products',
      id: product.PRD_ID,
      body: {
        name: product.XPRD01,
        price: product.XPRD02,
        code: product.XPRD03,
        category: product.XPRD05,
        active: product.XPRD06,
        created_at: product.CREATED_AT,
        updated_at: product.UPDATED_AT
      }
    });

    console.log(`Indexed product: ${product.PRD_ID}`);
  }

  async updateProduct(productId, updates) {
    await this.elasticsearch.update({
      index: 'products',
      id: productId,
      body: {
        doc: {
          name: updates.XPRD01,
          price: updates.XPRD02,
          updated_at: updates.UPDATED_AT
        }
      }
    });

    console.log(`Updated product: ${productId}`);
  }

  async deleteProduct(productId) {
    // A redelivered delete finds no document, so treat 404 as success
    await this.elasticsearch.delete(
      { index: 'products', id: productId },
      { ignore: [404] }
    );

    console.log(`Deleted product: ${productId}`);
  }

  async isProcessed(outboxId) {
    // Check Redis for processed events
    const exists = await this.redis.exists(`processed:${outboxId}`);
    return exists === 1;
  }

  async markProcessed(outboxId) {
    // Store in Redis with 30-day TTL
    await this.redis.set(`processed:${outboxId}`, '1', 'EX', 2592000);
  }
}

// Start subscriber (wrapped in a function: CommonJS has no top-level await)
async function main() {
  const indexer = new SearchIndexer();
  const subscriber = new OutboxSubscriber('search-indexer');

  await subscriber.connect();
  await subscriber.subscribe((event) => indexer.handleEvent(event));
}

main().catch((error) => {
  console.error('search-indexer failed to start:', error);
  process.exit(1);
});

Cache Invalidator Example

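const Redis = require('ioredis');

class CacheInvalidator {
  constructor() {
    this.redis = new Redis(process.env.REDIS_URL);
  }

  async handleEvent(event) {
    const { AGGREGATE_TYPE, AGGREGATE_ID, PAYLOAD } = event;

    const keys = this.getCacheKeys(AGGREGATE_TYPE, AGGREGATE_ID, PAYLOAD);

    for (const key of keys) {
      if (key.includes('*')) {
        // DEL does not expand glob patterns, so resolve them with SCAN first
        await this.deleteByPattern(key);
      } else {
        await this.redis.del(key);
      }
      console.log(`Invalidated cache: ${key}`);
    }
  }

  async deleteByPattern(pattern) {
    // scanStream iterates matching keys in batches without blocking Redis
    const stream = this.redis.scanStream({ match: pattern, count: 100 });
    for await (const batch of stream) {
      if (batch.length > 0) {
        await this.redis.del(...batch);
      }
    }
  }

  getCacheKeys(dimension, recordId, payload) {
    const keys = [];

    // Invalidate specific record cache
    keys.push(`${dimension}:${recordId}`);

    // Invalidate list caches (glob pattern, expanded in handleEvent)
    keys.push(`${dimension}:list:*`);

    // Dimension-specific invalidation (payload may be absent on delete events)
    if (dimension === 'PRD' && payload?.XPRD05) {
      // Invalidate category cache
      keys.push(`CAT:${payload.XPRD05}:products`);
    }

    if (dimension === 'ORD' && payload?.XORD_CUSTOMER_ID) {
      // Invalidate customer orders cache
      keys.push(`CUST:${payload.XORD_CUSTOMER_ID}:orders`);
    }

    return keys;
  }
}

// Start subscriber (wrapped in a function: CommonJS has no top-level await)
async function main() {
  const invalidator = new CacheInvalidator();
  const subscriber = new OutboxSubscriber('cache-invalidator');

  await subscriber.connect();
  await subscriber.subscribe((event) => invalidator.handleEvent(event));
}

main().catch((error) => {
  console.error('cache-invalidator failed to start:', error);
  process.exit(1);
});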

Idempotency

Why Idempotency Matters

At-least-once delivery means events may be processed multiple times:

  • RabbitMQ redelivery on subscriber crash
  • Network failures
  • Manual replay

Idempotent processing ensures:

  • ✅ Same result regardless of repetition
  • ✅ No duplicate data
  • ✅ Safe reprocessing

Idempotency Strategies

1. Check if already processed:

const Redis = require('ioredis');

// Reuse one connection instead of opening a new one on every call
const redis = new Redis(process.env.REDIS_URL);

async function handleEvent(event) {
  // Check if event already processed
  if (await isProcessed(event.OUTBOX_ID)) {
    console.log('Event already processed, skipping');
    return;
  }

  // Process event
  await processEvent(event);

  // Mark as processed
  await markProcessed(event.OUTBOX_ID);
}

async function isProcessed(outboxId) {
  return (await redis.exists(`processed:${outboxId}`)) === 1;
}

async function markProcessed(outboxId) {
  await redis.set(`processed:${outboxId}`, Date.now(), 'EX', 2592000); // 30 days
}
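
When several instances of a subscriber run in parallel, a separate check followed by a separate mark leaves a window in which two instances both see "not processed". One way to close that window is to claim the event atomically before processing. The sketch below uses an in-memory stand-in so it runs without Redis; FakeClaimStore, setIfAbsent and processOnce are hypothetical names. With ioredis the claim could be expressed as redis.set(key, value, 'EX', ttl, 'NX'), which returns 'OK' only for the first caller.

```javascript
// In-memory stand-in for an atomic "set if absent" store (hypothetical helper).
class FakeClaimStore {
  constructor() {
    this.keys = new Set();
  }

  // Returns true only for the first caller that claims the key.
  async setIfAbsent(key) {
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    return true;
  }

  async remove(key) {
    this.keys.delete(key);
  }
}

// Claims the event first, then processes it. A failed attempt releases the
// claim so that a later redelivery can try again.
async function processOnce(store, event, processEvent) {
  const key = `processed:${event.OUTBOX_ID}`;

  const claimed = await store.setIfAbsent(key);
  if (!claimed) {
    return 'skipped';
  }

  try {
    await processEvent(event);
    return 'processed';
  } catch (error) {
    await store.remove(key);
    throw error;
  }
}
```

The trade-off of this design: if the process crashes between the claim and the end of processing, the claim stays in place until its TTL expires, so the TTL should be chosen with that in mind.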

2. Use unique constraints:

async function indexProduct(product) {
  // Elasticsearch uses document ID as unique constraint
  await elasticsearch.index({
    index: 'products',
    id: product.PRD_ID, // Unique ID
    body: product
  });

  // Multiple calls with same ID → same result (idempotent)
}

3. Check-and-set patterns:

async function updateCounter(event) {
  const key = `counter:${event.AGGREGATE_ID}`;

  // Get current value (Redis returns a string, or null if the key is missing)
  const current = Number(await redis.get(key)) || 0;

  // Only move forward: a replayed or older event must not overwrite a newer value
  if (current < event.PAYLOAD.counter_value) {
    await redis.set(key, event.PAYLOAD.counter_value);
  }
}

Error Handling

Retry with Exponential Backoff

class ResilientSubscriber extends OutboxSubscriber {
  async subscribe(handler, maxRetries = 3) {
    await this.channel.consume(this.queueName, async (msg) => {
      if (!msg) return;

      let retryCount = 0;
      let success = false;

      while (retryCount < maxRetries && !success) {
        try {
          const event = JSON.parse(msg.content.toString());

          await handler(event);

          this.channel.ack(msg);
          success = true;
        } catch (error) {
          retryCount++;

          console.error(
            `[${this.queueName}] Error (attempt ${retryCount}/${maxRetries}):`,
            error.message
          );

          if (retryCount >= maxRetries) {
            // Max retries exceeded - move to dead letter queue
            await this.sendToDeadLetterQueue(msg);
            this.channel.ack(msg);
          } else {
            // Wait before retry (exponential backoff: 2s, 4s, ...)
            await this.sleep(Math.pow(2, retryCount) * 1000);
          }
        }
      }
    });
  }

  async sendToDeadLetterQueue(msg) {
    const dlqExchange = 'corewrite.dlq';

    // Fanout matches the exchange type that OutboxSubscriber.connect() asserts,
    // so a monitor built on OutboxSubscriber can bind to this exchange
    await this.channel.assertExchange(dlqExchange, 'fanout', { durable: true });

    this.channel.publish(
      dlqExchange,
      this.queueName,
      msg.content,
      {
        persistent: true,
        headers: {
          'x-original-queue': this.queueName,
          'x-failed-at': new Date().toISOString(),
          'x-error': 'Max retries exceeded'
        }
      }
    );

    // Parse defensively: the message may be here because it is not valid JSON
    let outboxId = 'unknown';
    try {
      outboxId = JSON.parse(msg.content.toString()).OUTBOX_ID;
    } catch (error) {
      // Keep 'unknown'; the raw message has already been published to the DLQ
    }

    console.log(`[${this.queueName}] Sent to DLQ:`, outboxId);
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
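
The delay calculation can be isolated into a small pure function, which makes it easy to cap the wait and to add jitter so that many failing consumers do not retry in lockstep. This is a sketch under assumptions: the name backoffDelay, the 30-second cap, and the jitterRatio option are illustrative defaults, not values taken from the platform.

```javascript
// Computes the wait before the next retry.
//   attempt     - number of failed attempts so far (1 after the first failure)
//   baseMs      - delay unit, matching the 1000 ms used above
//   maxMs       - upper bound so delays do not grow without limit (assumed value)
//   jitterRatio - 0 disables jitter; 0.5 shortens the delay by up to 50%
//   random      - injectable for deterministic tests
function backoffDelay(attempt, options = {}, random = Math.random) {
  const { baseMs = 1000, maxMs = 30000, jitterRatio = 0 } = options;

  // 2^attempt * baseMs, capped at maxMs
  const exponential = Math.min(maxMs, baseMs * 2 ** attempt);

  // Subtract a random share of the delay (up to jitterRatio of it)
  return Math.round(exponential * (1 - jitterRatio * random()));
}

console.log([1, 2, 3].map((attempt) => backoffDelay(attempt)));
```

With the defaults, attempts 1, 2 and 3 wait 2000, 4000 and 8000 ms, the same progression as Math.pow(2, retryCount) * 1000.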

Dead Letter Queue Monitoring

class DeadLetterQueueMonitor {
  constructor() {
    // Bind a dedicated queue to the DLQ exchange rather than to corewrite.fanout
    // (the queue name is an example). OutboxSubscriber.connect() asserts a fanout
    // exchange, so corewrite.dlq must be declared as fanout wherever it is asserted.
    this.subscriber = new OutboxSubscriber('corewrite.dlq.monitor', 'corewrite.dlq');
  }

  async start() {
    await this.subscriber.connect();

    await this.subscriber.subscribe(async (event) => {
      // Alert on DLQ messages
      await this.sendAlert({
        title: 'Event Processing Failed',
        event_id: event.OUTBOX_ID,
        event_type: event.EVENT_TYPE,
        aggregate: `${event.AGGREGATE_TYPE}/${event.AGGREGATE_ID}`,
        timestamp: new Date().toISOString()
      });

      // Log to monitoring system
      console.error('[DLQ] Failed event:', event);
    });
  }

  async sendAlert(data) {
    // Send to Slack, email, PagerDuty, etc.
    await fetch(process.env.SLACK_WEBHOOK_URL, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        text: `⚠️ ${data.title}\nEvent: ${data.event_type}\nID: ${data.event_id}`
      })
    });
  }
}

Scaling Subscribers

Multiple Instances

# docker-compose.yml
version: '3'
services:
  search-indexer-1:
    image: search-indexer:latest
    environment:
      RABBITMQ_URL: amqp://rabbitmq:5672
      ELASTICSEARCH_URL: http://elasticsearch:9200
    restart: always

  search-indexer-2:
    image: search-indexer:latest
    environment:
      RABBITMQ_URL: amqp://rabbitmq:5672
      ELASTICSEARCH_URL: http://elasticsearch:9200
    restart: always

  search-indexer-3:
    image: search-indexer:latest
    environment:
      RABBITMQ_URL: amqp://rabbitmq:5672
      ELASTICSEARCH_URL: http://elasticsearch:9200
    restart: always

Benefits:

  • ✅ Parallel processing (faster throughput)
  • ✅ High availability (if one crashes, others continue)
  • ✅ Load balancing (RabbitMQ distributes messages across instances that consume from the same queue)
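
Load balancing depends on all instances of one subscriber consuming from the same queue name. If each instance declared its own queue on the fanout exchange, every instance would receive every event instead of a share of them. The sketch below simulates the default round-robin dispatch in memory; distributeRoundRobin and the instance names are hypothetical, and real dispatch is also influenced by prefetch and by how quickly each consumer acknowledges.

```javascript
// Simulates how messages from ONE queue are shared among its consumers.
function distributeRoundRobin(messages, consumerNames) {
  const assignments = new Map(consumerNames.map((name) => [name, []]));

  messages.forEach((message, index) => {
    const consumer = consumerNames[index % consumerNames.length];
    assignments.get(consumer).push(message);
  });

  return assignments;
}

const outboxIds = [1, 2, 3, 4, 5, 6, 7];
const shared = distributeRoundRobin(outboxIds, [
  'search-indexer-1',
  'search-indexer-2',
  'search-indexer-3'
]);

for (const [consumer, ids] of shared) {
  console.log(consumer, ids);
}
```

Each event is handled by exactly one instance in this model, which is why ordering across instances is not guaranteed and handlers still need to be idempotent.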

Consumer Acknowledgment

// Acknowledge only after successful processing
await this.channel.consume(queueName, async (msg) => {
  try {
    await processEvent(msg);
    this.channel.ack(msg); // ✅ Success
  } catch (error) {
    this.channel.nack(msg, false, true); // ❌ Reject and requeue
  }
});
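
Requeueing on every failure can turn a permanently failing message into an endless redelivery loop. One possible refinement is to decide the outcome from the error type and from the redelivered flag that amqplib exposes as msg.fields.redelivered. The sketch below is a pure decision function; the name decideOutcome and the policy of requeueing only once are assumptions for illustration.

```javascript
// Decides what to do with a message after a processing attempt.
//   error       - the thrown error, or null/undefined on success
//   redelivered - true if the broker has already delivered this message before
// Returns 'ack', 'requeue' (nack with requeue) or 'reject' (nack without requeue).
function decideOutcome(error, redelivered) {
  if (!error) {
    return 'ack';
  }

  // A payload that cannot be parsed will not become valid on retry
  if (error instanceof SyntaxError) {
    return 'reject';
  }

  // Give transient failures one more chance, then stop requeueing
  return redelivered ? 'reject' : 'requeue';
}

console.log(decideOutcome(new Error('Elasticsearch unavailable'), false));
```

In a consumer, 'requeue' would map to channel.nack(msg, false, true) and 'reject' to channel.nack(msg, false, false), ideally with a dead-letter exchange configured so rejected messages are kept for inspection.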

Monitoring and Metrics

Prometheus Metrics

const promClient = require('prom-client');

// Metrics
const eventsProcessed = new promClient.Counter({
  name: 'events_processed_total',
  help: 'Total events processed',
  labelNames: ['event_type', 'status']
});

const processingDuration = new promClient.Histogram({
  name: 'event_processing_duration_seconds',
  help: 'Event processing duration',
  labelNames: ['event_type']
});

// Track metrics
async function handleEvent(event) {
  const timer = processingDuration.startTimer({ event_type: event.EVENT_TYPE });

  try {
    await processEvent(event);

    eventsProcessed.inc({ event_type: event.EVENT_TYPE, status: 'success' });
    timer();
  } catch (error) {
    eventsProcessed.inc({ event_type: event.EVENT_TYPE, status: 'error' });
    timer();
    throw error;
  }
}

// Expose metrics endpoint
const express = require('express');
const app = express();

app.get('/metrics', async (req, res) => {
  res.set('Content-Type', promClient.register.contentType);
  res.end(await promClient.register.metrics());
});

app.listen(9090);

Best Practices

✅ DO:

Implement idempotency:

// ✅ Good - check if already processed
if (await isProcessed(event.OUTBOX_ID)) {
  return;
}
await process(event);
await markProcessed(event.OUTBOX_ID);

Handle errors gracefully:

// ✅ Good - retry with backoff
try {
  await process(event);
} catch (error) {
  if (retryCount < maxRetries) {
    await sleep(Math.pow(2, retryCount) * 1000);
    return retry();
  } else {
    await sendToDLQ(event);
  }
}

Monitor subscriber health:

// ✅ Good - expose health endpoint
app.get('/health', (req, res) => {
  res.json({ status: 'healthy', queue: queueName });
});
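
A health endpoint is more useful when its answer reflects the subscriber's actual state rather than a constant. The sketch below derives a status from two tracked values; healthStatus, the state fields connected and lastProcessedAt, and the five-minute idle threshold are all hypothetical choices for illustration.

```javascript
// Derives a health report from subscriber state.
//   state.connected       - whether the RabbitMQ channel is currently open
//   state.lastProcessedAt - timestamp (ms) of the last processed event, or null
function healthStatus(state, now = Date.now(), maxIdleMs = 5 * 60 * 1000) {
  if (!state.connected) {
    return { status: 'unhealthy', reason: 'not connected to RabbitMQ' };
  }

  // Idle is reported separately: a quiet queue is not necessarily a failure
  if (state.lastProcessedAt !== null && now - state.lastProcessedAt > maxIdleMs) {
    return { status: 'idle', reason: 'no events processed recently' };
  }

  return { status: 'healthy' };
}

console.log(healthStatus({ connected: true, lastProcessedAt: null }));
```

An Express handler could return this object and respond with HTTP 503 when the status is 'unhealthy', so that an orchestrator restarts the instance.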

❌ DON'T:

Don't assume exactly-once delivery:

// ❌ Bad - not idempotent
await redis.incr(`counter:${event.AGGREGATE_ID}`);

// ✅ Good - idempotent
await redis.set(`counter:${event.AGGREGATE_ID}`, event.PAYLOAD.count);

Don't block on external calls:

// ❌ Bad - no timeout, so a slow or hung external call holds up the queue
await externalAPI.notify(event);

// ✅ Good - bounded by a timeout, failures are logged
Promise.race([
  externalAPI.notify(event),
  timeout(5000)
]).catch(error => log(error));
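
The timeout helper used in that snippet is not defined in this document. A minimal sketch of one possible implementation follows; the name withTimeout and its signature are assumptions. It wraps the race and clears the timer afterwards, so a finished call does not leave a pending timer behind.

```javascript
// Rejects if `promise` does not settle within `ms` milliseconds.
// Note: the underlying operation is not cancelled, only no longer awaited.
function withTimeout(promise, ms) {
  let timer;

  const deadline = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });

  // Whichever settles first wins; the timer is cleared in both cases
  return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}

// Usage with a stand-in for an external call that resolves quickly
withTimeout(Promise.resolve('notified'), 5000)
  .then((result) => console.log(result))
  .catch((error) => console.error(error.message));
```

Whether to await the wrapped call before acknowledging the message is a design choice: awaiting keeps at-least-once semantics for the notification, while not awaiting favors throughput and only logs failures.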

Summary

  • ✅ Subscribers consume events from RabbitMQ fanout exchange
  • ✅ Common patterns: search indexing, cache invalidation, notifications
  • ✅ Idempotency required (at-least-once delivery)
  • ✅ Error handling with retry and dead letter queue
  • ✅ Scale with multiple subscriber instances
  • ✅ Monitor with Prometheus metrics

Key Takeaways:

  1. Implement idempotent event handlers
  2. Use retry with exponential backoff
  3. Move failed events to dead letter queue
  4. Scale horizontally with multiple instances
  5. Monitor processing metrics
  6. Acknowledge only after successful processing
  7. Handle failures gracefully