update
This commit is contained in:
199
workers/databaseWriteWorker.js
Normal file
199
workers/databaseWriteWorker.js
Normal file
@@ -0,0 +1,199 @@
|
||||
const { Worker } = require('bullmq');
|
||||
const { connectionOptions, QueueNames } = require('../config/bullmq');
|
||||
const models = require('../models');
|
||||
const { cacheUtils } = require('../config/redis');
|
||||
|
||||
/**
|
||||
* Database Write Worker
|
||||
* Xử lý các thao tác ghi database (create, update, delete) từ queue
|
||||
*/
|
||||
class DatabaseWriteWorker {
|
||||
constructor() {
|
||||
this.worker = new Worker(
|
||||
QueueNames.DATABASE_WRITE,
|
||||
async (job) => await this.processJob(job),
|
||||
{
|
||||
connection: connectionOptions,
|
||||
prefix: process.env.BULLMQ_PREFIX || 'vcb',
|
||||
concurrency: 5, // Process 5 jobs concurrently
|
||||
limiter: {
|
||||
max: 100,
|
||||
duration: 1000, // 100 jobs per second max
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
this.setupEventHandlers();
|
||||
}
|
||||
|
||||
/**
|
||||
* Process individual job
|
||||
*/
|
||||
async processJob(job) {
|
||||
const { operation, model, data, userId } = job.data;
|
||||
|
||||
console.log(`[Worker] Processing ${operation} ${model} - Job ID: ${job.id}`);
|
||||
|
||||
try {
|
||||
const Model = models[model];
|
||||
if (!Model) {
|
||||
throw new Error(`Model ${model} not found`);
|
||||
}
|
||||
|
||||
let result;
|
||||
|
||||
switch (operation) {
|
||||
case 'create':
|
||||
result = await this.handleCreate(Model, data, userId);
|
||||
break;
|
||||
|
||||
case 'update':
|
||||
result = await this.handleUpdate(Model, data, userId);
|
||||
break;
|
||||
|
||||
case 'delete':
|
||||
result = await this.handleDelete(Model, data, userId);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new Error(`Unknown operation: ${operation}`);
|
||||
}
|
||||
|
||||
// Invalidate related caches
|
||||
await this.invalidateCache(model, data.id);
|
||||
|
||||
console.log(`[Worker] ✅ Completed ${operation} ${model} - Job ID: ${job.id}`);
|
||||
return result;
|
||||
|
||||
} catch (error) {
|
||||
console.error(`[Worker] ❌ Error processing job ${job.id}:`, error.message);
|
||||
throw error; // Will trigger retry
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle create operation
|
||||
*/
|
||||
async handleCreate(Model, data, userId) {
|
||||
const record = await Model.create(data);
|
||||
console.log(`[Worker] Created ${Model.name} with ID: ${record.id}`);
|
||||
return record;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle update operation
|
||||
*/
|
||||
async handleUpdate(Model, data, userId) {
|
||||
const { id, ...updateData } = data;
|
||||
|
||||
const [affectedCount] = await Model.update(updateData, {
|
||||
where: { id },
|
||||
});
|
||||
|
||||
if (affectedCount === 0) {
|
||||
throw new Error(`${Model.name} with ID ${id} not found`);
|
||||
}
|
||||
|
||||
console.log(`[Worker] Updated ${Model.name} with ID: ${id}`);
|
||||
return { id, affectedCount };
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle delete operation
|
||||
*/
|
||||
async handleDelete(Model, data, userId) {
|
||||
const { id } = data;
|
||||
|
||||
const affectedCount = await Model.destroy({
|
||||
where: { id },
|
||||
});
|
||||
|
||||
if (affectedCount === 0) {
|
||||
throw new Error(`${Model.name} with ID ${id} not found`);
|
||||
}
|
||||
|
||||
console.log(`[Worker] Deleted ${Model.name} with ID: ${id}`);
|
||||
return { id, affectedCount };
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate related caches
|
||||
*/
|
||||
async invalidateCache(modelName, recordId) {
|
||||
try {
|
||||
// Specific record cache
|
||||
if (recordId) {
|
||||
await cacheUtils.delete(`${modelName.toLowerCase()}:${recordId}`);
|
||||
}
|
||||
|
||||
// List caches
|
||||
await cacheUtils.deletePattern(`${modelName.toLowerCase()}s:list:*`);
|
||||
await cacheUtils.deletePattern(`${modelName.toLowerCase()}:*:stats`);
|
||||
|
||||
console.log(`[Worker] Cache invalidated for ${modelName}`);
|
||||
} catch (error) {
|
||||
console.error(`[Worker] Cache invalidation error:`, error.message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup event handlers
|
||||
*/
|
||||
setupEventHandlers() {
|
||||
this.worker.on('completed', (job, result) => {
|
||||
console.log(`[Worker] ✅ Job ${job.id} completed successfully`);
|
||||
});
|
||||
|
||||
this.worker.on('failed', (job, error) => {
|
||||
console.error(`[Worker] ❌ Job ${job?.id} failed:`, error.message);
|
||||
});
|
||||
|
||||
this.worker.on('error', (error) => {
|
||||
console.error('[Worker] ❌ Worker error:', error.message);
|
||||
});
|
||||
|
||||
this.worker.on('stalled', (jobId) => {
|
||||
console.warn(`[Worker] ⚠️ Job ${jobId} stalled`);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Graceful shutdown
|
||||
*/
|
||||
async close() {
|
||||
console.log('[Worker] Closing database write worker...');
|
||||
await this.worker.close();
|
||||
console.log('[Worker] ✅ Database write worker closed');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start worker
|
||||
*/
|
||||
const startWorker = () => {
|
||||
const worker = new DatabaseWriteWorker();
|
||||
|
||||
// Handle process termination
|
||||
process.on('SIGTERM', async () => {
|
||||
console.log('SIGTERM received, closing worker...');
|
||||
await worker.close();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on('SIGINT', async () => {
|
||||
console.log('SIGINT received, closing worker...');
|
||||
await worker.close();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
console.log('✅ Database Write Worker started');
|
||||
return worker;
|
||||
};
|
||||
|
||||
// Start worker if this file is run directly
|
||||
if (require.main === module) {
|
||||
require('dotenv').config();
|
||||
startWorker();
|
||||
}
|
||||
|
||||
module.exports = { DatabaseWriteWorker, startWorker };
|
||||
Reference in New Issue
Block a user