Batch Processing
Overview
Batch processing handles large-scale data operations efficiently: instead of issuing one request per record, you insert, update, or delete thousands of records in grouped requests, with explicit control over transactions and error handling.
Use Cases:
- Data migration from external systems
- Bulk product catalog updates
- Mass price adjustments
- End-of-day order processing
- Periodic data synchronization
- Bulk user imports
Benefits:
- ✅ Higher throughput (process 1000s of records)
- ✅ Lower overhead (fewer HTTP requests)
- ✅ Transaction management (all-or-nothing commits)
- ✅ Progress tracking (monitor batch execution)
- ✅ Error handling (partial failure recovery)
Batch Insert
Single Batch Insert
API endpoint:
# Batch insert multiple products
curl -X POST https://api.q01.io/api/v4/core/products/batch \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"records": [
{"XPRD01": "Widget A", "XPRD02": 19.99, "XPRD05": 1},
{"XPRD01": "Widget B", "XPRD02": 29.99, "XPRD05": 1},
{"XPRD01": "Widget C", "XPRD02": 39.99, "XPRD05": 2},
...
]
}'
Response:
{
"inserted": 1000,
"failed": 0,
"errors": [],
"duration_ms": 1250,
"records": [
{"PRD_ID": 1001, "XPRD01": "Widget A", "XPRD02": 19.99},
{"PRD_ID": 1002, "XPRD01": "Widget B", "XPRD02": 29.99},
{"PRD_ID": 1003, "XPRD01": "Widget C", "XPRD02": 39.99},
...
]
}
Batch Insert Implementation
JavaScript:
class BatchProcessor {
constructor(apiClient, batchSize = 100) {
this.apiClient = apiClient;
this.batchSize = batchSize;
}
async batchInsert(dimension, records) {
const batches = this.chunk(records, this.batchSize);
const results = {
total: records.length,
inserted: 0,
failed: 0,
errors: []
};
console.log(`Processing ${batches.length} batches...`);
for (let i = 0; i < batches.length; i++) {
const batch = batches[i];
console.log(`Batch ${i + 1}/${batches.length}: ${batch.length} records`);
try {
const response = await this.apiClient.post(
`/api/v4/core/${dimension}/batch`,
{ records: batch }
);
results.inserted += response.inserted;
results.failed += response.failed;
results.errors.push(...response.errors);
console.log(`✅ Batch ${i + 1} completed: ${response.inserted} inserted`);
} catch (error) {
console.error(`❌ Batch ${i + 1} failed: ${error.message}`);
results.failed += batch.length;
results.errors.push({
batch: i + 1,
error: error.message,
records: batch
});
}
}
return results;
}
chunk(array, size) {
const chunks = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
}
// Usage
const processor = new BatchProcessor(apiClient, 100);
const products = [
{XPRD01: "Product 1", XPRD02: 19.99, XPRD05: 1},
{XPRD01: "Product 2", XPRD02: 29.99, XPRD05: 1},
// ... 10,000 products
];
const results = await processor.batchInsert('products', products);
console.log(`Inserted: ${results.inserted}`);
console.log(`Failed: ${results.failed}`);
console.log(`Errors: ${results.errors.length}`);
Backend Batch Insert (PHP)
class BatchInsertHandler {
private $db;
private $batchSize = 100;
public function batchInsert(string $dimension, array $records): array {
$inserted = 0;
$failed = 0;
$errors = [];
// Process in batches
$batches = array_chunk($records, $this->batchSize);
foreach ($batches as $batchIndex => $batch) {
try {
// Start transaction
$this->db->beginTransaction();
// Build bulk insert SQL
$sql = $this->buildBulkInsertSQL($dimension, $batch);
// Execute
$this->db->exec($sql);
// Insert OUTBOX events
foreach ($batch as $record) {
$this->insertOutboxEvent('ProductCreated', $dimension, $record);
}
// Commit
$this->db->commit();
$inserted += count($batch);
} catch (\Exception $e) {
// Rollback on error
$this->db->rollBack();
$failed += count($batch);
$errors[] = [
'batch' => $batchIndex + 1,
'error' => $e->getMessage(),
'record_count' => count($batch)
];
}
}
return [
'inserted' => $inserted,
'failed' => $failed,
'errors' => $errors
];
}
private function buildBulkInsertSQL(string $dimension, array $records): string {
$tableName = "TB_ANAG_{$dimension}00";
// Get field names from first record
$fields = array_keys($records[0]);
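// NOTE: assumes every record has the same keys; the field names are interpolated
// into the SQL, so in practice they should be validated against a known column list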
$fieldList = implode(', ', $fields);
// Build VALUES clauses
$values = [];
foreach ($records as $record) {
$valueList = implode(', ', array_map(function($value) {
return $this->db->quote($value);
}, array_values($record)));
$values[] = "($valueList)";
}
$valuesString = implode(', ', $values);
return "INSERT INTO $tableName ($fieldList) VALUES $valuesString";
}
}
Batch Update
Bulk Update by IDs
# Update multiple products
curl -X PUT https://api.q01.io/api/v4/core/products/batch \
-H "Authorization: Bearer $TOKEN" \
-d '{
"records": [
{"PRD_ID": 1001, "XPRD02": 22.99, "XPRD06": "Y"},
{"PRD_ID": 1002, "XPRD02": 32.99, "XPRD06": "Y"},
{"PRD_ID": 1003, "XPRD02": 42.99, "XPRD06": "N"}
]
}'
Bulk Update by Filter
# Update all products in category 5
curl -X PATCH https://api.q01.io/api/v4/core/products/batch \
-H "Authorization: Bearer $TOKEN" \
-d '{
"filters": {
"XPRD05": 5,
"TREC": "N"
},
"updates": {
"XPRD06": "Y",
"XPRD10": "2025-12-19"
}
}'
# Response
{
"updated": 250,
"duration_ms": 450
}
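The same filter-based update can be issued from application code. A minimal sketch, assuming the apiClient wrapper from the earlier examples exposes a patch() method that sends an authenticated request and resolves with the parsed JSON response shown above:
async function bulkUpdateByFilter(dimension, filters, updates) {
  // Single request: the server applies `updates` to every record matching `filters`
  const response = await apiClient.patch(
    `/api/v4/core/${dimension}/batch`,
    { filters, updates }
  );
  console.log(`Updated ${response.updated} records in ${response.duration_ms}ms`);
  return response;
}

// Usage: same operation as the curl example above
await bulkUpdateByFilter('products',
  { XPRD05: 5, TREC: 'N' },
  { XPRD06: 'Y', XPRD10: '2025-12-19' }
);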
Batch Update Implementation
async function batchUpdate(dimension, updates) {
const batchSize = 100;
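// chunk() splits an array into fixed-size slices (same helper as BatchProcessor.chunk above)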
const batches = chunk(updates, batchSize);
let totalUpdated = 0;
for (const batch of batches) {
const response = await apiClient.put(
`/api/v4/core/${dimension}/batch`,
{ records: batch }
);
totalUpdated += response.updated;
console.log(`Updated: ${response.updated} records`);
}
return totalUpdated;
}
// Usage
const updates = [
{PRD_ID: 1001, XPRD02: 22.99},
{PRD_ID: 1002, XPRD02: 32.99},
// ... 5,000 updates
];
await batchUpdate('products', updates);
Batch Delete
Bulk Delete by IDs
# Delete multiple products
curl -X DELETE https://api.q01.io/api/v4/core/products/batch \
-H "Authorization: Bearer $TOKEN" \
-d '{
"ids": [1001, 1002, 1003, 1004, 1005]
}'
# Response
{
"deleted": 5,
"duration_ms": 120
}
Bulk Delete by Filter
# Delete all inactive products older than 2 years
curl -X DELETE https://api.q01.io/api/v4/core/products/batch \
-H "Authorization: Bearer $TOKEN" \
-d '{
"filters": {
"XPRD06": "N",
"CREATED_AT": {"$lt": "2023-01-01"}
}
}'
# Response
{
"deleted": 1250,
"duration_ms": 2500
}
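Deletes can also be driven from application code, chunking large ID lists the same way the insert and update helpers do. A minimal sketch, assuming apiClient.delete() accepts a JSON body and reusing the chunk() helper from BatchProcessor above:
async function batchDelete(dimension, ids, batchSize = 100) {
  let totalDeleted = 0;
  // Keep each DELETE request small by splitting the ID list into chunks
  for (const batch of chunk(ids, batchSize)) {
    const response = await apiClient.delete(
      `/api/v4/core/${dimension}/batch`,
      { ids: batch }
    );
    totalDeleted += response.deleted;
  }
  return totalDeleted;
}

// Usage
const deleted = await batchDelete('products', [1001, 1002, 1003, 1004, 1005]);
console.log(`Deleted: ${deleted}`);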
Transaction Management
All-or-Nothing Batches
class TransactionalBatch {
async processBatch(dimension, records) {
// All records in single transaction
try {
const response = await apiClient.post(
`/api/v4/core/${dimension}/batch`,
{
records: records,
transaction: 'atomic' // All-or-nothing
}
);
console.log(`✅ All ${records.length} records inserted`);
return response;
} catch (error) {
console.error(`❌ Batch failed - nothing inserted: ${error.message}`);
throw error;
}
}
}
Partial Commit Batches
class PartialCommitBatch {
async processBatch(dimension, records) {
// Process each record independently
const response = await apiClient.post(
`/api/v4/core/${dimension}/batch`,
{
records: records,
transaction: 'independent' // Commit each record
}
);
console.log(`✅ Inserted: ${response.inserted}`);
console.log(`❌ Failed: ${response.failed}`);
// Process errors
for (const error of response.errors) {
console.error(`Record ${error.index}: ${error.message}`);
}
return response;
}
}
Progress Tracking
Progress Bar Implementation
class ProgressTracker {
constructor(total) {
this.total = total;
this.current = 0;
this.startTime = Date.now();
}
update(processed) {
this.current += processed;
const percentage = ((this.current / this.total) * 100).toFixed(2);
const elapsed = (Date.now() - this.startTime) / 1000;
const rate = this.current / elapsed;
const remaining = (this.total - this.current) / rate;
console.log(
`Progress: ${this.current}/${this.total} (${percentage}%) | ` +
`Rate: ${rate.toFixed(0)} rec/sec | ` +
`ETA: ${remaining.toFixed(0)}s`
);
}
complete() {
const duration = (Date.now() - this.startTime) / 1000;
const rate = this.total / duration;
console.log(
`✅ Complete: ${this.total} records in ${duration.toFixed(2)}s ` +
`(${rate.toFixed(0)} rec/sec)`
);
}
}
// Usage
async function batchInsertWithProgress(dimension, records) {
const batchSize = 100;
const batches = chunk(records, batchSize);
const tracker = new ProgressTracker(records.length);
for (const batch of batches) {
await apiClient.post(`/api/v4/core/${dimension}/batch`, { records: batch });
tracker.update(batch.length);
}
tracker.complete();
}
Error Handling
Retry Failed Batches
class RetryableBatch {
constructor(maxRetries = 3) {
this.maxRetries = maxRetries;
}
async processBatchWithRetry(dimension, records, attempt = 1) {
try {
return await apiClient.post(
`/api/v4/core/${dimension}/batch`,
{ records: records }
);
} catch (error) {
if (attempt < this.maxRetries) {
const delay = Math.pow(2, attempt) * 1000; // Exponential backoff
console.log(`Retry attempt ${attempt} after ${delay}ms...`);
await this.sleep(delay);
return this.processBatchWithRetry(dimension, records, attempt + 1);
}
throw error;
}
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
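A possible way to use the wrapper, combining it with the chunking shown earlier (chunk() as defined in BatchProcessor):
// Usage: each batch is retried up to 3 times with exponential backoff
const retrier = new RetryableBatch(3);
for (const batch of chunk(records, 100)) {
  await retrier.processBatchWithRetry('products', batch);
}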
Partial Failure Recovery
class PartialFailureHandler {
async processBatch(dimension, records) {
const response = await apiClient.post(
`/api/v4/core/${dimension}/batch`,
{
records: records,
transaction: 'independent'
}
);
if (response.errors.length > 0) {
console.log(`${response.errors.length} records failed`);
// Extract failed records
const failedRecords = response.errors.map(err =>
records[err.index]
);
// Save to retry queue
await this.saveToRetryQueue(dimension, failedRecords);
// Alert
await this.sendAlert({
title: 'Batch Processing Partial Failure',
dimension: dimension,
failed: response.errors.length,
total: records.length
});
}
return response;
}
async saveToRetryQueue(dimension, records) {
await redis.lpush(
`retry:${dimension}`,
JSON.stringify({
records: records,
timestamp: Date.now()
})
);
}
}
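A scheduled job can later drain the retry queue. A minimal sketch, assuming the same ioredis-style client used in saveToRetryQueue and that re-processing goes through the independent-transaction batch endpoint:
async function drainRetryQueue(dimension) {
  let item;
  // Pop queued failures one at a time until the list is empty
  while ((item = await redis.rpop(`retry:${dimension}`))) {
    const { records } = JSON.parse(item);
    const response = await apiClient.post(
      `/api/v4/core/${dimension}/batch`,
      { records, transaction: 'independent' }
    );
    if (response.errors.length > 0) {
      // Still failing - push the item back and stop until the next run
      await redis.lpush(`retry:${dimension}`, item);
      break;
    }
  }
}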
Parallel Processing
Concurrent Batches
class ParallelBatchProcessor {
constructor(concurrency = 5) {
this.concurrency = concurrency;
}
async processBatches(dimension, records, batchSize = 100) {
const batches = chunk(records, batchSize);
const results = {
inserted: 0,
failed: 0,
errors: []
};
// Process batches in parallel (max concurrency)
for (let i = 0; i < batches.length; i += this.concurrency) {
const group = batches.slice(i, i + this.concurrency);
const promises = group.map(batch =>
apiClient.post(`/api/v4/core/${dimension}/batch`, { records: batch })
);
const responses = await Promise.allSettled(promises);
// Aggregate results (use the actual batch length for rejected requests,
// since the last batch may be smaller than batchSize)
responses.forEach((response, index) => {
if (response.status === 'fulfilled') {
results.inserted += response.value.inserted;
results.failed += response.value.failed;
} else {
results.failed += group[index].length;
results.errors.push(response.reason);
}
});
}
return results;
}
}
// Usage
const processor = new ParallelBatchProcessor(5); // 5 concurrent batches
await processor.processBatches('products', records, 100);
Performance Optimization
Database-Level Optimizations
Disable non-unique index maintenance temporarily (note: DISABLE KEYS applies to MyISAM tables only; InnoDB ignores it):
-- Before batch insert
ALTER TABLE TB_ANAG_PRD00 DISABLE KEYS;
-- Bulk insert
INSERT INTO TB_ANAG_PRD00 (...) VALUES (...), (...), ...;
-- Rebuild indexes
ALTER TABLE TB_ANAG_PRD00 ENABLE KEYS;
Use LOAD DATA INFILE (MySQL; the LOCAL variant requires local_infile to be enabled on both client and server):
-- Fastest bulk insert method
LOAD DATA LOCAL INFILE '/tmp/products.csv'
INTO TABLE TB_ANAG_PRD00
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS
(XPRD01, XPRD02, XPRD03, XPRD05, XPRD06);
Batch Size Tuning
// Too small = too many HTTP requests
const batchSize = 10; // ❌ Slow - 1000 batches for 10k records
// Too large = memory issues, long transactions
const batchSize = 10000; // ❌ Risky - single huge transaction
// Optimal = balance throughput and safety
const batchSize = 100; // ✅ Good - 100 batches for 10k records
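One way to pick a size empirically is to time a small trial insert at a few candidate sizes and keep the fastest. A rough sketch, assuming the same apiClient wrapper as above (note: it actually writes the sample records, so run it against test data):
async function findBatchSize(dimension, sampleRecords, candidates = [50, 100, 250, 500]) {
  let best = { size: candidates[0], rate: 0 };
  for (const size of candidates) {
    const batch = sampleRecords.slice(0, size);
    const start = Date.now();
    await apiClient.post(`/api/v4/core/${dimension}/batch`, { records: batch });
    const rate = batch.length / ((Date.now() - start) / 1000); // records per second
    if (rate > best.rate) best = { size, rate };
  }
  return best.size;
}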
Best Practices
✅ DO:
Use appropriate batch sizes:
// ✅ Good - 100-500 records per batch
const batchSize = 100;
const batches = chunk(records, batchSize);
Implement progress tracking:
// ✅ Good - show progress
const tracker = new ProgressTracker(records.length);
for (const batch of batches) {
await process(batch);
tracker.update(batch.length);
}
Handle partial failures:
// ✅ Good - retry failed records
if (response.errors.length > 0) {
await saveToRetryQueue(failedRecords);
}
Use transactions appropriately:
// ✅ Good - atomic for critical operations
{transaction: 'atomic'} // All-or-nothing
// ✅ Good - independent for fault tolerance
{transaction: 'independent'} // Continue on errors
❌ DON'T:
Don't process all records in single transaction:
// ❌ Bad - single transaction for 10k records
await apiClient.post('/api/v4/core/products/batch', {
records: allRecords, // 10,000 records
transaction: 'atomic'
});
Don't ignore errors:
// ❌ Bad - no error handling
for (const batch of batches) {
await process(batch); // Errors ignored
}
// ✅ Good - handle errors
for (const batch of batches) {
try {
await process(batch);
} catch (error) {
logError(error);
}
}
Don't block on sequential processing:
// ❌ Bad - sequential processing
for (const batch of batches) {
await process(batch); // Waits for each batch
}
// ✅ Good - parallel processing with bounded concurrency
// (an unbounded Promise.all over thousands of batches can overload the API)
for (let i = 0; i < batches.length; i += 5) {
await Promise.all(batches.slice(i, i + 5).map(batch => process(batch)));
}
Summary
- ✅ Batch processing enables efficient large-scale operations
- ✅ Use appropriate batch sizes (100-500 records)
- ✅ Implement progress tracking and monitoring
- ✅ Handle partial failures with retry logic
- ✅ Use atomic transactions for critical operations
- ✅ Use independent transactions for fault tolerance
- ✅ Process batches in parallel for better throughput
- ✅ Monitor and alert on batch failures
Key Takeaways:
- Batch operations dramatically improve throughput
- Choose batch size based on record size and complexity
- Atomic transactions ensure data consistency
- Independent transactions provide fault tolerance
- Progress tracking provides visibility
- Parallel processing increases throughput
- Retry failed batches with exponential backoff
- Monitor batch processing metrics
Related Concepts
- Write Operations - POST/PUT/PATCH
- Performance Optimization - Query tuning
- Transaction Patterns - ACID compliance
- Advanced Topics - Overview