Webhook Integration

Overview

Webhooks let Q01 Core APIs notify external systems about data changes in near real time. They are implemented as Outbox subscribers that consume events from RabbitMQ and forward them to external HTTP endpoints.

Use Cases:

  • Notify an external CRM when a customer is created
  • Trigger a fulfillment system when an order is placed
  • Update an external analytics platform
  • Send data to third-party integrations
  • Synchronize with external databases
  • Trigger external workflows

Benefits:

  • ✅ Near-real-time notifications (events are pushed as they are consumed, no polling)
  • ✅ Decoupled integration (no direct dependencies)
  • ✅ Reliable delivery (failed requests are retried with backoff)
  • ✅ Event filtering (subscribe to specific events)
  • ✅ Webhook signing (security verification)

Architecture

Write Operation (POST/PUT/DELETE)
        ↓
CoreWrite inserts OUTBOX event
        ↓
RabbitMQ Fanout Exchange (corewrite.fanout)
        ↓
Webhook Dispatcher Subscriber
        ↓
External HTTP Endpoints:
├── https://crm.example.com/webhooks/customer
├── https://analytics.example.com/events
├── https://fulfillment.example.com/orders
└── https://warehouse.example.com/inventory
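
To make the fanout step concrete, the flow can be imitated in-process: every bound subscriber receives its own copy of each event. This is only a rough sketch that uses Node's built-in EventEmitter in place of RabbitMQ. The `FanoutExchange` class and the `audit-log` subscriber are hypothetical, and none of RabbitMQ's durability or acknowledgement behavior is modeled.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical in-memory stand-in for the corewrite.fanout exchange.
class FanoutExchange {
  constructor() {
    this.emitter = new EventEmitter();
  }

  // Bind a named subscriber; every bound subscriber sees every event.
  bind(name, handler) {
    this.emitter.on('event', (event) => handler({ ...event }, name));
  }

  // Deliver a copy of the event to all bound subscribers.
  publish(event) {
    this.emitter.emit('event', event);
  }
}

const exchange = new FanoutExchange();
const received = [];

exchange.bind('webhook-dispatcher', (event, name) => received.push(`${name}:${event.EVENT_TYPE}`));
exchange.bind('audit-log', (event, name) => received.push(`${name}:${event.EVENT_TYPE}`));

exchange.publish({ OUTBOX_ID: 1, EVENT_TYPE: 'CustomerCreated', AGGREGATE_TYPE: 'CUST' });
console.log(received);
```

Because the webhook dispatcher is just one subscriber among several, adding or removing it does not affect the write path or any other consumer.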

Webhook Registration

Register Webhook Endpoint

Via API:

# Register webhook for customer events
curl -X POST https://api.q01.io/api/v4/webhooks \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"url": "https://crm.example.com/webhooks/customer",
"events": ["CustomerCreated", "CustomerUpdated", "CustomerDeleted"],
"dimensions": ["CUST"],
"secret": "whsec_abc123xyz789",
"active": true,
"filters": {
"source": "tenant_123"
}
}'

Response:

{
"webhook_id": "wh_xyz789",
"url": "https://crm.example.com/webhooks/customer",
"events": ["CustomerCreated", "CustomerUpdated", "CustomerDeleted"],
"dimensions": ["CUST"],
"secret": "whsec_abc123xyz789",
"active": true,
"created_at": "2025-12-19T10:00:00Z"
}
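
Before sending a registration request, a client may want to check the body locally. The sketch below is one possible pre-flight check, not the API's own validation: the function name `validateRegistration`, the https-only rule, and the 16-character minimum for the secret are all assumptions made for illustration.

```javascript
// Hypothetical client-side check of a registration body before POSTing it.
// Returns a list of problems; an empty list means the body looks plausible.
function validateRegistration(body) {
  const errors = [];

  let parsed = null;
  try {
    parsed = new URL(body.url);
  } catch {
    errors.push('url must be an absolute URL');
  }
  if (parsed && parsed.protocol !== 'https:') {
    errors.push('url must use https'); // assumed policy
  }

  if (!Array.isArray(body.events) || body.events.length === 0) {
    errors.push('events must be a non-empty array');
  }
  if (body.dimensions !== undefined && !Array.isArray(body.dimensions)) {
    errors.push('dimensions must be an array when present');
  }
  if (typeof body.secret !== 'string' || body.secret.length < 16) {
    errors.push('secret must be a string of at least 16 characters'); // assumed minimum
  }

  return errors;
}

console.log(validateRegistration({
  url: 'https://crm.example.com/webhooks/customer',
  events: ['CustomerCreated', 'CustomerUpdated', 'CustomerDeleted'],
  dimensions: ['CUST'],
  secret: 'whsec_abc123xyz789'
}));
```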

Webhook Configuration

TB_WEBHOOK metadata table:

CREATE TABLE TB_WEBHOOK (
WEBHOOK_ID INT PRIMARY KEY AUTO_INCREMENT,
SOURCE VARCHAR(50) NOT NULL, -- Tenant identifier
URL VARCHAR(500) NOT NULL, -- Target endpoint
EVENTS JSON NOT NULL, -- Event types to send
DIMENSIONS JSON, -- Filter by dimensions
SECRET VARCHAR(100), -- Signing secret
ACTIVE CHAR(1) DEFAULT 'Y', -- Y/N
RETRY_COUNT INT DEFAULT 3, -- Max retries
TIMEOUT_MS INT DEFAULT 5000, -- Request timeout
CREATED_AT TIMESTAMP DEFAULT NOW(),
UPDATED_AT TIMESTAMP DEFAULT NOW() ON UPDATE NOW(),
INDEX idx_webhook_source (SOURCE),
INDEX idx_webhook_active (ACTIVE)
);

Webhook Dispatcher Implementation

Node.js Webhook Dispatcher

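const axios = require('axios');
const crypto = require('crypto');

// OutboxSubscriber (the RabbitMQ consumer wrapper) and db (the database client)
// are assumed to be provided by the application; these module paths are placeholders.
const { OutboxSubscriber } = require('./outbox-subscriber');
const db = require('./db');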

class WebhookDispatcher {
constructor() {
this.subscriber = null;
this.webhooks = [];
}

async start() {
// Load webhooks from database
await this.loadWebhooks();

// Connect to RabbitMQ
this.subscriber = new OutboxSubscriber('webhook-dispatcher');
await this.subscriber.connect();

// Subscribe to events
await this.subscriber.subscribe(async (event) => {
await this.dispatchEvent(event);
});

console.log('[WebhookDispatcher] Started');
}

async loadWebhooks() {
// Load from database
const result = await db.query(`
SELECT * FROM TB_WEBHOOK
WHERE ACTIVE = 'Y'
`);

this.webhooks = result.map(row => ({
webhook_id: row.WEBHOOK_ID,
url: row.URL,
events: JSON.parse(row.EVENTS),
dimensions: JSON.parse(row.DIMENSIONS),
secret: row.SECRET,
retry_count: row.RETRY_COUNT,
timeout_ms: row.TIMEOUT_MS
}));

console.log(`[WebhookDispatcher] Loaded ${this.webhooks.length} webhooks`);
}

async dispatchEvent(event) {
const { EVENT_TYPE, AGGREGATE_TYPE, AGGREGATE_ID, PAYLOAD } = event;

// Find matching webhooks
const matchingWebhooks = this.webhooks.filter(webhook =>
webhook.events.includes(EVENT_TYPE) &&
(!webhook.dimensions || webhook.dimensions.includes(AGGREGATE_TYPE))
);

console.log(
`[WebhookDispatcher] Found ${matchingWebhooks.length} webhooks for ${EVENT_TYPE}`
);

// Dispatch to all matching webhooks
for (const webhook of matchingWebhooks) {
await this.sendWebhook(webhook, event);
}
}

async sendWebhook(webhook, event, attempt = 1) {
try {
const payload = this.buildPayload(event);
const signature = this.sign(payload, webhook.secret);

console.log(`[WebhookDispatcher] Sending to ${webhook.url} (attempt ${attempt})`);

const response = await axios.post(webhook.url, payload, {
headers: {
'Content-Type': 'application/json',
// Omit the signature header when the webhook has no secret
...(signature ? { 'X-Q01-Signature': signature } : {}),
'X-Q01-Event-Type': event.EVENT_TYPE,
'X-Q01-Event-ID': String(event.OUTBOX_ID),
'X-Q01-Timestamp': new Date().toISOString()
},
timeout: webhook.timeout_ms
});

console.log(`[WebhookDispatcher] Success: ${response.status}`);

// Log success
await this.logWebhook(webhook.webhook_id, event.OUTBOX_ID, 'success', null);
} catch (error) {
console.error(`[WebhookDispatcher] Error: ${error.message}`);

// Retry with exponential backoff; retry_count caps the total number of attempts
if (attempt < webhook.retry_count) {
const delay = Math.pow(2, attempt) * 1000; // 2s, then 4s with the default of 3 attempts
console.log(`[WebhookDispatcher] Retrying in ${delay}ms...`);

await this.sleep(delay);
return this.sendWebhook(webhook, event, attempt + 1);
}

// Max retries exceeded - log failure
await this.logWebhook(webhook.webhook_id, event.OUTBOX_ID, 'failed', error.message);
}
}

buildPayload(event) {
return {
event_id: event.OUTBOX_ID,
event_type: event.EVENT_TYPE,
timestamp: event.CREATED_AT,
dimension: event.AGGREGATE_TYPE,
record_id: event.AGGREGATE_ID,
data: event.PAYLOAD
};
}

sign(payload, secret) {
if (!secret) return null;

const hmac = crypto.createHmac('sha256', secret);
hmac.update(JSON.stringify(payload));
return `sha256=${hmac.digest('hex')}`;
}

async logWebhook(webhookId, outboxId, status, error) {
await db.query(`
INSERT INTO TB_WEBHOOK_LOG
(WEBHOOK_ID, OUTBOX_ID, STATUS, ERROR_MESSAGE, CREATED_AT)
VALUES (?, ?, ?, ?, NOW())
`, [webhookId, outboxId, status, error]);
}

sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

// Start dispatcher
const dispatcher = new WebhookDispatcher();
dispatcher.start().catch(console.error);
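
The dispatcher signs `JSON.stringify(payload)` and relies on the HTTP client serializing the same object to the same bytes. One way to remove that dependency is to serialize once and then sign and send that exact string. The helper below is a minimal sketch of the idea; `buildSignedRequest` is a hypothetical name, and the header names follow the ones used above.

```javascript
const crypto = require('node:crypto');

// Serialize once, then sign and send those exact bytes.
function buildSignedRequest(payload, secret) {
  const body = JSON.stringify(payload);
  const headers = {
    'Content-Type': 'application/json',
    'Content-Length': Buffer.byteLength(body),
  };
  // Only attach a signature when the webhook has a secret configured.
  if (secret) {
    const digest = crypto.createHmac('sha256', secret).update(body).digest('hex');
    headers['X-Q01-Signature'] = `sha256=${digest}`;
  }
  return { body, headers };
}

const { body, headers } = buildSignedRequest(
  { event_id: 'outbox_123456', event_type: 'CustomerCreated' },
  'whsec_abc123xyz789'
);
console.log(headers['X-Q01-Signature'], body.length);
```

The returned `body` can then be handed to the HTTP client as a pre-serialized string, so the receiver's HMAC over the raw request body is computed over the same bytes that were signed.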

Webhook Receiver Implementation

Verify Webhook Signature

Node.js receiver:

const express = require('express');
const crypto = require('crypto');
const bodyParser = require('body-parser');

const app = express();

// Use raw body for signature verification
app.use(bodyParser.json({
verify: (req, res, buf) => {
req.rawBody = buf.toString();
}
}));

// Webhook endpoint
app.post('/webhooks/customer', (req, res) => {
const signature = req.headers['x-q01-signature'];
const secret = process.env.WEBHOOK_SECRET;

// Verify signature
if (!verifySignature(req.rawBody, signature, secret)) {
console.error('[Webhook] Invalid signature');
return res.status(401).json({ error: 'Invalid signature' });
}

// Process webhook
const { event_type, dimension, record_id, data } = req.body;

console.log(`[Webhook] Received: ${event_type} for ${dimension}/${record_id}`);

// Handle event
handleCustomerEvent(event_type, data)
.then(() => {
console.log('[Webhook] Processed successfully');
res.status(200).json({ received: true });
})
.catch(error => {
console.error('[Webhook] Error:', error);
res.status(500).json({ error: error.message });
});
});

function verifySignature(payload, signature, secret) {
  if (!signature || !secret) return false;

  const hmac = crypto.createHmac('sha256', secret);
  hmac.update(payload);
  const expected = Buffer.from(`sha256=${hmac.digest('hex')}`);
  const received = Buffer.from(signature);

  // timingSafeEqual throws if the buffers differ in length, so reject those first
  if (received.length !== expected.length) return false;

  return crypto.timingSafeEqual(received, expected);
}

async function handleCustomerEvent(eventType, data) {
switch (eventType) {
case 'CustomerCreated':
await createCustomerInCRM(data);
break;

case 'CustomerUpdated':
await updateCustomerInCRM(data);
break;

case 'CustomerDeleted':
await deleteCustomerInCRM(data);
break;

default:
console.log(`Unknown event type: ${eventType}`);
}
}

app.listen(3000, () => {
console.log('[Webhook] Receiver listening on port 3000');
});
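
Signature checks are easy to exercise without a running dispatcher: sign a sample body with a test secret, then confirm that the genuine request passes and that altered or malformed input is rejected. The following is a self-contained sketch of such a check; `signBody` and the test secret are made up for the example, and the signature format mirrors the `sha256=<hex>` convention used above.

```javascript
const crypto = require('node:crypto');

// Produce a signature in the same "sha256=<hex>" format the dispatcher sends.
function signBody(rawBody, secret) {
  return `sha256=${crypto.createHmac('sha256', secret).update(rawBody).digest('hex')}`;
}

function verifySignature(rawBody, signature, secret) {
  if (!signature || !secret) return false;
  const expected = Buffer.from(signBody(rawBody, secret));
  const received = Buffer.from(signature);
  // timingSafeEqual throws on a length mismatch, so compare lengths first.
  if (expected.length !== received.length) return false;
  return crypto.timingSafeEqual(expected, received);
}

const secret = 'whsec_test_only';
const rawBody = JSON.stringify({ event_type: 'CustomerCreated', record_id: 1234 });
const signature = signBody(rawBody, secret);

console.log(verifySignature(rawBody, signature, secret));       // genuine request
console.log(verifySignature(rawBody + ' ', signature, secret)); // body altered in transit
console.log(verifySignature(rawBody, 'sha256=short', secret));  // malformed header
```

Note that even a single added space changes the result, which is why the receiver must verify the raw request body rather than a re-serialized copy of the parsed JSON.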

PHP Webhook Receiver

<?php
// webhook-receiver.php

$secret = getenv('WEBHOOK_SECRET');

// Get raw body
$payload = file_get_contents('php://input');
$signature = $_SERVER['HTTP_X_Q01_SIGNATURE'] ?? '';

// Verify signature
if (!verifySignature($payload, $signature, $secret)) {
http_response_code(401);
echo json_encode(['error' => 'Invalid signature']);
exit;
}

// Parse payload
$event = json_decode($payload, true);

// Handle event
try {
handleEvent($event);
http_response_code(200);
echo json_encode(['received' => true]);
} catch (Exception $e) {
http_response_code(500);
echo json_encode(['error' => $e->getMessage()]);
}

function verifySignature($payload, $signature, $secret) {
if (empty($signature) || empty($secret)) {
return false;
}

$expectedSignature = 'sha256=' . hash_hmac('sha256', $payload, $secret);

return hash_equals($expectedSignature, $signature);
}

function handleEvent($event) {
$eventType = $event['event_type'];
$data = $event['data'];

switch ($eventType) {
case 'CustomerCreated':
createCustomerInCRM($data);
break;

case 'CustomerUpdated':
updateCustomerInCRM($data);
break;

case 'CustomerDeleted':
deleteCustomerInCRM($data);
break;

default:
error_log("Unknown event type: $eventType");
}
}
?>

Webhook Payload Structure

Standard Webhook Payload

{
"event_id": "outbox_123456",
"event_type": "CustomerCreated",
"timestamp": "2025-12-19T10:30:00Z",
"dimension": "CUST",
"record_id": 1234,
"data": {
"CUST_ID": 1234,
"XCUST01": "John Doe",
"XCUST02": "john.doe@example.com",
"XCUST03": "+1234567890",
"XCUST04": "123 Main St",
"CREATED_AT": "2025-12-19T10:30:00Z",
"UPDATED_AT": "2025-12-19T10:30:00Z"
}
}
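
A receiver can check that an incoming body has this shape before acting on it. The sketch below assumes all six top-level fields shown above are always present, which the source does not state explicitly; `parseWebhookPayload` is a hypothetical helper name.

```javascript
// Top-level fields taken from the standard payload example.
const REQUIRED_FIELDS = ['event_id', 'event_type', 'timestamp', 'dimension', 'record_id', 'data'];

// Parse a raw request body and report the first structural problem found.
function parseWebhookPayload(rawBody) {
  let payload;
  try {
    payload = JSON.parse(rawBody);
  } catch {
    return { ok: false, error: 'body is not valid JSON' };
  }
  if (payload === null || typeof payload !== 'object' || Array.isArray(payload)) {
    return { ok: false, error: 'body must be a JSON object' };
  }
  const missing = REQUIRED_FIELDS.filter((field) => !(field in payload));
  if (missing.length > 0) {
    return { ok: false, error: `missing fields: ${missing.join(', ')}` };
  }
  if (Number.isNaN(Date.parse(payload.timestamp))) {
    return { ok: false, error: 'timestamp is not a parseable date' };
  }
  return { ok: true, payload };
}

const result = parseWebhookPayload(JSON.stringify({
  event_id: 'outbox_123456',
  event_type: 'CustomerCreated',
  timestamp: '2025-12-19T10:30:00Z',
  dimension: 'CUST',
  record_id: 1234,
  data: { CUST_ID: 1234, XCUST01: 'John Doe' }
}));
console.log(result.ok, result.payload.event_type);
```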

HTTP Headers

POST /webhooks/customer HTTP/1.1
Host: crm.example.com
Content-Type: application/json
X-Q01-Signature: sha256=abc123...
X-Q01-Event-Type: CustomerCreated
X-Q01-Event-ID: outbox_123456
X-Q01-Timestamp: 2025-12-19T10:30:00Z
Content-Length: 512

{...}
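
The `X-Q01-Timestamp` header gives receivers a cheap way to discard very old or badly clock-skewed deliveries. The sketch below assumes a five-minute tolerance, which is an illustrative value and not something the platform specifies. In the dispatcher shown earlier the timestamp header is not part of the signed content, so this is a sanity check only and should not be treated as replay protection.

```javascript
const MAX_AGE_MS = 5 * 60 * 1000; // assumed tolerance, not a platform setting

// Returns true when the header parses as a date within maxAgeMs of `now`.
function isFresh(timestampHeader, now = Date.now(), maxAgeMs = MAX_AGE_MS) {
  const sentAt = Date.parse(timestampHeader);
  if (Number.isNaN(sentAt)) return false;
  return Math.abs(now - sentAt) <= maxAgeMs;
}

const receivedAt = Date.parse('2025-12-19T10:32:00Z');
console.log(isFresh('2025-12-19T10:30:00Z', receivedAt)); // sent two minutes before receipt
console.log(isFresh('2025-12-19T09:30:00Z', receivedAt)); // sent an hour before receipt
```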

Event Filtering

Filter by Dimension

// Only send product events
const webhook = {
url: 'https://warehouse.example.com/inventory',
events: ['ProductCreated', 'ProductUpdated', 'ProductDeleted'],
dimensions: ['PRD'] // Only PRD dimension
};

Filter by Event Type

// Only send creation events
const webhook = {
url: 'https://analytics.example.com/new-customers',
events: ['CustomerCreated'], // Only creation
dimensions: ['CUST']
};

Filter by Field Value

class WebhookDispatcher {
async dispatchEvent(event) {
const matchingWebhooks = this.webhooks.filter(webhook => {
// Event type match
if (!webhook.events.includes(event.EVENT_TYPE)) {
return false;
}

// Dimension match
if (webhook.dimensions && !webhook.dimensions.includes(event.AGGREGATE_TYPE)) {
return false;
}

// Custom filters
if (webhook.filters) {
for (const [field, value] of Object.entries(webhook.filters)) {
if (event.PAYLOAD[field] !== value) {
return false;
}
}
}

return true;
});

for (const webhook of matchingWebhooks) {
await this.sendWebhook(webhook, event);
}
}
}
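
The three filter levels can also be expressed as one pure function, which makes the matching rules easy to unit test in isolation. This is a sketch under two assumptions that differ slightly from the class above: an empty `dimensions` array is treated as "no dimension filter", and `event.PAYLOAD` is assumed to be an already-parsed object. The `COUNTRY` field is a hypothetical payload field used only for the example.

```javascript
// Returns true when the event passes the webhook's event, dimension, and field filters.
function matchesWebhook(webhook, event) {
  if (!webhook.events.includes(event.EVENT_TYPE)) return false;

  // Assumption: a missing or empty dimensions list means "all dimensions".
  const dims = webhook.dimensions;
  if (Array.isArray(dims) && dims.length > 0 && !dims.includes(event.AGGREGATE_TYPE)) {
    return false;
  }

  // Every configured field filter must match the payload exactly.
  const payload = event.PAYLOAD || {};
  return Object.entries(webhook.filters || {}).every(
    ([field, value]) => payload[field] === value
  );
}

const webhook = {
  events: ['CustomerCreated'],
  dimensions: ['CUST'],
  filters: { COUNTRY: 'IT' } // hypothetical field
};

const event = {
  EVENT_TYPE: 'CustomerCreated',
  AGGREGATE_TYPE: 'CUST',
  PAYLOAD: { CUST_ID: 1234, COUNTRY: 'IT' }
};

console.log(matchesWebhook(webhook, event));
```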

Error Handling and Retry

Exponential Backoff

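// sign, sleep, and logFailedWebhook are assumed helpers, analogous to the WebhookDispatcher methods above
async function sendWebhookWithRetry(webhook, event) {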
const maxRetries = 5;
const baseDelay = 1000; // 1 second

for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await axios.post(webhook.url, event, {
timeout: 5000,
headers: {
'X-Q01-Signature': sign(event, webhook.secret)
}
});

console.log('[Webhook] Success');
return;
} catch (error) {
console.error(`[Webhook] Attempt ${attempt} failed: ${error.message}`);

if (attempt === maxRetries) {
console.error('[Webhook] Max retries exceeded');
await logFailedWebhook(webhook, event, error);
return;
}

// Exponential backoff between the 5 attempts: 1s, 2s, 4s, 8s
const delay = baseDelay * Math.pow(2, attempt - 1);
console.log(`[Webhook] Retrying in ${delay}ms...`);
await sleep(delay);
}
}
}
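
When an endpoint goes down, many deliveries fail at once, and a fixed exponential schedule makes all of their retries land at the same moments. A common refinement is to randomize each delay ("full jitter") and cap the maximum. The function below is a sketch of that variation rather than the dispatcher's actual schedule; the 30-second cap and the injectable `random` parameter are assumptions added for illustration and testing.

```javascript
// Delay before the retry that follows the given attempt (1-based).
function backoffDelay(attempt, { baseMs = 1000, capMs = 30000, random = Math.random } = {}) {
  // Exponential ceiling: baseMs, 2*baseMs, 4*baseMs, ... limited by capMs.
  const ceiling = Math.min(capMs, baseMs * 2 ** (attempt - 1));
  // Full jitter: pick uniformly between 0 and the ceiling.
  return Math.floor(random() * ceiling);
}

for (let attempt = 1; attempt <= 5; attempt++) {
  console.log(`attempt ${attempt}: wait ${backoffDelay(attempt)}ms`);
}
```

Passing a fixed `random` function makes the schedule deterministic, which is convenient in tests.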

Circuit Breaker Pattern

class CircuitBreaker {
constructor(threshold = 5, timeout = 60000) {
this.failureCount = 0;
this.threshold = threshold;
this.timeout = timeout;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.nextAttempt = Date.now();
}

async execute(fn) {
if (this.state === 'OPEN') {
if (Date.now() < this.nextAttempt) {
throw new Error('Circuit breaker is OPEN');
}
this.state = 'HALF_OPEN';
}

try {
const result = await fn();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}

onSuccess() {
this.failureCount = 0;
this.state = 'CLOSED';
}

onFailure() {
this.failureCount++;

if (this.failureCount >= this.threshold) {
this.state = 'OPEN';
this.nextAttempt = Date.now() + this.timeout;
console.log(`[CircuitBreaker] OPEN for ${this.timeout}ms`);
}
}
}

// Usage
const breaker = new CircuitBreaker();

async function sendWebhook(webhook, event) {
try {
await breaker.execute(() =>
axios.post(webhook.url, event, { timeout: 5000 })
);
} catch (error) {
// error.message is 'Circuit breaker is OPEN' when the call was short-circuited
console.error(`[Webhook] Failed: ${error.message}`);
}
}
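
The usage above shares one breaker across every webhook, so a single failing endpoint would also pause deliveries to healthy ones. One possible refinement is to keep a breaker per target URL. The sketch below uses a simplified breaker with an injectable clock so it can run without waiting; `EndpointBreaker` and `BreakerRegistry` are hypothetical names, not part of the dispatcher.

```javascript
// Simplified breaker: opens after `threshold` consecutive failures,
// allows requests again once `cooldownMs` has elapsed.
class EndpointBreaker {
  constructor({ threshold = 5, cooldownMs = 60000, now = Date.now } = {}) {
    this.threshold = threshold;
    this.cooldownMs = cooldownMs;
    this.now = now;
    this.failures = 0;
    this.openUntil = 0;
  }

  canRequest() {
    return this.now() >= this.openUntil;
  }

  recordSuccess() {
    this.failures = 0;
    this.openUntil = 0;
  }

  recordFailure() {
    this.failures += 1;
    if (this.failures >= this.threshold) {
      this.openUntil = this.now() + this.cooldownMs;
    }
  }
}

// One breaker per target URL, created on first use.
class BreakerRegistry {
  constructor(options) {
    this.options = options;
    this.breakers = new Map();
  }

  get(url) {
    if (!this.breakers.has(url)) {
      this.breakers.set(url, new EndpointBreaker(this.options));
    }
    return this.breakers.get(url);
  }
}

let clock = 0; // fake clock in milliseconds
const registry = new BreakerRegistry({ threshold: 2, cooldownMs: 1000, now: () => clock });
const crmUrl = 'https://crm.example.com/webhooks/customer';
const analyticsUrl = 'https://analytics.example.com/events';

registry.get(crmUrl).recordFailure();
registry.get(crmUrl).recordFailure();

console.log(registry.get(crmUrl).canRequest(), registry.get(analyticsUrl).canRequest());
```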

Webhook Monitoring

TB_WEBHOOK_LOG Table

CREATE TABLE TB_WEBHOOK_LOG (
LOG_ID BIGINT PRIMARY KEY AUTO_INCREMENT,
WEBHOOK_ID INT NOT NULL,
OUTBOX_ID BIGINT NOT NULL,
STATUS ENUM('success', 'failed', 'retrying') NOT NULL,
HTTP_STATUS INT,
ERROR_MESSAGE TEXT,
ATTEMPT_COUNT INT DEFAULT 1,
CREATED_AT TIMESTAMP DEFAULT NOW(),
INDEX idx_webhook_log_webhook (WEBHOOK_ID),
INDEX idx_webhook_log_status (STATUS),
INDEX idx_webhook_log_created (CREATED_AT)
);
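
The table has `HTTP_STATUS` and `ATTEMPT_COUNT` columns that are useful for monitoring only if the dispatcher fills them in. As a rough sketch, a helper could turn a delivery outcome into the parameter list for an insert that covers those columns. The `outcome` object shape, the function name, and the 2000-character truncation of error messages are assumptions made for this example.

```javascript
const INSERT_LOG_SQL = `
  INSERT INTO TB_WEBHOOK_LOG
    (WEBHOOK_ID, OUTBOX_ID, STATUS, HTTP_STATUS, ERROR_MESSAGE, ATTEMPT_COUNT)
  VALUES (?, ?, ?, ?, ?, ?)
`;

// Build the positional parameters for INSERT_LOG_SQL from a delivery outcome.
function buildLogParams(webhookId, outboxId, outcome) {
  const { ok, httpStatus = null, attempts = 1, errorMessage = null } = outcome;
  return [
    webhookId,
    outboxId,
    ok ? 'success' : 'failed',
    httpStatus,
    // Keep error text bounded; successful deliveries store no message.
    ok ? null : String(errorMessage ?? 'unknown error').slice(0, 2000),
    attempts,
  ];
}

console.log(buildLogParams(7, 123456, { ok: true, httpStatus: 200, attempts: 2 }));
console.log(buildLogParams(7, 123456, { ok: false, attempts: 3, errorMessage: 'timeout of 5000ms exceeded' }));
```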

Monitor Webhook Health

class WebhookMonitor {
  async getWebhookStats(webhookId) {
    // db.query resolves to an array of rows (as in loadWebhooks); this aggregate returns one row
    const [stats] = await db.query(`
      SELECT
        COUNT(*) as total_deliveries,
        SUM(CASE WHEN STATUS = 'success' THEN 1 ELSE 0 END) as successful,
        SUM(CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END) as failed,
        AVG(CASE WHEN STATUS = 'success' THEN ATTEMPT_COUNT END) as avg_attempts
      FROM TB_WEBHOOK_LOG
      WHERE WEBHOOK_ID = ?
        AND CREATED_AT > DATE_SUB(NOW(), INTERVAL 24 HOUR)
    `, [webhookId]);

    // SUM() yields NULL when no rows match, and some drivers return numbers as strings
    const total = Number(stats.total_deliveries);
    const successful = Number(stats.successful || 0);
    const failed = Number(stats.failed || 0);

    // With no deliveries in the window there is nothing to rate
    const successRate = total > 0 ? (successful / total) * 100 : null;

    return {
      webhook_id: webhookId,
      total_deliveries: total,
      successful,
      failed,
      success_rate: successRate, // numeric percentage, or null
      avg_attempts: stats.avg_attempts
    };
  }

  async alertOnFailures(webhookId) {
    const stats = await this.getWebhookStats(webhookId);

    // Compare the numeric rate; format it as a string only for display
    if (stats.success_rate !== null && stats.success_rate < 90) {
      await sendAlert({
        title: 'Webhook Health Degraded',
        webhook_id: webhookId,
        success_rate: stats.success_rate.toFixed(2) + '%',
        failed: stats.failed
      });
    }
  }
}

Best Practices

✅ DO:

Verify webhook signatures:

// ✅ Good - always verify
if (!verifySignature(payload, signature, secret)) {
return res.status(401).json({ error: 'Invalid signature' });
}

Return 200 quickly:

// ✅ Good - acknowledge immediately, process async
app.post('/webhook', async (req, res) => {
res.status(200).json({ received: true });

// Process asynchronously
processWebhook(req.body).catch(console.error);
});

Implement idempotency:

// ✅ Good - check if already processed
if (await isProcessed(event.event_id)) {
return res.status(200).json({ received: true, duplicate: true });
}
await process(event);
await markProcessed(event.event_id);
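
The `isProcessed` and `markProcessed` helpers need somewhere to remember event IDs. For a single-process receiver, an in-memory map with expiry is enough to illustrate the idea. The class below is a sketch only: `ProcessedEvents` and the 24-hour retention are assumptions, and a multi-instance receiver would typically need a shared store (for example a database table with a unique key on the event ID) instead.

```javascript
// Remembers processed event IDs for a limited time.
class ProcessedEvents {
  constructor({ ttlMs = 24 * 60 * 60 * 1000, now = Date.now } = {}) {
    this.ttlMs = ttlMs;
    this.now = now;
    this.seen = new Map(); // event_id -> expiry time in ms
  }

  isProcessed(eventId) {
    const expiresAt = this.seen.get(eventId);
    if (expiresAt === undefined) return false;
    // Forget entries whose retention window has passed.
    if (expiresAt <= this.now()) {
      this.seen.delete(eventId);
      return false;
    }
    return true;
  }

  markProcessed(eventId) {
    this.seen.set(eventId, this.now() + this.ttlMs);
  }
}

let fakeNow = 0; // fake clock in milliseconds
const store = new ProcessedEvents({ ttlMs: 1000, now: () => fakeNow });

console.log(store.isProcessed('outbox_123456')); // first delivery
store.markProcessed('outbox_123456');
console.log(store.isProcessed('outbox_123456')); // redelivery of the same event
```

Marking an event only after processing succeeds means a crash mid-processing leads to a retry rather than a lost event, at the cost of the handler itself needing to tolerate a repeated run.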

❌ DON'T:

Don't ignore signatures:

// ❌ Bad - security risk
app.post('/webhook', (req, res) => {
// No signature verification!
processWebhook(req.body);
});

Don't block the response:

// ❌ Bad - slow response
app.post('/webhook', async (req, res) => {
await longRunningProcess(req.body); // Blocks webhook delivery
res.status(200).json({ received: true });
});

Don't store secrets in code:

// ❌ Bad - hardcoded secret
const secret = 'whsec_abc123xyz';

// ✅ Good - environment variable
const secret = process.env.WEBHOOK_SECRET;

Summary

  • ✅ Webhooks enable real-time external system notifications
  • ✅ Implemented as Outbox subscribers consuming RabbitMQ events
  • ✅ Always verify webhook signatures for security
  • ✅ Implement retry with exponential backoff
  • ✅ Return 200 quickly, process asynchronously
  • ✅ Filter events by dimension and event type
  • ✅ Monitor webhook health and success rates
  • ✅ Use circuit breaker for failing endpoints

Key Takeaways:

  1. Webhooks decouple external integrations
  2. Signature verification lets receivers reject forged or tampered requests
  3. Exponential backoff handles transient failures
  4. Circuit breaker protects against persistent failures
  5. Idempotency prevents duplicate processing
  6. Async processing keeps webhook responses fast
  7. Monitor success rates and alert on degradation
  8. Log all webhook deliveries for debugging