Batch Processing

Overview

Batch processing handles large-scale data operations efficiently, letting you insert, update, or delete thousands of records per request with controlled transactions and structured error reporting.

Use Cases:

  • Data migration from external systems
  • Bulk product catalog updates
  • Mass price adjustments
  • End-of-day order processing
  • Periodic data synchronization
  • Bulk user imports

Benefits:

  • ✅ Higher throughput (process 1000s of records)
  • ✅ Lower overhead (fewer HTTP requests)
  • ✅ Transaction management (all-or-nothing commits)
  • ✅ Progress tracking (monitor batch execution)
  • ✅ Error handling (partial failure recovery)

Batch Insert

Single Batch Insert

API endpoint:

# Batch insert multiple products
curl -X POST https://api.q01.io/api/v4/core/products/batch \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "records": [
      {"XPRD01": "Widget A", "XPRD02": 19.99, "XPRD05": 1},
      {"XPRD01": "Widget B", "XPRD02": 29.99, "XPRD05": 1},
      {"XPRD01": "Widget C", "XPRD02": 39.99, "XPRD05": 2},
      ...
    ]
  }'

Response:

{
  "inserted": 1000,
  "failed": 0,
  "errors": [],
  "duration_ms": 1250,
  "records": [
    {"PRD_ID": 1001, "XPRD01": "Widget A", "XPRD02": 19.99},
    {"PRD_ID": 1002, "XPRD01": "Widget B", "XPRD02": 29.99},
    {"PRD_ID": 1003, "XPRD01": "Widget C", "XPRD02": 39.99},
    ...
  ]
}

Batch Insert Implementation

JavaScript:

class BatchProcessor {
  constructor(apiClient, batchSize = 100) {
    this.apiClient = apiClient;
    this.batchSize = batchSize;
  }

  async batchInsert(dimension, records) {
    const batches = this.chunk(records, this.batchSize);
    const results = {
      total: records.length,
      inserted: 0,
      failed: 0,
      errors: []
    };

    console.log(`Processing ${batches.length} batches...`);

    for (let i = 0; i < batches.length; i++) {
      const batch = batches[i];

      console.log(`Batch ${i + 1}/${batches.length}: ${batch.length} records`);

      try {
        const response = await this.apiClient.post(
          `/api/v4/core/${dimension}/batch`,
          { records: batch }
        );

        results.inserted += response.inserted;
        results.failed += response.failed;
        results.errors.push(...response.errors);

        console.log(`✅ Batch ${i + 1} completed: ${response.inserted} inserted`);
      } catch (error) {
        console.error(`❌ Batch ${i + 1} failed: ${error.message}`);
        results.failed += batch.length;
        results.errors.push({
          batch: i + 1,
          error: error.message,
          records: batch
        });
      }
    }

    return results;
  }

  // Split an array into fixed-size slices
  chunk(array, size) {
    const chunks = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }
}

// Usage
const processor = new BatchProcessor(apiClient, 100);

const products = [
  {XPRD01: "Product 1", XPRD02: 19.99, XPRD05: 1},
  {XPRD01: "Product 2", XPRD02: 29.99, XPRD05: 1},
  // ... 10,000 products
];

const results = await processor.batchInsert('products', products);

console.log(`Inserted: ${results.inserted}`);
console.log(`Failed: ${results.failed}`);
console.log(`Errors: ${results.errors.length}`);

Backend Batch Insert (PHP)

class BatchInsertHandler {
    private $db;
    private $batchSize = 100;

    public function batchInsert(string $dimension, array $records): array {
        $inserted = 0;
        $failed = 0;
        $errors = [];

        // Process in batches
        $batches = array_chunk($records, $this->batchSize);

        foreach ($batches as $batchIndex => $batch) {
            try {
                // Start transaction
                $this->db->beginTransaction();

                // Build bulk insert SQL
                $sql = $this->buildBulkInsertSQL($dimension, $batch);

                // Execute
                $this->db->exec($sql);

                // Insert OUTBOX events
                foreach ($batch as $record) {
                    $this->insertOutboxEvent('ProductCreated', $dimension, $record);
                }

                // Commit
                $this->db->commit();

                $inserted += count($batch);

            } catch (\Exception $e) {
                // Rollback on error
                $this->db->rollBack();

                $failed += count($batch);
                $errors[] = [
                    'batch' => $batchIndex + 1,
                    'error' => $e->getMessage(),
                    'record_count' => count($batch)
                ];
            }
        }

        return [
            'inserted' => $inserted,
            'failed' => $failed,
            'errors' => $errors
        ];
    }

    private function buildBulkInsertSQL(string $dimension, array $records): string {
        $tableName = "TB_ANAG_{$dimension}00";

        // Get field names from first record
        $fields = array_keys($records[0]);
        $fieldList = implode(', ', $fields);

        // Build VALUES clauses
        $values = [];
        foreach ($records as $record) {
            $valueList = implode(', ', array_map(function($value) {
                return $this->db->quote($value);
            }, array_values($record)));

            $values[] = "($valueList)";
        }

        $valuesString = implode(', ', $values);

        return "INSERT INTO $tableName ($fieldList) VALUES $valuesString";
    }
}

Batch Update

Bulk Update by IDs

# Update multiple products
curl -X PUT https://api.q01.io/api/v4/core/products/batch \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "records": [
      {"PRD_ID": 1001, "XPRD02": 22.99, "XPRD06": "Y"},
      {"PRD_ID": 1002, "XPRD02": 32.99, "XPRD06": "Y"},
      {"PRD_ID": 1003, "XPRD02": 42.99, "XPRD06": "N"}
    ]
  }'

Bulk Update by Filter

# Update all products in category 5
curl -X PATCH https://api.q01.io/api/v4/core/products/batch \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "filters": {
      "XPRD05": 5,
      "TREC": "N"
    },
    "updates": {
      "XPRD06": "Y",
      "XPRD10": "2025-12-19"
    }
  }'

# Response
{
  "updated": 250,
  "duration_ms": 450
}

Batch Update Implementation

async function batchUpdate(dimension, updates) {
  const batchSize = 100;
  // chunk() splits an array into fixed-size slices (see BatchProcessor.chunk above)
  const batches = chunk(updates, batchSize);

  let totalUpdated = 0;

  for (const batch of batches) {
    const response = await apiClient.put(
      `/api/v4/core/${dimension}/batch`,
      { records: batch }
    );

    totalUpdated += response.updated;

    console.log(`Updated: ${response.updated} records`);
  }

  return totalUpdated;
}

// Usage
const updates = [
  {PRD_ID: 1001, XPRD02: 22.99},
  {PRD_ID: 1002, XPRD02: 32.99},
  // ... 5,000 updates
];

await batchUpdate('products', updates);

Batch Delete

Bulk Delete by IDs

# Delete multiple products
curl -X DELETE https://api.q01.io/api/v4/core/products/batch \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "ids": [1001, 1002, 1003, 1004, 1005]
  }'

# Response
{
  "deleted": 5,
  "duration_ms": 120
}

Bulk Delete by Filter

# Delete all inactive products older than 2 years
curl -X DELETE https://api.q01.io/api/v4/core/products/batch \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "filters": {
      "XPRD06": "N",
      "CREATED_AT": {"$lt": "2023-01-01"}
    }
  }'

# Response
{
  "deleted": 1250,
  "duration_ms": 2500
}
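
Neither delete endpoint has a client-side helper above, so here is a minimal sketch that chunks IDs the same way as the insert and update helpers. The apiClient.delete signature (URL plus JSON body) is an assumption; adapt it to your HTTP client.

// Delete records in chunks of IDs; returns the total number deleted
async function batchDelete(dimension, ids, batchSize = 500) {
  let totalDeleted = 0;

  for (let i = 0; i < ids.length; i += batchSize) {
    const slice = ids.slice(i, i + batchSize);
    const response = await apiClient.delete(
      `/api/v4/core/${dimension}/batch`,
      { ids: slice }
    );
    totalDeleted += response.deleted;
  }

  return totalDeleted;
}

// Usage
const deleted = await batchDelete('products', [1001, 1002, 1003, 1004, 1005]);
console.log(`Deleted: ${deleted}`);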

Transaction Management

All-or-Nothing Batches

class TransactionalBatch {
  async processBatch(dimension, records) {
    // All records in a single transaction
    try {
      const response = await apiClient.post(
        `/api/v4/core/${dimension}/batch`,
        {
          records: records,
          transaction: 'atomic' // All-or-nothing
        }
      );

      console.log(`✅ All ${records.length} records inserted`);
      return response;
    } catch (error) {
      console.error(`❌ Batch failed - nothing inserted: ${error.message}`);
      throw error;
    }
  }
}

Partial Commit Batches

class PartialCommitBatch {
  async processBatch(dimension, records) {
    // Process each record independently
    const response = await apiClient.post(
      `/api/v4/core/${dimension}/batch`,
      {
        records: records,
        transaction: 'independent' // Commit each record
      }
    );

    console.log(`✅ Inserted: ${response.inserted}`);
    console.log(`❌ Failed: ${response.failed}`);

    // Process errors
    for (const error of response.errors) {
      console.error(`Record ${error.index}: ${error.message}`);
    }

    return response;
  }
}

Progress Tracking

Progress Bar Implementation

class ProgressTracker {
  constructor(total) {
    this.total = total;
    this.current = 0;
    this.startTime = Date.now();
  }

  update(processed) {
    this.current += processed;

    const percentage = ((this.current / this.total) * 100).toFixed(2);
    const elapsed = (Date.now() - this.startTime) / 1000;
    const rate = this.current / elapsed;
    const remaining = (this.total - this.current) / rate;

    console.log(
      `Progress: ${this.current}/${this.total} (${percentage}%) | ` +
      `Rate: ${rate.toFixed(0)} rec/sec | ` +
      `ETA: ${remaining.toFixed(0)}s`
    );
  }

  complete() {
    const duration = (Date.now() - this.startTime) / 1000;
    const rate = this.total / duration;

    console.log(
      `✅ Complete: ${this.total} records in ${duration.toFixed(2)}s ` +
      `(${rate.toFixed(0)} rec/sec)`
    );
  }
}

// Usage
async function batchInsertWithProgress(dimension, records) {
  const batchSize = 100;
  const batches = chunk(records, batchSize);
  const tracker = new ProgressTracker(records.length);

  for (const batch of batches) {
    await apiClient.post(`/api/v4/core/${dimension}/batch`, { records: batch });
    tracker.update(batch.length);
  }

  tracker.complete();
}

Error Handling

Retry Failed Batches

class RetryableBatch {
  constructor(maxRetries = 3) {
    this.maxRetries = maxRetries;
  }

  async processBatchWithRetry(dimension, records, attempt = 1) {
    try {
      return await apiClient.post(
        `/api/v4/core/${dimension}/batch`,
        { records: records }
      );
    } catch (error) {
      if (attempt < this.maxRetries) {
        const delay = Math.pow(2, attempt) * 1000; // Exponential backoff
        console.log(`Retry attempt ${attempt} after ${delay}ms...`);

        await this.sleep(delay);
        return this.processBatchWithRetry(dimension, records, attempt + 1);
      }

      throw error;
    }
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

Partial Failure Recovery

class PartialFailureHandler {
  async processBatch(dimension, records) {
    const response = await apiClient.post(
      `/api/v4/core/${dimension}/batch`,
      {
        records: records,
        transaction: 'independent'
      }
    );

    if (response.errors.length > 0) {
      console.log(`${response.errors.length} records failed`);

      // Extract failed records
      const failedRecords = response.errors.map(err =>
        records[err.index]
      );

      // Save to retry queue
      await this.saveToRetryQueue(dimension, failedRecords);

      // Alert
      await this.sendAlert({
        title: 'Batch Processing Partial Failure',
        dimension: dimension,
        failed: response.errors.length,
        total: records.length
      });
    }

    return response;
  }

  async saveToRetryQueue(dimension, records) {
    await redis.lpush(
      `retry:${dimension}`,
      JSON.stringify({
        records: records,
        timestamp: Date.now()
      })
    );
  }
}
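
The handler above only writes to the retry queue. Below is a minimal sketch of a worker that drains it, assuming an ioredis-style client (rpop returns null when the list is empty) and the same batch endpoint; 'independent' mode keeps one bad record from blocking the rest of the retry.

// Drain the retry queue for a dimension and re-submit the failed records
async function drainRetryQueue(dimension) {
  while (true) {
    const payload = await redis.rpop(`retry:${dimension}`);
    if (!payload) break; // queue is empty

    const { records } = JSON.parse(payload);

    await apiClient.post(`/api/v4/core/${dimension}/batch`, {
      records,
      transaction: 'independent'
    });
  }
}

// Usage (e.g. from a cron job or scheduled worker)
await drainRetryQueue('products');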

Parallel Processing

Concurrent Batches

class ParallelBatchProcessor {
  constructor(concurrency = 5) {
    this.concurrency = concurrency;
  }

  async processBatches(dimension, records, batchSize = 100) {
    const batches = chunk(records, batchSize);
    const results = {
      inserted: 0,
      failed: 0,
      errors: []
    };

    // Process batches in parallel (max concurrency)
    for (let i = 0; i < batches.length; i += this.concurrency) {
      const group = batches.slice(i, i + this.concurrency);

      const promises = group.map(batch =>
        apiClient.post(`/api/v4/core/${dimension}/batch`, { records: batch })
      );

      const responses = await Promise.allSettled(promises);

      // Aggregate results
      responses.forEach((response, index) => {
        if (response.status === 'fulfilled') {
          results.inserted += response.value.inserted;
          results.failed += response.value.failed;
        } else {
          results.failed += group[index].length;
          results.errors.push(response.reason);
        }
      });
    }

    return results;
  }
}

// Usage
const processor = new ParallelBatchProcessor(5); // 5 concurrent batches
await processor.processBatches('products', records, 100);

Performance Optimization

Database-Level Optimizations

Disable non-unique indexes temporarily (note: ALTER TABLE ... DISABLE KEYS applies to MyISAM tables; InnoDB ignores it):

-- Before batch insert
ALTER TABLE TB_ANAG_PRD00 DISABLE KEYS;

-- Bulk insert
INSERT INTO TB_ANAG_PRD00 (...) VALUES (...), (...), ...;

-- Rebuild indexes
ALTER TABLE TB_ANAG_PRD00 ENABLE KEYS;

Use LOAD DATA INFILE (MySQL):

-- Fastest bulk insert method
LOAD DATA LOCAL INFILE '/tmp/products.csv'
INTO TABLE TB_ANAG_PRD00
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS
(XPRD01, XPRD02, XPRD03, XPRD05, XPRD06);

Batch Size Tuning

// Too small = too many HTTP requests
// const batchSize = 10;    // ❌ Slow - 1,000 batches for 10k records

// Too large = memory issues, long transactions
// const batchSize = 10000; // ❌ Risky - single huge transaction

// Optimal = balance throughput and safety
const batchSize = 100;      // ✅ Good - 100 batches for 10k records

Best Practices

✅ DO:

Use appropriate batch sizes:

// ✅ Good - 100-500 records per batch
const batchSize = 100;
const batches = chunk(records, batchSize);

Implement progress tracking:

// ✅ Good - show progress
const tracker = new ProgressTracker(records.length);
for (const batch of batches) {
  await process(batch);
  tracker.update(batch.length);
}

Handle partial failures:

// ✅ Good - retry failed records
if (response.errors.length > 0) {
  await saveToRetryQueue(failedRecords);
}

Use transactions appropriately:

// ✅ Good - atomic for critical operations
{transaction: 'atomic'} // All-or-nothing

// ✅ Good - independent for fault tolerance
{transaction: 'independent'} // Continue on errors

❌ DON'T:

Don't process all records in single transaction:

// ❌ Bad - single transaction for 10k records
await apiClient.post('/api/v4/core/products/batch', {
  records: allRecords, // 10,000 records
  transaction: 'atomic'
});

Don't ignore errors:

// ❌ Bad - no error handling
for (const batch of batches) {
  await process(batch); // Errors ignored
}

// ✅ Good - handle errors
for (const batch of batches) {
  try {
    await process(batch);
  } catch (error) {
    logError(error);
  }
}

Don't process large jobs strictly sequentially:

// ❌ Bad - sequential processing
for (const batch of batches) {
  await process(batch); // Waits for each batch
}

// ✅ Good - parallel processing (bound the concurrency for large jobs, as in ParallelBatchProcessor above)
await Promise.all(batches.map(batch => process(batch)));

Summary

  • ✅ Batch processing enables efficient large-scale operations
  • ✅ Use appropriate batch sizes (100-500 records)
  • ✅ Implement progress tracking and monitoring
  • ✅ Handle partial failures with retry logic
  • ✅ Use atomic transactions for critical operations
  • ✅ Use independent transactions for fault tolerance
  • ✅ Process batches in parallel for better throughput
  • ✅ Monitor and alert on batch failures

Key Takeaways:

  1. Batch operations dramatically improve throughput
  2. Choose batch size based on record size and complexity
  3. Atomic transactions ensure data consistency
  4. Independent transactions provide fault tolerance
  5. Progress tracking provides visibility
  6. Parallel processing increases throughput
  7. Retry failed batches with exponential backoff
  8. Monitor batch processing metrics