200 lines
5.0 KiB
JavaScript
200 lines
5.0 KiB
JavaScript
const { Worker } = require('bullmq');
|
|
const { connectionOptions, QueueNames } = require('../config/bullmq');
|
|
const models = require('../models');
|
|
const { cacheUtils } = require('../config/redis');
|
|
|
|
/**
 * Database Write Worker
 *
 * Consumes jobs from the DATABASE_WRITE queue and applies the requested
 * write operation (create, update, delete) to the named model, then
 * invalidates the cache entries related to that model.
 */
class DatabaseWriteWorker {
  /**
   * Create the underlying BullMQ worker and attach the logging event handlers.
   */
  constructor() {
    this.worker = new Worker(
      QueueNames.DATABASE_WRITE,
      // processJob already returns a promise; no extra async/await wrapper needed.
      (job) => this.processJob(job),
      {
        connection: connectionOptions,
        prefix: process.env.BULLMQ_PREFIX || 'vcb',
        concurrency: 5, // Process 5 jobs concurrently
        limiter: {
          max: 100,
          duration: 1000, // 100 jobs per second max
        },
      }
    );

    this.setupEventHandlers();
  }

  /**
   * Process an individual job.
   *
   * @param {object} job - Queue job; `job.data` is expected to carry
   *   `{ operation, model, data, userId }`.
   * @returns {Promise<object>} The created record, or `{ id, affectedCount }`
   *   for update/delete.
   * @throws {Error} When the model or operation is unknown, or the handler
   *   fails. The error is rethrown so the queue can apply its retry policy.
   */
  async processJob(job) {
    // A job without a payload should fail with a clear, logged message below
    // rather than with a destructuring TypeError outside the try block.
    const { operation, model, data, userId } = job.data ?? {};

    console.log(`[Worker] Processing ${operation} ${model} - Job ID: ${job.id}`);

    try {
      // The model name comes from the job payload, so only accept keys that
      // the models registry defines itself (not inherited ones such as
      // "constructor" or "toString").
      const isRegistered =
        typeof model === 'string' &&
        Object.prototype.hasOwnProperty.call(models, model);
      const Model = isRegistered ? models[model] : undefined;
      if (!Model) {
        throw new Error(`Model ${model} not found`);
      }

      let result;

      switch (operation) {
        case 'create':
          result = await this.handleCreate(Model, data, userId);
          break;

        case 'update':
          result = await this.handleUpdate(Model, data, userId);
          break;

        case 'delete':
          result = await this.handleDelete(Model, data, userId);
          break;

        default:
          throw new Error(`Unknown operation: ${operation}`);
      }

      // Invalidate related caches. For a create the id only exists on the
      // result; optional chaining keeps a missing payload from throwing after
      // the write has already been committed (which would cause a retry and
      // a duplicate write).
      await this.invalidateCache(model, result?.id ?? data?.id);

      console.log(`[Worker] ✅ Completed ${operation} ${model} - Job ID: ${job.id}`);
      return result;
    } catch (error) {
      console.error(`[Worker] ❌ Error processing job ${job.id}:`, error.message);
      throw error; // Will trigger retry
    }
  }

  /**
   * Handle create operation.
   *
   * @param {object} Model - Model exposing `create(values)` and `name`.
   * @param {object} data - Values for the new record.
   * @param {*} userId - Accepted for signature parity; not used by this handler.
   * @returns {Promise<object>} The created record.
   */
  async handleCreate(Model, data, userId) {
    const record = await Model.create(data);
    console.log(`[Worker] Created ${Model.name} with ID: ${record.id}`);
    return record;
  }

  /**
   * Handle update operation.
   *
   * @param {object} Model - Model exposing `update(values, options)` and `name`.
   * @param {object} data - Must contain `id`; every other key is written.
   * @param {*} userId - Accepted for signature parity; not used by this handler.
   * @returns {Promise<{id: *, affectedCount: number}>}
   * @throws {Error} When `id` is missing or no row was affected.
   */
  async handleUpdate(Model, data, userId) {
    const { id, ...updateData } = data ?? {};

    // Never send a WHERE clause built from a missing id to the database.
    if (id == null) {
      throw new Error(`${Model.name} update requires an id`);
    }

    const [affectedCount] = await Model.update(updateData, {
      where: { id },
    });

    if (affectedCount === 0) {
      throw new Error(`${Model.name} with ID ${id} not found`);
    }

    console.log(`[Worker] Updated ${Model.name} with ID: ${id}`);
    return { id, affectedCount };
  }

  /**
   * Handle delete operation.
   *
   * @param {object} Model - Model exposing `destroy(options)` and `name`.
   * @param {object} data - Must contain the `id` of the record to delete.
   * @param {*} userId - Accepted for signature parity; not used by this handler.
   * @returns {Promise<{id: *, affectedCount: number}>}
   * @throws {Error} When `id` is missing or no row was affected.
   */
  async handleDelete(Model, data, userId) {
    const { id } = data ?? {};

    // Never send a WHERE clause built from a missing id to the database.
    if (id == null) {
      throw new Error(`${Model.name} delete requires an id`);
    }

    const affectedCount = await Model.destroy({
      where: { id },
    });

    if (affectedCount === 0) {
      throw new Error(`${Model.name} with ID ${id} not found`);
    }

    console.log(`[Worker] Deleted ${Model.name} with ID: ${id}`);
    return { id, affectedCount };
  }

  /**
   * Invalidate related caches (best effort: failures are logged, never thrown).
   *
   * @param {string} modelName - Model name; lower-cased to build cache keys.
   * @param {*} [recordId] - When present, the per-record key is removed too.
   * @returns {Promise<void>}
   */
  async invalidateCache(modelName, recordId) {
    try {
      const keyPrefix = modelName.toLowerCase();
      const pending = [];

      // Specific record cache (explicit null check so an id of 0 still counts)
      if (recordId != null) {
        pending.push(cacheUtils.delete(`${keyPrefix}:${recordId}`));
      }

      // List caches
      pending.push(cacheUtils.deletePattern(`${keyPrefix}s:list:*`));
      pending.push(cacheUtils.deletePattern(`${keyPrefix}:*:stats`));

      // Attempt every invalidation even if one of them fails, so a single
      // failing key does not leave the remaining caches stale.
      const outcomes = await Promise.allSettled(pending);
      const failures = outcomes.filter((outcome) => outcome.status === 'rejected');

      for (const { reason } of failures) {
        console.error(`[Worker] Cache invalidation error:`, reason?.message ?? reason);
      }

      if (failures.length === 0) {
        console.log(`[Worker] Cache invalidated for ${modelName}`);
      }
    } catch (error) {
      console.error(`[Worker] Cache invalidation error:`, error.message);
    }
  }

  /**
   * Setup event handlers that log the worker's lifecycle events.
   */
  setupEventHandlers() {
    this.worker.on('completed', (job) => {
      console.log(`[Worker] ✅ Job ${job.id} completed successfully`);
    });

    this.worker.on('failed', (job, error) => {
      console.error(`[Worker] ❌ Job ${job?.id} failed:`, error.message);
    });

    this.worker.on('error', (error) => {
      console.error('[Worker] ❌ Worker error:', error.message);
    });

    this.worker.on('stalled', (jobId) => {
      console.warn(`[Worker] ⚠️ Job ${jobId} stalled`);
    });
  }

  /**
   * Graceful shutdown: closes the underlying queue worker.
   *
   * @returns {Promise<void>}
   */
  async close() {
    console.log('[Worker] Closing database write worker...');
    await this.worker.close();
    console.log('[Worker] ✅ Database write worker closed');
  }
}
|
|
|
|
/**
 * Start worker.
 *
 * Creates a DatabaseWriteWorker and registers SIGTERM/SIGINT handlers that
 * close it before the process exits.
 *
 * @returns {DatabaseWriteWorker} The worker instance that was started.
 */
const startWorker = () => {
  const worker = new DatabaseWriteWorker();
  let isShuttingDown = false;

  // Shared shutdown path for every termination signal.
  const shutdown = async (signal) => {
    // A repeated signal must not start a second close while one is in flight.
    if (isShuttingDown) {
      return;
    }
    isShuttingDown = true;

    console.log(`${signal} received, closing worker...`);

    let exitCode = 0;
    try {
      await worker.close();
    } catch (error) {
      // Without this, a failed close would surface as an unhandled rejection
      // instead of a logged, non-zero exit.
      console.error(`[Worker] ❌ Failed to close worker on ${signal}:`, error?.message ?? error);
      exitCode = 1;
    }
    process.exit(exitCode);
  };

  // Handle process termination
  for (const signal of ['SIGTERM', 'SIGINT']) {
    process.on(signal, () => shutdown(signal));
  }

  console.log('✅ Database Write Worker started');
  return worker;
};
|
|
|
|
// Boot the worker only when this file is the process entry point; when it is
// required from another module, nothing is started and only the exports apply.
// NOTE(review): the config modules are required at the top of the file, before
// dotenv runs here — confirm they do not read .env-provided values at load time.
const isEntryPoint = require.main === module;

if (isEntryPoint) {
  require('dotenv').config();
  startWorker();
}

module.exports = { DatabaseWriteWorker, startWorker };
|