const { Worker } = require('bullmq'); const { connectionOptions, QueueNames } = require('../config/bullmq'); const models = require('../models'); const { cacheUtils } = require('../config/redis'); /** * Database Write Worker * Xử lý các thao tác ghi database (create, update, delete) từ queue */ class DatabaseWriteWorker { constructor() { this.worker = new Worker( QueueNames.DATABASE_WRITE, async (job) => await this.processJob(job), { connection: connectionOptions, prefix: process.env.BULLMQ_PREFIX || 'vcb', concurrency: 5, // Process 5 jobs concurrently limiter: { max: 100, duration: 1000, // 100 jobs per second max }, } ); this.setupEventHandlers(); } /** * Process individual job */ async processJob(job) { const { operation, model, data, userId } = job.data; console.log(`[Worker] Processing ${operation} ${model} - Job ID: ${job.id}`); try { const Model = models[model]; if (!Model) { throw new Error(`Model ${model} not found`); } let result; switch (operation) { case 'create': result = await this.handleCreate(Model, data, userId); break; case 'update': result = await this.handleUpdate(Model, data, userId); break; case 'delete': result = await this.handleDelete(Model, data, userId); break; default: throw new Error(`Unknown operation: ${operation}`); } // Invalidate related caches await this.invalidateCache(model, data.id); console.log(`[Worker] ✅ Completed ${operation} ${model} - Job ID: ${job.id}`); return result; } catch (error) { console.error(`[Worker] ❌ Error processing job ${job.id}:`, error.message); throw error; // Will trigger retry } } /** * Handle create operation */ async handleCreate(Model, data, userId) { const record = await Model.create(data); console.log(`[Worker] Created ${Model.name} with ID: ${record.id}`); return record; } /** * Handle update operation */ async handleUpdate(Model, data, userId) { const { id, ...updateData } = data; const [affectedCount] = await Model.update(updateData, { where: { id }, }); if (affectedCount === 0) { throw new Error(`${Model.name} with ID ${id} not found`); } console.log(`[Worker] Updated ${Model.name} with ID: ${id}`); return { id, affectedCount }; } /** * Handle delete operation */ async handleDelete(Model, data, userId) { const { id } = data; const affectedCount = await Model.destroy({ where: { id }, }); if (affectedCount === 0) { throw new Error(`${Model.name} with ID ${id} not found`); } console.log(`[Worker] Deleted ${Model.name} with ID: ${id}`); return { id, affectedCount }; } /** * Invalidate related caches */ async invalidateCache(modelName, recordId) { try { // Specific record cache if (recordId) { await cacheUtils.delete(`${modelName.toLowerCase()}:${recordId}`); } // List caches await cacheUtils.deletePattern(`${modelName.toLowerCase()}s:list:*`); await cacheUtils.deletePattern(`${modelName.toLowerCase()}:*:stats`); console.log(`[Worker] Cache invalidated for ${modelName}`); } catch (error) { console.error(`[Worker] Cache invalidation error:`, error.message); } } /** * Setup event handlers */ setupEventHandlers() { this.worker.on('completed', (job, result) => { console.log(`[Worker] ✅ Job ${job.id} completed successfully`); }); this.worker.on('failed', (job, error) => { console.error(`[Worker] ❌ Job ${job?.id} failed:`, error.message); }); this.worker.on('error', (error) => { console.error('[Worker] ❌ Worker error:', error.message); }); this.worker.on('stalled', (jobId) => { console.warn(`[Worker] ⚠️ Job ${jobId} stalled`); }); } /** * Graceful shutdown */ async close() { console.log('[Worker] Closing database write worker...'); await this.worker.close(); console.log('[Worker] ✅ Database write worker closed'); } } /** * Start worker */ const startWorker = () => { const worker = new DatabaseWriteWorker(); // Handle process termination process.on('SIGTERM', async () => { console.log('SIGTERM received, closing worker...'); await worker.close(); process.exit(0); }); process.on('SIGINT', async () => { console.log('SIGINT received, closing worker...'); await worker.close(); process.exit(0); }); console.log('✅ Database Write Worker started'); return worker; }; // Start worker if this file is run directly if (require.main === module) { require('dotenv').config(); startWorker(); } module.exports = { DatabaseWriteWorker, startWorker };