Webhook Integration
Overview
Webhooks enable Q01 Core APIs to notify external systems about data changes in real time. They are implemented as Outbox subscribers that consume events from RabbitMQ and forward them to external HTTP endpoints.
Use Cases:
- Notify an external CRM when a customer is created
- Trigger fulfillment system on order placement
- Update external analytics platform
- Send data to third-party integrations
- Synchronize with external databases
- Trigger external workflows
Benefits:
- ✅ Real-time notifications (instant updates)
- ✅ Decoupled integration (no direct dependencies)
- ✅ Reliable delivery (retry on failure)
- ✅ Event filtering (subscribe to specific events)
- ✅ Webhook signing (security verification)
Architecture
Write Operation (POST/PUT/DELETE)
↓
CoreWrite inserts OUTBOX event
↓
RabbitMQ Fanout Exchange (corewrite.fanout)
↓
Webhook Dispatcher Subscriber
↓
External HTTP Endpoints:
├── https://crm.example.com/webhooks/customer
├── https://analytics.example.com/events
├── https://fulfillment.example.com/orders
└── https://warehouse.example.com/inventory
Webhook Registration
Register Webhook Endpoint
Via API:
# Register webhook for customer events
curl -X POST https://api.q01.io/api/v4/webhooks \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"url": "https://crm.example.com/webhooks/customer",
"events": ["CustomerCreated", "CustomerUpdated", "CustomerDeleted"],
"dimensions": ["CUST"],
"secret": "whsec_abc123xyz789",
"active": true,
"filters": {
"source": "tenant_123"
}
}'
Response:
{
"webhook_id": "wh_xyz789",
"url": "https://crm.example.com/webhooks/customer",
"events": ["CustomerCreated", "CustomerUpdated", "CustomerDeleted"],
"dimensions": ["CUST"],
"secret": "whsec_abc123xyz789",
"active": true,
"created_at": "2025-12-19T10:00:00Z"
}
Webhook Configuration
TB_WEBHOOK metadata table:
-- Webhook registrations: one row per target endpoint.
-- NOTE(review): the registration API example accepts a "filters" object and returns a
-- string webhook_id ("wh_xyz789"), but this table has no FILTERS column and WEBHOOK_ID
-- is an INT — confirm how those values are stored/mapped.
CREATE TABLE TB_WEBHOOK (
WEBHOOK_ID INT PRIMARY KEY AUTO_INCREMENT,
SOURCE VARCHAR(50) NOT NULL, -- Tenant identifier
URL VARCHAR(500) NOT NULL, -- Target endpoint that receives the HTTP POST
EVENTS JSON NOT NULL, -- JSON array of event types to send
DIMENSIONS JSON, -- Optional JSON array of dimensions; NULL means no dimension filter
SECRET VARCHAR(100), -- Signing secret for the HMAC signature; NULL means unsigned deliveries
ACTIVE CHAR(1) DEFAULT 'Y', -- Y/N; the dispatcher loads only rows where ACTIVE = 'Y'
RETRY_COUNT INT DEFAULT 3, -- Max delivery attempts (the dispatcher retries while attempt < RETRY_COUNT)
TIMEOUT_MS INT DEFAULT 5000, -- Per-request timeout in milliseconds
CREATED_AT TIMESTAMP DEFAULT NOW(),
UPDATED_AT TIMESTAMP DEFAULT NOW() ON UPDATE NOW(),
INDEX idx_webhook_source (SOURCE),
INDEX idx_webhook_active (ACTIVE)
);
Webhook Dispatcher Implementation
Node.js Webhook Dispatcher
const amqp = require('amqplib');
const axios = require('axios');
const crypto = require('crypto');
/**
 * Outbox subscriber that forwards events to registered webhook endpoints.
 *
 * Active rows of TB_WEBHOOK are loaded once at startup. Every received event
 * is matched against them by event type and dimension, and a signed JSON
 * payload is POSTed to each match, retrying failed deliveries with
 * exponential backoff.
 */
class WebhookDispatcher {
  constructor() {
    this.subscriber = null;
    this.webhooks = [];
  }

  /**
   * Loads the webhook registrations, connects the Outbox subscriber and
   * dispatches every event it delivers.
   *
   * @returns {Promise<void>}
   */
  async start() {
    // Load webhooks from database
    await this.loadWebhooks();

    // Connect to RabbitMQ
    this.subscriber = new OutboxSubscriber('webhook-dispatcher');
    await this.subscriber.connect();

    // Subscribe to events
    await this.subscriber.subscribe(async (event) => {
      await this.dispatchEvent(event);
    });

    console.log('[WebhookDispatcher] Started');
  }

  /**
   * Reads all active webhook rows into `this.webhooks`.
   *
   * @returns {Promise<void>}
   */
  async loadWebhooks() {
    const result = await db.query(`
      SELECT * FROM TB_WEBHOOK
      WHERE ACTIVE = 'Y'
    `);

    this.webhooks = result.map((row) => ({
      webhook_id: row.WEBHOOK_ID,
      url: row.URL,
      events: this.parseJsonColumn(row.EVENTS) || [],
      dimensions: this.parseJsonColumn(row.DIMENSIONS),
      secret: row.SECRET,
      retry_count: row.RETRY_COUNT,
      timeout_ms: row.TIMEOUT_MS,
    }));

    console.log(`[WebhookDispatcher] Loaded ${this.webhooks.length} webhooks`);
  }

  /**
   * Decodes a JSON column value.
   *
   * Some database drivers return JSON columns as text and others as already
   * decoded values; calling JSON.parse on a decoded array would throw, so
   * only strings are parsed.
   *
   * @param {*} value - Raw column value.
   * @returns {*} The decoded value, or null when the column is NULL/undefined.
   */
  parseJsonColumn(value) {
    if (value == null) {
      return null;
    }
    return typeof value === 'string' ? JSON.parse(value) : value;
  }

  /**
   * Sends the event to every webhook whose event list contains its type and
   * whose dimension list (when present) contains its aggregate type.
   * Deliveries run one after another, in registration order.
   *
   * @param {object} event - Outbox event row.
   * @returns {Promise<void>}
   */
  async dispatchEvent(event) {
    const { EVENT_TYPE, AGGREGATE_TYPE } = event;

    // Find matching webhooks
    const matchingWebhooks = this.webhooks.filter(
      (webhook) =>
        webhook.events.includes(EVENT_TYPE) &&
        (!webhook.dimensions || webhook.dimensions.includes(AGGREGATE_TYPE))
    );

    console.log(
      `[WebhookDispatcher] Found ${matchingWebhooks.length} webhooks for ${EVENT_TYPE}`
    );

    // Dispatch to all matching webhooks
    for (const webhook of matchingWebhooks) {
      await this.sendWebhook(webhook, event);
    }
  }

  /**
   * Delivers one event to one webhook, retrying with exponential backoff.
   *
   * Only HTTP delivery errors trigger a retry. Recording the outcome happens
   * outside the delivery attempt, so a logging failure can neither re-send an
   * already delivered webhook nor abort the caller's loop.
   *
   * @param {object} webhook - Entry of `this.webhooks`.
   * @param {object} event - Outbox event row.
   * @param {number} [attempt=1] - Number of the first attempt to make.
   * @returns {Promise<void>} Resolves once the delivery succeeded or was given up.
   */
  async sendWebhook(webhook, event, attempt = 1) {
    let payload;
    let baseHeaders;

    try {
      payload = this.buildPayload(event);
      const signature = this.sign(payload, webhook.secret);
      baseHeaders = {
        'Content-Type': 'application/json',
        'X-Q01-Event-Type': event.EVENT_TYPE,
        'X-Q01-Event-ID': event.OUTBOX_ID,
      };
      // Unsigned webhooks get no signature header instead of a null value.
      if (signature !== null) {
        baseHeaders['X-Q01-Signature'] = signature;
      }
    } catch (error) {
      // A payload that cannot be built or signed will fail the same way on
      // every attempt, so it is recorded as failed without retrying.
      console.error(`[WebhookDispatcher] Error: ${error.message}`);
      await this.recordDelivery(webhook, event, 'failed', error.message);
      return;
    }

    for (let currentAttempt = attempt; ; currentAttempt += 1) {
      console.log(
        `[WebhookDispatcher] Sending to ${webhook.url} (attempt ${currentAttempt})`
      );

      let response;
      let delivered = false;
      let failureMessage = null;

      try {
        response = await axios.post(webhook.url, payload, {
          headers: Object.assign({}, baseHeaders, {
            'X-Q01-Timestamp': new Date().toISOString(),
          }),
          timeout: webhook.timeout_ms,
        });
        delivered = true;
      } catch (error) {
        failureMessage = error.message;
        console.error(`[WebhookDispatcher] Error: ${error.message}`);
      }

      if (delivered) {
        console.log(`[WebhookDispatcher] Success: ${response.status}`);
        await this.recordDelivery(webhook, event, 'success', null);
        return;
      }

      // Written as a negated "<" so a missing retry_count stops after one
      // attempt instead of looping forever.
      if (!(currentAttempt < webhook.retry_count)) {
        // Max retries exceeded - log failure
        await this.recordDelivery(webhook, event, 'failed', failureMessage);
        return;
      }

      // Exponential backoff between attempts: 2s after the 1st, 4s after the 2nd, ...
      const delay = Math.pow(2, currentAttempt) * 1000;
      console.log(`[WebhookDispatcher] Retrying in ${delay}ms...`);
      await this.sleep(delay);
    }
  }

  /**
   * Writes the delivery outcome to the log table without letting a logging
   * error propagate to the delivery flow.
   *
   * @param {object} webhook - Webhook the delivery targeted.
   * @param {object} event - Outbox event row.
   * @param {string} status - 'success' or 'failed'.
   * @param {?string} errorMessage - Failure reason, or null on success.
   * @returns {Promise<void>}
   */
  async recordDelivery(webhook, event, status, errorMessage) {
    try {
      await this.logWebhook(webhook.webhook_id, event.OUTBOX_ID, status, errorMessage);
    } catch (logError) {
      console.error(
        `[WebhookDispatcher] Could not record ${status} delivery: ${logError.message}`
      );
    }
  }

  /**
   * Maps an Outbox event row to the public webhook payload.
   *
   * @param {object} event - Outbox event row.
   * @returns {object} Payload with event_id, event_type, timestamp, dimension, record_id and data.
   */
  buildPayload(event) {
    return {
      event_id: event.OUTBOX_ID,
      event_type: event.EVENT_TYPE,
      timestamp: event.CREATED_AT,
      dimension: event.AGGREGATE_TYPE,
      record_id: event.AGGREGATE_ID,
      data: event.PAYLOAD,
    };
  }

  /**
   * Computes the HMAC-SHA256 signature of the JSON-serialized payload.
   *
   * @param {object} payload - Payload to sign.
   * @param {?string} secret - Signing secret of the webhook.
   * @returns {?string} `sha256=<hex digest>`, or null when no secret is set.
   */
  sign(payload, secret) {
    if (!secret) return null;

    const hmac = crypto.createHmac('sha256', secret);
    hmac.update(JSON.stringify(payload));
    return `sha256=${hmac.digest('hex')}`;
  }

  /**
   * Inserts one row into TB_WEBHOOK_LOG.
   *
   * @param {number} webhookId - WEBHOOK_ID of the target webhook.
   * @param {number} outboxId - OUTBOX_ID of the delivered event.
   * @param {string} status - Delivery status.
   * @param {?string} error - Error message, or null.
   * @returns {Promise<void>}
   */
  async logWebhook(webhookId, outboxId, status, error) {
    await db.query(`
      INSERT INTO TB_WEBHOOK_LOG
      (WEBHOOK_ID, OUTBOX_ID, STATUS, ERROR_MESSAGE, CREATED_AT)
      VALUES (?, ?, ?, ?, NOW())
    `, [webhookId, outboxId, status, error]);
  }

  /**
   * @param {number} ms - Milliseconds to wait.
   * @returns {Promise<void>} Resolves after the delay.
   */
  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
// Start dispatcher
// A rejection from start() is only printed via console.error; nothing here
// retries the startup or exits the process.
const dispatcher = new WebhookDispatcher();
dispatcher.start().catch(console.error);
Webhook Receiver Implementation
Verify Webhook Signature
Node.js receiver:
const express = require('express');
const crypto = require('crypto');
const bodyParser = require('body-parser');
const app = express();
// Use raw body for signature verification
// The body as received (`buf`) is kept as a string on `req.rawBody` so the HMAC
// is computed over what was sent rather than over a re-serialized `req.body`.
// NOTE(review): presumably `rawBody` is only set for requests this JSON parser
// actually handles — confirm against the body-parser `verify` documentation.
app.use(bodyParser.json({
verify: (req, res, buf) => {
req.rawBody = buf.toString();
}
}));
// Webhook endpoint
// Authenticates the request through its X-Q01-Signature header, then passes the
// event to handleCustomerEvent and answers once that promise settles.
app.post('/webhooks/customer', (req, res) => {
  const { 'x-q01-signature': signature } = req.headers;
  const { WEBHOOK_SECRET: secret } = process.env;

  // Verify signature against the raw request body
  const isAuthentic = verifySignature(req.rawBody, signature, secret);
  if (!isAuthentic) {
    console.error('[Webhook] Invalid signature');
    return res.status(401).json({ error: 'Invalid signature' });
  }

  // Process webhook
  const { event_type, dimension, record_id, data } = req.body;
  console.log(`[Webhook] Received: ${event_type} for ${dimension}/${record_id}`);

  // 200 on success, 500 with the error message when handling fails
  const acknowledge = () => {
    console.log('[Webhook] Processed successfully');
    res.status(200).json({ received: true });
  };
  const reportFailure = (error) => {
    console.error('[Webhook] Error:', error);
    res.status(500).json({ error: error.message });
  };

  // Handle event
  handleCustomerEvent(event_type, data).then(acknowledge).catch(reportFailure);
});
/**
 * Checks a webhook signature against the HMAC-SHA256 of the raw request body.
 *
 * @param {string|Buffer} payload - Raw request body exactly as received.
 * @param {string} signature - Value of the signature header (`sha256=<hex>`).
 * @param {string} secret - Shared signing secret.
 * @returns {boolean} true only when the signature matches; malformed or
 *   missing input yields false instead of throwing.
 */
function verifySignature(payload, signature, secret) {
  if (!signature || !secret) return false;
  if (typeof signature !== 'string') return false;

  // Without a raw body (e.g. a non-JSON request) there is nothing to verify;
  // hmac.update() would throw on undefined.
  if (typeof payload !== 'string' && !ArrayBuffer.isView(payload)) return false;

  const hmac = crypto.createHmac('sha256', secret);
  hmac.update(payload);
  const expectedSignature = `sha256=${hmac.digest('hex')}`;

  const received = Buffer.from(signature);
  const expected = Buffer.from(expectedSignature);

  // timingSafeEqual throws a RangeError for inputs of different length, which
  // would turn any malformed signature into an unhandled exception. The
  // expected length is not secret, so comparing lengths first leaks nothing.
  if (received.length !== expected.length) return false;

  return crypto.timingSafeEqual(received, expected);
}
/**
 * Routes a customer webhook event to the matching CRM operation.
 *
 * @param {string} eventType - Event type taken from the webhook payload.
 * @param {*} data - Event data handed to the CRM operation unchanged.
 * @returns {Promise<void>} Settles with the CRM call; unknown event types
 *   are logged and resolve normally.
 */
async function handleCustomerEvent(eventType, data) {
  // Wrapped in arrows so only the CRM function actually needed is looked up.
  const crmActions = new Map([
    ['CustomerCreated', (record) => createCustomerInCRM(record)],
    ['CustomerUpdated', (record) => updateCustomerInCRM(record)],
    ['CustomerDeleted', (record) => deleteCustomerInCRM(record)],
  ]);

  const action = crmActions.get(eventType);
  if (action === undefined) {
    console.log(`Unknown event type: ${eventType}`);
    return;
  }

  await action(data);
}
// Start the HTTP server; the port (3000) is hard-coded in this example.
app.listen(3000, () => {
console.log('[Webhook] Receiver listening on port 3000');
});
PHP Webhook Receiver
<?php
// webhook-receiver.php
//
// Verifies the X-Q01-Signature header against the raw request body, then hands
// the decoded event to handleEvent(). Responds with 401 for a bad signature,
// 400 for a body that is not a JSON object, 500 when handling fails and 200
// otherwise.
$secret = getenv('WEBHOOK_SECRET');

// Get raw body
$payload = file_get_contents('php://input');
$signature = $_SERVER['HTTP_X_Q01_SIGNATURE'] ?? '';

header('Content-Type: application/json');

// Verify signature (an unreadable body can never carry a valid signature)
if ($payload === false || !verifySignature($payload, $signature, $secret)) {
    http_response_code(401);
    echo json_encode(['error' => 'Invalid signature']);
    exit;
}

// Parse payload
$event = json_decode($payload, true);
if (!is_array($event)) {
    // json_decode() returns null for invalid JSON and scalars for non-object
    // bodies; neither can be handled as an event.
    http_response_code(400);
    echo json_encode(['error' => 'Invalid JSON payload']);
    exit;
}

// Handle event
try {
    handleEvent($event);
    http_response_code(200);
    echo json_encode(['received' => true]);
} catch (Throwable $e) {
    // Throwable also covers Error/TypeError, which "catch (Exception)" misses.
    http_response_code(500);
    echo json_encode(['error' => $e->getMessage()]);
}
/**
 * Checks a webhook signature against the HMAC-SHA256 of the raw payload.
 *
 * @param string $payload   Raw request body.
 * @param string $signature Received signature header ("sha256=<hex>").
 * @param string $secret    Shared signing secret.
 * @return bool True when the signature matches; false when it does not or
 *              when the signature or secret is empty.
 */
function verifySignature($payload, $signature, $secret) {
    $hasCredentials = !empty($signature) && !empty($secret);
    if (!$hasCredentials) {
        return false;
    }

    $digest = hash_hmac('sha256', $payload, $secret);

    // hash_equals() compares in constant time
    return hash_equals('sha256=' . $digest, $signature);
}
/**
 * Routes a decoded webhook event to the matching CRM operation.
 *
 * @param array|null $event Decoded webhook payload.
 * @return void
 */
function handleEvent($event) {
    // "??" avoids undefined-index warnings when a key (or the event) is missing.
    $eventType = $event['event_type'] ?? null;
    $data = $event['data'] ?? null;

    // switch compares loosely; a non-string value must never match a case label.
    if (!is_string($eventType)) {
        error_log('Unknown event type: ' . json_encode($eventType));
        return;
    }

    switch ($eventType) {
        case 'CustomerCreated':
            createCustomerInCRM($data);
            break;
        case 'CustomerUpdated':
            updateCustomerInCRM($data);
            break;
        case 'CustomerDeleted':
            deleteCustomerInCRM($data);
            break;
        default:
            error_log("Unknown event type: $eventType");
    }
}
?>
Webhook Payload Structure
Standard Webhook Payload
{
"event_id": "outbox_123456",
"event_type": "CustomerCreated",
"timestamp": "2025-12-19T10:30:00Z",
"dimension": "CUST",
"record_id": 1234,
"data": {
"CUST_ID": 1234,
"XCUST01": "John Doe",
"XCUST02": "john.doe@example.com",
"XCUST03": "+1234567890",
"XCUST04": "123 Main St",
"CREATED_AT": "2025-12-19T10:30:00Z",
"UPDATED_AT": "2025-12-19T10:30:00Z"
}
}
HTTP Headers
POST /webhooks/customer HTTP/1.1
Host: crm.example.com
Content-Type: application/json
X-Q01-Signature: sha256=abc123...
X-Q01-Event-Type: CustomerCreated
X-Q01-Event-ID: outbox_123456
X-Q01-Timestamp: 2025-12-19T10:30:00Z
Content-Length: 512
{...}
Event Filtering
Filter by Dimension
// Only send product events
const webhook = {
url: 'https://warehouse.example.com/inventory',
events: ['ProductCreated', 'ProductUpdated', 'ProductDeleted'],
dimensions: ['PRD'] // Only PRD dimension
};
Filter by Event Type
// Only send creation events
const webhook = {
url: 'https://analytics.example.com/new-customers',
events: ['CustomerCreated'], // Only creation
dimensions: ['CUST']
};
Filter by Field Value
class WebhookDispatcher {
  /**
   * Sends the event to every webhook that matches its event type, its
   * dimension (when the webhook restricts dimensions) and all of the
   * webhook's field filters.
   *
   * @param {object} event - Outbox event row.
   * @returns {Promise<void>}
   */
  async dispatchEvent(event) {
    const payload = this.resolvePayload(event.PAYLOAD);

    const matchingWebhooks = this.webhooks.filter((webhook) => {
      // Event type match
      if (!webhook.events.includes(event.EVENT_TYPE)) {
        return false;
      }

      // Dimension match
      if (webhook.dimensions && !webhook.dimensions.includes(event.AGGREGATE_TYPE)) {
        return false;
      }

      // Custom filters: every configured field must equal the payload value
      if (webhook.filters) {
        return Object.entries(webhook.filters).every(
          ([field, value]) => payload[field] === value
        );
      }

      return true;
    });

    for (const webhook of matchingWebhooks) {
      await this.sendWebhook(webhook, event);
    }
  }

  /**
   * Returns the event payload as an object that field filters can index.
   *
   * @param {*} rawPayload - PAYLOAD as stored on the event: an object, a JSON
   *   string, or null/undefined.
   * @returns {object} The payload object, or an empty object when there is no
   *   usable payload (so filters simply do not match instead of throwing).
   */
  resolvePayload(rawPayload) {
    if (rawPayload == null) {
      return {};
    }
    if (typeof rawPayload !== 'string') {
      return rawPayload;
    }

    try {
      const parsed = JSON.parse(rawPayload);
      return parsed !== null && typeof parsed === 'object' ? parsed : {};
    } catch (error) {
      // A payload that is not JSON has no fields to filter on.
      return {};
    }
  }
}
Error Handling and Retry
Exponential Backoff
/**
 * POSTs an event to a webhook, retrying failed attempts with exponential
 * backoff and recording the failure once all attempts are used up.
 *
 * @param {object} webhook - Target webhook; `url` and `secret` are read.
 * @param {object} event - Event sent as the request body.
 * @param {object} [options] - Retry tuning.
 * @param {number} [options.maxRetries=5] - Total number of delivery attempts.
 * @param {number} [options.baseDelay=1000] - Wait after the first failed attempt, in ms;
 *   doubled after every further failure.
 * @param {number} [options.timeout=5000] - Per-request timeout in ms.
 * @returns {Promise<void>} Resolves after a successful delivery or after the
 *   final failure has been logged; delivery errors are not rethrown.
 */
async function sendWebhookWithRetry(webhook, event, options = {}) {
  const { maxRetries = 5, baseDelay = 1000, timeout = 5000 } = options;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const signature = sign(event, webhook.secret);
      const headers = {};
      // Skip the header entirely when there is no signature to send.
      if (signature != null) {
        headers['X-Q01-Signature'] = signature;
      }

      await axios.post(webhook.url, event, { timeout, headers });

      console.log('[Webhook] Success');
      return;
    } catch (error) {
      console.error(`[Webhook] Attempt ${attempt} failed: ${error.message}`);

      if (attempt === maxRetries) {
        console.error('[Webhook] Max retries exceeded');
        await logFailedWebhook(webhook, event, error);
        return;
      }

      // Exponential backoff between attempts: with the defaults the waits
      // are 1s, 2s, 4s, 8s (no wait follows the final attempt).
      const delay = baseDelay * Math.pow(2, attempt - 1);
      console.log(`[Webhook] Retrying in ${delay}ms...`);
      await sleep(delay);
    }
  }
}
Circuit Breaker Pattern
/**
 * Failure counter that stops calling an operation after repeated errors.
 *
 * CLOSED: calls pass through. OPEN: calls are rejected until `nextAttempt`.
 * HALF_OPEN: entered on the first call after `nextAttempt`; a success closes
 * the circuit again.
 */
class CircuitBreaker {
  /**
   * @param {number} [threshold=5] - Failure count at which the circuit opens.
   * @param {number} [timeout=60000] - Time in ms the circuit stays open.
   */
  constructor(threshold = 5, timeout = 60000) {
    this.failureCount = 0;
    this.threshold = threshold;
    this.timeout = timeout;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    this.nextAttempt = Date.now();
  }

  /**
   * Runs `fn` unless the circuit is open and its timeout has not elapsed.
   *
   * @param {Function} fn - Operation to run; may return a promise.
   * @returns {Promise<*>} The result of `fn`.
   * @throws {Error} 'Circuit breaker is OPEN' while open, otherwise whatever `fn` throws.
   */
  async execute(fn) {
    const circuitIsOpen = this.state === 'OPEN';

    if (circuitIsOpen && Date.now() < this.nextAttempt) {
      throw new Error('Circuit breaker is OPEN');
    }
    if (circuitIsOpen) {
      // Timeout elapsed: let this call through as a probe
      this.state = 'HALF_OPEN';
    }

    try {
      const outcome = await fn();
      this.onSuccess();
      return outcome;
    } catch (failure) {
      this.onFailure();
      throw failure;
    }
  }

  /** Resets the failure counter and closes the circuit. */
  onSuccess() {
    this.state = 'CLOSED';
    this.failureCount = 0;
  }

  /** Counts a failure and opens the circuit once the threshold is reached. */
  onFailure() {
    this.failureCount += 1;

    const thresholdReached = this.failureCount >= this.threshold;
    if (!thresholdReached) {
      return;
    }

    this.state = 'OPEN';
    this.nextAttempt = Date.now() + this.timeout;
    console.log(`[CircuitBreaker] OPEN for ${this.timeout}ms`);
  }
}
// Usage
// One breaker per endpoint URL: with a single shared breaker, one failing
// receiver would open the circuit for every other webhook as well.
const breakers = new Map();

/**
 * Returns the circuit breaker for an endpoint, creating it on first use.
 *
 * @param {string} url - Webhook endpoint URL.
 * @returns {CircuitBreaker} Breaker dedicated to that URL.
 */
function getBreaker(url) {
  let breaker = breakers.get(url);
  if (breaker === undefined) {
    breaker = new CircuitBreaker();
    breakers.set(url, breaker);
  }
  return breaker;
}

/**
 * POSTs the event to the webhook through the endpoint's circuit breaker.
 * Failures (including a rejected call while the circuit is open) are logged,
 * not rethrown.
 *
 * @param {object} webhook - Target webhook; `url` is read.
 * @param {object} event - Event sent as the request body.
 * @returns {Promise<void>}
 */
async function sendWebhook(webhook, event) {
  try {
    await getBreaker(webhook.url).execute(() =>
      axios.post(webhook.url, event, { timeout: 5000 })
    );
  } catch (error) {
    // Include the reason so an open circuit can be told apart from a delivery error.
    console.error('[Webhook] Failed (circuit breaker may be open)', error.message);
  }
}
Webhook Monitoring
TB_WEBHOOK_LOG Table
-- Delivery log: one row per outcome recorded for a webhook delivery.
-- NOTE(review): no foreign keys are declared; WEBHOOK_ID and OUTBOX_ID presumably
-- refer to TB_WEBHOOK and the outbox table — confirm.
-- NOTE(review): the dispatcher's logWebhook() insert sets only WEBHOOK_ID, OUTBOX_ID,
-- STATUS, ERROR_MESSAGE and CREATED_AT, so HTTP_STATUS stays NULL and ATTEMPT_COUNT
-- stays at its default of 1, and it only writes 'success' or 'failed' — confirm whether
-- another writer fills these columns and the 'retrying' status.
CREATE TABLE TB_WEBHOOK_LOG (
LOG_ID BIGINT PRIMARY KEY AUTO_INCREMENT,
WEBHOOK_ID INT NOT NULL, -- Webhook the delivery targeted
OUTBOX_ID BIGINT NOT NULL, -- Outbox event that was delivered
STATUS ENUM('success', 'failed', 'retrying') NOT NULL, -- Delivery outcome
HTTP_STATUS INT, -- HTTP response status, when available
ERROR_MESSAGE TEXT, -- Failure reason; NULL on success
ATTEMPT_COUNT INT DEFAULT 1, -- Number of attempts made
CREATED_AT TIMESTAMP DEFAULT NOW(),
INDEX idx_webhook_log_webhook (WEBHOOK_ID),
INDEX idx_webhook_log_status (STATUS),
INDEX idx_webhook_log_created (CREATED_AT)
);
Monitor Webhook Health
/**
 * Reports delivery statistics for a webhook from TB_WEBHOOK_LOG and raises an
 * alert when its success rate drops too low.
 */
class WebhookMonitor {
  /**
   * Aggregates the deliveries of the last 24 hours for one webhook.
   *
   * @param {number} webhookId - WEBHOOK_ID to report on.
   * @returns {Promise<object>} Statistics with:
   *   - `total_deliveries`, `successful`, `failed`: numbers
   *   - `success_rate`: display string such as "95.00%", or "N/A" without deliveries
   *   - `success_rate_value`: the same rate as a number, or null without deliveries
   *   - `avg_attempts`: as returned by the database
   */
  async getWebhookStats(webhookId) {
    const result = await db.query(`
      SELECT
      COUNT(*) as total_deliveries,
      SUM(CASE WHEN STATUS = 'success' THEN 1 ELSE 0 END) as successful,
      SUM(CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END) as failed,
      AVG(CASE WHEN STATUS = 'success' THEN ATTEMPT_COUNT END) as avg_attempts
      FROM TB_WEBHOOK_LOG
      WHERE WEBHOOK_ID = ?
      AND CREATED_AT > DATE_SUB(NOW(), INTERVAL 24 HOUR)
    `, [webhookId]);

    // The aggregate yields a single row; accept it bare or wrapped in a result array.
    const row = (Array.isArray(result) ? result[0] : result) || {};

    // SUM() is NULL when no rows match, and drivers may return aggregates as strings.
    const toCount = (value) => (value == null ? 0 : Number(value));
    const totalDeliveries = toCount(row.total_deliveries);
    const successful = toCount(row.successful);
    const failed = toCount(row.failed);

    // No deliveries means there is no rate to compute (avoids 0/0 = NaN).
    const successRate =
      totalDeliveries > 0 ? (successful / totalDeliveries) * 100 : null;

    return {
      webhook_id: webhookId,
      total_deliveries: totalDeliveries,
      successful,
      failed,
      success_rate: successRate === null ? 'N/A' : successRate.toFixed(2) + '%',
      success_rate_value: successRate,
      avg_attempts: row.avg_attempts,
    };
  }

  /**
   * Sends an alert when the webhook's success rate is below the threshold.
   * Nothing is sent when there were no deliveries in the reporting window.
   *
   * @param {number} webhookId - WEBHOOK_ID to check.
   * @param {number} [threshold=90] - Minimum acceptable success rate in percent.
   * @returns {Promise<void>}
   */
  async alertOnFailures(webhookId, threshold = 90) {
    const stats = await this.getWebhookStats(webhookId);

    // Compare the numeric rate: the formatted "95.00%" string coerces to NaN,
    // which made the original "< 90" check always false.
    if (stats.success_rate_value !== null && stats.success_rate_value < threshold) {
      await sendAlert({
        title: 'Webhook Health Degraded',
        webhook_id: webhookId,
        success_rate: stats.success_rate,
        failed: stats.failed,
      });
    }
  }
}
Best Practices
✅ DO:
Verify webhook signatures:
// ✅ Good - always verify
if (!verifySignature(payload, signature, secret)) {
return res.status(401).json({ error: 'Invalid signature' });
}
Return 200 quickly:
// ✅ Good - acknowledge immediately, process async
app.post('/webhook', async (req, res) => {
res.status(200).json({ received: true });
// Process asynchronously
processWebhook(req.body).catch(console.error);
});
Implement idempotency:
// ✅ Good - check if already processed
if (await isProcessed(event.event_id)) {
return res.status(200).json({ received: true, duplicate: true });
}
await process(event);
await markProcessed(event.event_id);
❌ DON'T:
Don't ignore signatures:
// ❌ Bad - security risk
app.post('/webhook', (req, res) => {
// No signature verification!
processWebhook(req.body);
});
Don't block the response:
// ❌ Bad - slow response
app.post('/webhook', async (req, res) => {
await longRunningProcess(req.body); // Blocks webhook delivery
res.status(200).json({ received: true });
});
Don't store secrets in code:
// ❌ Bad - hardcoded secret
const secret = 'whsec_abc123xyz';
// ✅ Good - environment variable
const secret = process.env.WEBHOOK_SECRET;
Summary
- ✅ Webhooks enable real-time external system notifications
- ✅ Implemented as Outbox subscribers consuming RabbitMQ events
- ✅ Always verify webhook signatures for security
- ✅ Implement retry with exponential backoff
- ✅ Return 200 quickly, process asynchronously
- ✅ Filter events by dimension and event type
- ✅ Monitor webhook health and success rates
- ✅ Use circuit breaker for failing endpoints
Key Takeaways:
- Webhooks decouple external integrations
- Signature verification prevents unauthorized access
- Exponential backoff handles transient failures
- Circuit breaker protects against persistent failures
- Idempotency prevents duplicate processing
- Async processing keeps webhook responses fast
- Monitor success rates and alert on degradation
- Log all webhook deliveries for debugging
Related Concepts
- Outbox Subscribers - RabbitMQ event consumers
- Outbox Pattern - Event sourcing
- Advanced Topics - Overview