const { Worker, Queue } = require('bullmq'); const { connectionOptions } = require('../config/bullmq'); const models = require('../models'); const { Op } = require('sequelize'); /** * Lesson Data Fill Worker * Worker này lấy các lesson từ database và tạo task xử lý dữ liệu lên BullMQ * - Liệt kê các từ vựng hoặc ngữ pháp của lesson * - Ghi thành task lên BullMQ với label 'process_data' */ class LessonDataFillWorker { constructor() { // Queue để ghi task process_data this.processDataQueue = new Queue('process-data', { connection: connectionOptions, prefix: process.env.BULLMQ_PREFIX || 'vcb', defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 2000, }, removeOnComplete: { age: 24 * 3600, count: 1000, }, removeOnFail: { age: 7 * 24 * 3600, }, }, }); // Worker để xử lý các lesson this.worker = new Worker( 'lesson-data-fill', async (job) => await this.processJob(job), { connection: connectionOptions, prefix: process.env.BULLMQ_PREFIX || 'vcb', concurrency: 3, limiter: { max: 10, duration: 1000, }, } ); this.setupEventHandlers(); } /** * Process job - Lấy lessons và tạo task xử lý */ async processJob(job) { console.log(`[LessonDataFillWorker] Starting job ${job.id}`); try { const { batchSize = 50, offset = 0, filters = {} } = job.data; // Lấy lessons từ database const lessons = await this.getLessons(batchSize, offset, filters); console.log(`[LessonDataFillWorker] Found ${lessons.length} lessons`); // Xử lý từng lesson const tasks = []; for (const lesson of lessons) { const taskData = await this.extractLessonData(lesson); if (taskData) { // Tạo task lên BullMQ const task = await this.createProcessDataTask(taskData); tasks.push(task); } } console.log(`[LessonDataFillWorker] ✅ Created ${tasks.length} process_data tasks`); return { success: true, lessonsProcessed: lessons.length, tasksCreated: tasks.length, offset, hasMore: lessons.length === batchSize, }; } catch (error) { console.error(`[LessonDataFillWorker] ❌ Error:`, error.message); throw error; } } /** * Lấy lessons từ database */ async getLessons(limit, offset, filters) { const whereClause = { is_published: true, lesson_type: 'json_content', content_json: { [Op.ne]: null, }, }; // Thêm filters nếu có if (filters.lesson_content_type) { whereClause.lesson_content_type = filters.lesson_content_type; } if (filters.chapter_id) { whereClause.chapter_id = filters.chapter_id; } const lessons = await models.Lesson.findAll({ where: whereClause, limit, offset, order: [['created_at', 'DESC']], include: [ { model: models.Chapter, as: 'chapter', attributes: ['id', 'chapter_number', 'chapter_title', 'subject_id'], }, ], }); return lessons; } /** * Trích xuất dữ liệu từ lesson */ async extractLessonData(lesson) { try { const contentJson = lesson.content_json; if (!contentJson || typeof contentJson !== 'object') { return null; } const lessonData = { lesson_id: lesson.id, lesson_title: lesson.lesson_title, lesson_number: lesson.lesson_number, lesson_content_type: lesson.lesson_content_type, chapter_id: lesson.chapter_id, chapter_title: lesson.chapter?.chapter_title, vocabularies: [], grammars: [], }; // Trích xuất vocabulary IDs if (contentJson.vocabulary_ids && Array.isArray(contentJson.vocabulary_ids)) { lessonData.vocabularies = contentJson.vocabulary_ids; console.log(`[LessonDataFillWorker] Lesson ${lesson.id}: ${lessonData.vocabularies.length} vocabularies`); } // Trích xuất grammar IDs if (contentJson.grammar_ids && Array.isArray(contentJson.grammar_ids)) { lessonData.grammars = contentJson.grammar_ids; console.log(`[LessonDataFillWorker] Lesson ${lesson.id}: ${lessonData.grammars.length} grammars`); } // Xử lý review lessons (có thể chứa cả vocab và grammar) if (contentJson.sections && Array.isArray(contentJson.sections)) { for (const section of contentJson.sections) { if (section.vocabulary_ids && Array.isArray(section.vocabulary_ids)) { lessonData.vocabularies.push(...section.vocabulary_ids); } if (section.grammar_ids && Array.isArray(section.grammar_ids)) { lessonData.grammars.push(...section.grammar_ids); } } // Loại bỏ duplicate IDs lessonData.vocabularies = [...new Set(lessonData.vocabularies)]; lessonData.grammars = [...new Set(lessonData.grammars)]; } // Chỉ return nếu có vocabulary hoặc grammar if (lessonData.vocabularies.length > 0 || lessonData.grammars.length > 0) { return lessonData; } return null; } catch (error) { console.error(`[LessonDataFillWorker] Error extracting lesson data:`, error.message); return null; } } /** * Tạo task process_data lên BullMQ */ async createProcessDataTask(lessonData) { try { const job = await this.processDataQueue.add( 'process_data', { type: 'lesson_data', lesson: lessonData, timestamp: new Date().toISOString(), }, { priority: 5, jobId: `lesson-${lessonData.lesson_id}-${Date.now()}`, } ); console.log(`[LessonDataFillWorker] ✅ Created task: ${job.id} for lesson ${lessonData.lesson_id}`); return { jobId: job.id, lessonId: lessonData.lesson_id, }; } catch (error) { console.error(`[LessonDataFillWorker] ❌ Error creating task:`, error.message); throw error; } } /** * Setup event handlers */ setupEventHandlers() { this.worker.on('completed', (job) => { console.log(`[LessonDataFillWorker] ✅ Job ${job.id} completed`); }); this.worker.on('failed', (job, err) => { console.error(`[LessonDataFillWorker] ❌ Job ${job?.id} failed:`, err.message); }); this.worker.on('error', (err) => { console.error(`[LessonDataFillWorker] ❌ Worker error:`, err.message); }); this.worker.on('ready', () => { console.log('[LessonDataFillWorker] 🚀 Worker ready'); }); this.processDataQueue.on('error', (err) => { console.error(`[ProcessDataQueue] ❌ Queue error:`, err.message); }); } /** * Graceful shutdown */ async close() { console.log('[LessonDataFillWorker] Closing worker...'); await this.worker.close(); await this.processDataQueue.close(); console.log('[LessonDataFillWorker] Worker closed'); } } // Export class và instance để sử dụng module.exports = LessonDataFillWorker; // Nếu chạy trực tiếp file này if (require.main === module) { const worker = new LessonDataFillWorker(); // Graceful shutdown process.on('SIGTERM', async () => { console.log('SIGTERM received, closing worker...'); await worker.close(); process.exit(0); }); process.on('SIGINT', async () => { console.log('SIGINT received, closing worker...'); await worker.close(); process.exit(0); }); console.log('🚀 Lesson Data Fill Worker started'); }