const { Worker } = require('bullmq'); const { connectionOptions } = require('../config/bullmq'); const models = require('../models'); /** * Process Data Worker * Worker này xử lý các task từ queue 'process-data' * - Nhận lesson data với danh sách vocabulary và grammar IDs * - Thực hiện các xử lý nghiệp vụ với dữ liệu */ class ProcessDataWorker { constructor() { this.worker = new Worker( 'process-data', async (job) => await this.processJob(job), { connection: connectionOptions, prefix: process.env.BULLMQ_PREFIX || 'vcb', concurrency: 5, // Process 5 jobs đồng thời limiter: { max: 20, duration: 1000, // 20 jobs/second }, } ); this.setupEventHandlers(); } /** * Process job - Xử lý task process_data */ async processJob(job) { const { type, lesson, timestamp } = job.data; console.log(`[ProcessDataWorker] Processing job ${job.id} - Type: ${type}`); console.log(`[ProcessDataWorker] Lesson: ${lesson.lesson_title} (${lesson.lesson_id})`); try { if (type === 'lesson_data') { const result = await this.processLessonData(lesson); console.log(`[ProcessDataWorker] ✅ Completed job ${job.id}`); return result; } else { console.log(`[ProcessDataWorker] ⚠️ Unknown type: ${type}`); return { success: false, error: 'Unknown type' }; } } catch (error) { console.error(`[ProcessDataWorker] ❌ Error processing job ${job.id}:`, error.message); throw error; } } /** * Xử lý lesson data */ async processLessonData(lesson) { const { lesson_id, lesson_title, vocabularies, grammars, chapter_id, } = lesson; console.log(`[ProcessDataWorker] Processing lesson: ${lesson_title}`); console.log(`[ProcessDataWorker] - Vocabularies: ${vocabularies.length}`); console.log(`[ProcessDataWorker] - Grammars: ${grammars.length}`); const result = { lesson_id, processed: { vocabularies: 0, grammars: 0, }, details: { vocabularies: [], grammars: [], }, }; // Xử lý vocabularies if (vocabularies.length > 0) { const vocabResult = await this.processVocabularies(vocabularies, lesson_id); result.processed.vocabularies = vocabResult.count; result.details.vocabularies = vocabResult.items; } // Xử lý grammars if (grammars.length > 0) { const grammarResult = await this.processGrammars(grammars, lesson_id); result.processed.grammars = grammarResult.count; result.details.grammars = grammarResult.items; } console.log(`[ProcessDataWorker] ✅ Processed: ${result.processed.vocabularies} vocabs, ${result.processed.grammars} grammars`); return result; } /** * Xử lý vocabularies * TODO: Implement logic xử lý vocabularies theo yêu cầu nghiệp vụ */ async processVocabularies(vocabularyIds, lessonId) { const items = []; // Lấy thông tin vocabularies từ database const vocabs = await models.Vocab.findAll({ where: { vocab_id: vocabularyIds, }, include: [ { model: models.VocabMapping, as: 'mappings', }, { model: models.VocabForm, as: 'forms', }, ], }); for (const vocab of vocabs) { console.log(`[ProcessDataWorker] - Vocab: ${vocab.word} (${vocab.vocab_id})`); // TODO: Thực hiện xử lý nghiệp vụ ở đây // Ví dụ: // - Cập nhật thống kê // - Tạo cache // - Sync với hệ thống khác // - Tạo notification // - Log audit items.push({ vocab_id: vocab.vocab_id, word: vocab.word, word_type: vocab.word_type, mappings_count: vocab.mappings?.length || 0, forms_count: vocab.forms?.length || 0, }); } return { count: vocabs.length, items, }; } /** * Xử lý grammars * TODO: Implement logic xử lý grammars theo yêu cầu nghiệp vụ */ async processGrammars(grammarIds, lessonId) { const items = []; // Lấy thông tin grammars từ database const grammars = await models.Grammar.findAll({ where: { id: grammarIds, }, include: [ { model: models.GrammarMapping, as: 'mappings', }, ], }); for (const grammar of grammars) { console.log(`[ProcessDataWorker] - Grammar: ${grammar.title} (${grammar.id})`); // TODO: Thực hiện xử lý nghiệp vụ ở đây // Ví dụ: // - Cập nhật thống kê // - Tạo cache // - Sync với hệ thống khác // - Tạo notification // - Log audit items.push({ grammar_id: grammar.id, title: grammar.title, grammar_type: grammar.grammar_type, mappings_count: grammar.mappings?.length || 0, }); } return { count: grammars.length, items, }; } /** * Setup event handlers */ setupEventHandlers() { this.worker.on('completed', (job, returnvalue) => { console.log(`[ProcessDataWorker] ✅ Job ${job.id} completed`); }); this.worker.on('failed', (job, err) => { console.error(`[ProcessDataWorker] ❌ Job ${job?.id} failed:`, err.message); }); this.worker.on('error', (err) => { console.error(`[ProcessDataWorker] ❌ Worker error:`, err.message); }); this.worker.on('ready', () => { console.log('[ProcessDataWorker] 🚀 Worker ready and listening for jobs...'); }); this.worker.on('active', (job) => { console.log(`[ProcessDataWorker] 🔄 Processing job ${job.id}...`); }); } /** * Graceful shutdown */ async close() { console.log('[ProcessDataWorker] Closing worker...'); await this.worker.close(); console.log('[ProcessDataWorker] Worker closed'); } } // Export class module.exports = ProcessDataWorker; // Nếu chạy trực tiếp file này if (require.main === module) { const worker = new ProcessDataWorker(); // Graceful shutdown process.on('SIGTERM', async () => { console.log('SIGTERM received, closing worker...'); await worker.close(); process.exit(0); }); process.on('SIGINT', async () => { console.log('SIGINT received, closing worker...'); await worker.close(); process.exit(0); }); console.log('═══════════════════════════════════════════════════════'); console.log(' Process Data Worker'); console.log(' Queue: process-data'); console.log(' Concurrency: 5'); console.log('═══════════════════════════════════════════════════════\n'); }