Files
sena_db_api_layer/workers/databaseWriteWorker.js
2026-01-19 09:33:35 +07:00

200 lines
5.0 KiB
JavaScript

const { Worker } = require('bullmq');
const { connectionOptions, QueueNames } = require('../config/bullmq');
const models = require('../models');
const { cacheUtils } = require('../config/redis');
/**
 * Database Write Worker.
 *
 * Consumes jobs from the `QueueNames.DATABASE_WRITE` queue and applies the
 * requested write operation (create, update, delete) to the named model,
 * then invalidates the cache keys derived from that model's name.
 */
class DatabaseWriteWorker {
  /**
   * Create the underlying BullMQ worker and attach the logging handlers.
   *
   * @param {object} [options] - Optional tuning overrides.
   * @param {number} [options.concurrency=5] - Number of jobs processed concurrently.
   * @param {{max: number, duration: number}} [options.limiter] - Rate limit;
   *   defaults to at most 100 jobs per 1000 ms.
   */
  constructor({ concurrency = 5, limiter = { max: 100, duration: 1000 } } = {}) {
    this.worker = new Worker(
      QueueNames.DATABASE_WRITE,
      (job) => this.processJob(job),
      {
        connection: connectionOptions,
        prefix: process.env.BULLMQ_PREFIX || 'vcb',
        concurrency,
        limiter,
      }
    );
    this.setupEventHandlers();
  }

  /**
   * Process an individual job.
   *
   * The job payload is expected to look like
   * `{ operation, model, data, userId }`, where `operation` is one of
   * `'create'`, `'update'` or `'delete'` and `model` is a key of the
   * imported `models` object.
   *
   * @param {{id: *, data: object}} job - Queue job to process.
   * @returns {Promise<*>} The created record, or `{ id, affectedCount }` for
   *   update/delete.
   * @throws {Error} When the model or operation is unknown, the payload is
   *   malformed, or the underlying write fails.
   */
  async processJob(job) {
    const { operation, model, data, userId } = job.data ?? {};
    console.log(`[Worker] Processing ${operation} ${model} - Job ID: ${job.id}`);
    try {
      // Own-property lookup only, so inherited keys such as "constructor"
      // or "toString" are never mistaken for a model.
      const isKnownModel =
        typeof model === 'string' &&
        Object.prototype.hasOwnProperty.call(models, model);
      const Model = isKnownModel ? models[model] : undefined;
      if (!Model) {
        throw new Error(`Model ${model} not found`);
      }

      // Validate before touching the database: failing on `data.id` after a
      // successful write would fail the job even though the write happened.
      if (data === null || typeof data !== 'object' || Array.isArray(data)) {
        throw new Error(`Invalid data payload for ${operation} ${model}`);
      }

      let result;
      switch (operation) {
        case 'create':
          result = await this.handleCreate(Model, data, userId);
          break;
        case 'update':
          result = await this.handleUpdate(Model, data, userId);
          break;
        case 'delete':
          result = await this.handleDelete(Model, data, userId);
          break;
        default:
          throw new Error(`Unknown operation: ${operation}`);
      }

      // For "create" the id usually only exists on the returned record, so
      // prefer the result's id and fall back to the one in the payload.
      const recordId = result?.id ?? data.id;
      await this.invalidateCache(model, recordId);

      console.log(`[Worker] ✅ Completed ${operation} ${model} - Job ID: ${job.id}`);
      return result;
    } catch (error) {
      console.error(`[Worker] ❌ Error processing job ${job.id}:`, error.message);
      // Rethrow so the queue marks the job as failed (and retries it when the
      // job was enqueued with retry attempts).
      throw error;
    }
  }

  /**
   * Handle a create operation.
   *
   * @param {object} Model - Model exposing `create(data)` and `name`.
   * @param {object} data - Attributes of the record to create.
   * @param {*} userId - Accepted for interface compatibility; currently unused.
   * @returns {Promise<object>} The created record.
   */
  async handleCreate(Model, data, userId) {
    const record = await Model.create(data);
    console.log(`[Worker] Created ${Model.name} with ID: ${record.id}`);
    return record;
  }

  /**
   * Handle an update operation.
   *
   * @param {object} Model - Model exposing `update(values, options)` and `name`.
   * @param {object} data - Must contain `id`; every other key is written.
   * @param {*} userId - Accepted for interface compatibility; currently unused.
   * @returns {Promise<{id: *, affectedCount: number}>} Id and affected row count.
   * @throws {Error} When `id` is missing or no row was affected.
   */
  async handleUpdate(Model, data, userId) {
    const { id, ...updateData } = data;
    // Never let an undefined id reach the WHERE clause.
    if (id === undefined || id === null) {
      throw new Error(`Cannot update ${Model.name}: missing record id`);
    }
    const [affectedCount] = await Model.update(updateData, {
      where: { id },
    });
    if (affectedCount === 0) {
      throw new Error(`${Model.name} with ID ${id} not found`);
    }
    console.log(`[Worker] Updated ${Model.name} with ID: ${id}`);
    return { id, affectedCount };
  }

  /**
   * Handle a delete operation.
   *
   * @param {object} Model - Model exposing `destroy(options)` and `name`.
   * @param {object} data - Must contain the `id` of the record to delete.
   * @param {*} userId - Accepted for interface compatibility; currently unused.
   * @returns {Promise<{id: *, affectedCount: number}>} Id and affected row count.
   * @throws {Error} When `id` is missing or no row was affected.
   */
  async handleDelete(Model, data, userId) {
    const { id } = data;
    // Never let an undefined id reach the WHERE clause.
    if (id === undefined || id === null) {
      throw new Error(`Cannot delete ${Model.name}: missing record id`);
    }
    const affectedCount = await Model.destroy({
      where: { id },
    });
    if (affectedCount === 0) {
      throw new Error(`${Model.name} with ID ${id} not found`);
    }
    console.log(`[Worker] Deleted ${Model.name} with ID: ${id}`);
    return { id, affectedCount };
  }

  /**
   * Invalidate the caches related to a model (best effort).
   *
   * Removes the single-record key (when an id is given) plus the list and
   * stats key patterns. Failures are logged and never propagated, so a cache
   * problem cannot fail a write that already succeeded.
   *
   * @param {string} modelName - Model name; lower-cased to build the keys.
   * @param {*} [recordId] - Id of the affected record, if known.
   * @returns {Promise<void>}
   */
  async invalidateCache(modelName, recordId) {
    try {
      const keyPrefix = modelName.toLowerCase();
      // Explicit null check so a falsy id such as 0 is still invalidated.
      if (recordId !== undefined && recordId !== null) {
        await cacheUtils.delete(`${keyPrefix}:${recordId}`);
      }
      await cacheUtils.deletePattern(`${keyPrefix}s:list:*`);
      await cacheUtils.deletePattern(`${keyPrefix}:*:stats`);
      console.log(`[Worker] Cache invalidated for ${modelName}`);
    } catch (error) {
      console.error(`[Worker] Cache invalidation error:`, error.message);
    }
  }

  /**
   * Attach logging handlers for the worker's lifecycle events.
   */
  setupEventHandlers() {
    this.worker.on('completed', (job) => {
      console.log(`[Worker] ✅ Job ${job.id} completed successfully`);
    });
    this.worker.on('failed', (job, error) => {
      // `job` may be undefined here, hence the optional chaining.
      console.error(`[Worker] ❌ Job ${job?.id} failed:`, error.message);
    });
    this.worker.on('error', (error) => {
      console.error('[Worker] ❌ Worker error:', error.message);
    });
    this.worker.on('stalled', (jobId) => {
      console.warn(`[Worker] ⚠️ Job ${jobId} stalled`);
    });
  }

  /**
   * Gracefully shut the worker down.
   *
   * @returns {Promise<void>} Resolves once the underlying worker has closed.
   */
  async close() {
    console.log('[Worker] Closing database write worker...');
    await this.worker.close();
    console.log('[Worker] ✅ Database write worker closed');
  }
}
/**
 * Start the database write worker and register graceful-shutdown handlers.
 *
 * On SIGTERM or SIGINT the worker is closed and the process exits: with
 * code 0 when the close succeeded, with code 1 when it failed. Repeated
 * signals received while a shutdown is already in progress are ignored.
 *
 * @returns {DatabaseWriteWorker} The started worker instance.
 */
const startWorker = () => {
  const worker = new DatabaseWriteWorker();
  let isShuttingDown = false;

  const shutdown = async (signal) => {
    // A second signal (e.g. an impatient Ctrl+C) must not close the worker twice.
    if (isShuttingDown) {
      return;
    }
    isShuttingDown = true;
    console.log(`${signal} received, closing worker...`);

    let exitCode = 0;
    try {
      await worker.close();
    } catch (error) {
      // Without this catch a failed close would surface as an unhandled
      // rejection and the explicit exit below would never run.
      console.error('[Worker] ❌ Error while closing worker:', error.message);
      exitCode = 1;
    }
    process.exit(exitCode);
  };

  // Handle process termination
  for (const signal of ['SIGTERM', 'SIGINT']) {
    process.on(signal, () => {
      // shutdown() handles its own errors, so the promise is intentionally not awaited.
      void shutdown(signal);
    });
  }

  console.log('✅ Database Write Worker started');
  return worker;
};
// Start worker if this file is run directly
if (require.main === module) {
require('dotenv').config();
startWorker();
}
module.exports = { DatabaseWriteWorker, startWorker };