283 lines
7.8 KiB
JavaScript
283 lines
7.8 KiB
JavaScript
const { Worker, Queue } = require('bullmq');
|
|
const { connectionOptions } = require('../config/bullmq');
|
|
const models = require('../models');
|
|
const { Op } = require('sequelize');
|
|
|
|
/**
|
|
* Lesson Data Fill Worker
|
|
* Worker này lấy các lesson từ database và tạo task xử lý dữ liệu lên BullMQ
|
|
* - Liệt kê các từ vựng hoặc ngữ pháp của lesson
|
|
* - Ghi thành task lên BullMQ với label 'process_data'
|
|
*/
|
|
class LessonDataFillWorker {
|
|
constructor() {
|
|
// Queue để ghi task process_data
|
|
this.processDataQueue = new Queue('process-data', {
|
|
connection: connectionOptions,
|
|
prefix: process.env.BULLMQ_PREFIX || 'vcb',
|
|
defaultJobOptions: {
|
|
attempts: 3,
|
|
backoff: {
|
|
type: 'exponential',
|
|
delay: 2000,
|
|
},
|
|
removeOnComplete: {
|
|
age: 24 * 3600,
|
|
count: 1000,
|
|
},
|
|
removeOnFail: {
|
|
age: 7 * 24 * 3600,
|
|
},
|
|
},
|
|
});
|
|
|
|
// Worker để xử lý các lesson
|
|
this.worker = new Worker(
|
|
'lesson-data-fill',
|
|
async (job) => await this.processJob(job),
|
|
{
|
|
connection: connectionOptions,
|
|
prefix: process.env.BULLMQ_PREFIX || 'vcb',
|
|
concurrency: 3,
|
|
limiter: {
|
|
max: 10,
|
|
duration: 1000,
|
|
},
|
|
}
|
|
);
|
|
|
|
this.setupEventHandlers();
|
|
}
|
|
|
|
/**
|
|
* Process job - Lấy lessons và tạo task xử lý
|
|
*/
|
|
async processJob(job) {
|
|
console.log(`[LessonDataFillWorker] Starting job ${job.id}`);
|
|
|
|
try {
|
|
const { batchSize = 50, offset = 0, filters = {} } = job.data;
|
|
|
|
// Lấy lessons từ database
|
|
const lessons = await this.getLessons(batchSize, offset, filters);
|
|
|
|
console.log(`[LessonDataFillWorker] Found ${lessons.length} lessons`);
|
|
|
|
// Xử lý từng lesson
|
|
const tasks = [];
|
|
for (const lesson of lessons) {
|
|
const taskData = await this.extractLessonData(lesson);
|
|
|
|
if (taskData) {
|
|
// Tạo task lên BullMQ
|
|
const task = await this.createProcessDataTask(taskData);
|
|
tasks.push(task);
|
|
}
|
|
}
|
|
|
|
console.log(`[LessonDataFillWorker] ✅ Created ${tasks.length} process_data tasks`);
|
|
|
|
return {
|
|
success: true,
|
|
lessonsProcessed: lessons.length,
|
|
tasksCreated: tasks.length,
|
|
offset,
|
|
hasMore: lessons.length === batchSize,
|
|
};
|
|
|
|
} catch (error) {
|
|
console.error(`[LessonDataFillWorker] ❌ Error:`, error.message);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Lấy lessons từ database
|
|
*/
|
|
async getLessons(limit, offset, filters) {
|
|
const whereClause = {
|
|
is_published: true,
|
|
lesson_type: 'json_content',
|
|
content_json: {
|
|
[Op.ne]: null,
|
|
},
|
|
};
|
|
|
|
// Thêm filters nếu có
|
|
if (filters.lesson_content_type) {
|
|
whereClause.lesson_content_type = filters.lesson_content_type;
|
|
}
|
|
if (filters.chapter_id) {
|
|
whereClause.chapter_id = filters.chapter_id;
|
|
}
|
|
|
|
const lessons = await models.Lesson.findAll({
|
|
where: whereClause,
|
|
limit,
|
|
offset,
|
|
order: [['created_at', 'DESC']],
|
|
include: [
|
|
{
|
|
model: models.Chapter,
|
|
as: 'chapter',
|
|
attributes: ['id', 'chapter_number', 'chapter_title', 'subject_id'],
|
|
},
|
|
],
|
|
});
|
|
|
|
return lessons;
|
|
}
|
|
|
|
/**
|
|
* Trích xuất dữ liệu từ lesson
|
|
*/
|
|
async extractLessonData(lesson) {
|
|
try {
|
|
const contentJson = lesson.content_json;
|
|
|
|
if (!contentJson || typeof contentJson !== 'object') {
|
|
return null;
|
|
}
|
|
|
|
const lessonData = {
|
|
lesson_id: lesson.id,
|
|
lesson_title: lesson.lesson_title,
|
|
lesson_number: lesson.lesson_number,
|
|
lesson_content_type: lesson.lesson_content_type,
|
|
chapter_id: lesson.chapter_id,
|
|
chapter_title: lesson.chapter?.chapter_title,
|
|
vocabularies: [],
|
|
grammars: [],
|
|
};
|
|
|
|
// Trích xuất vocabulary IDs
|
|
if (contentJson.vocabulary_ids && Array.isArray(contentJson.vocabulary_ids)) {
|
|
lessonData.vocabularies = contentJson.vocabulary_ids;
|
|
console.log(`[LessonDataFillWorker] Lesson ${lesson.id}: ${lessonData.vocabularies.length} vocabularies`);
|
|
}
|
|
|
|
// Trích xuất grammar IDs
|
|
if (contentJson.grammar_ids && Array.isArray(contentJson.grammar_ids)) {
|
|
lessonData.grammars = contentJson.grammar_ids;
|
|
console.log(`[LessonDataFillWorker] Lesson ${lesson.id}: ${lessonData.grammars.length} grammars`);
|
|
}
|
|
|
|
// Xử lý review lessons (có thể chứa cả vocab và grammar)
|
|
if (contentJson.sections && Array.isArray(contentJson.sections)) {
|
|
for (const section of contentJson.sections) {
|
|
if (section.vocabulary_ids && Array.isArray(section.vocabulary_ids)) {
|
|
lessonData.vocabularies.push(...section.vocabulary_ids);
|
|
}
|
|
if (section.grammar_ids && Array.isArray(section.grammar_ids)) {
|
|
lessonData.grammars.push(...section.grammar_ids);
|
|
}
|
|
}
|
|
|
|
// Loại bỏ duplicate IDs
|
|
lessonData.vocabularies = [...new Set(lessonData.vocabularies)];
|
|
lessonData.grammars = [...new Set(lessonData.grammars)];
|
|
}
|
|
|
|
// Chỉ return nếu có vocabulary hoặc grammar
|
|
if (lessonData.vocabularies.length > 0 || lessonData.grammars.length > 0) {
|
|
return lessonData;
|
|
}
|
|
|
|
return null;
|
|
|
|
} catch (error) {
|
|
console.error(`[LessonDataFillWorker] Error extracting lesson data:`, error.message);
|
|
return null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Tạo task process_data lên BullMQ
|
|
*/
|
|
async createProcessDataTask(lessonData) {
|
|
try {
|
|
const job = await this.processDataQueue.add(
|
|
'process_data',
|
|
{
|
|
type: 'lesson_data',
|
|
lesson: lessonData,
|
|
timestamp: new Date().toISOString(),
|
|
},
|
|
{
|
|
priority: 5,
|
|
jobId: `lesson-${lessonData.lesson_id}-${Date.now()}`,
|
|
}
|
|
);
|
|
|
|
console.log(`[LessonDataFillWorker] ✅ Created task: ${job.id} for lesson ${lessonData.lesson_id}`);
|
|
|
|
return {
|
|
jobId: job.id,
|
|
lessonId: lessonData.lesson_id,
|
|
};
|
|
|
|
} catch (error) {
|
|
console.error(`[LessonDataFillWorker] ❌ Error creating task:`, error.message);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Setup event handlers
|
|
*/
|
|
setupEventHandlers() {
|
|
this.worker.on('completed', (job) => {
|
|
console.log(`[LessonDataFillWorker] ✅ Job ${job.id} completed`);
|
|
});
|
|
|
|
this.worker.on('failed', (job, err) => {
|
|
console.error(`[LessonDataFillWorker] ❌ Job ${job?.id} failed:`, err.message);
|
|
});
|
|
|
|
this.worker.on('error', (err) => {
|
|
console.error(`[LessonDataFillWorker] ❌ Worker error:`, err.message);
|
|
});
|
|
|
|
this.worker.on('ready', () => {
|
|
console.log('[LessonDataFillWorker] 🚀 Worker ready');
|
|
});
|
|
|
|
this.processDataQueue.on('error', (err) => {
|
|
console.error(`[ProcessDataQueue] ❌ Queue error:`, err.message);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Graceful shutdown
|
|
*/
|
|
async close() {
|
|
console.log('[LessonDataFillWorker] Closing worker...');
|
|
await this.worker.close();
|
|
await this.processDataQueue.close();
|
|
console.log('[LessonDataFillWorker] Worker closed');
|
|
}
|
|
}
|
|
|
|
// Export class và instance để sử dụng
|
|
module.exports = LessonDataFillWorker;
|
|
|
|
// Nếu chạy trực tiếp file này
|
|
if (require.main === module) {
|
|
const worker = new LessonDataFillWorker();
|
|
|
|
// Graceful shutdown
|
|
process.on('SIGTERM', async () => {
|
|
console.log('SIGTERM received, closing worker...');
|
|
await worker.close();
|
|
process.exit(0);
|
|
});
|
|
|
|
process.on('SIGINT', async () => {
|
|
console.log('SIGINT received, closing worker...');
|
|
await worker.close();
|
|
process.exit(0);
|
|
});
|
|
|
|
console.log('🚀 Lesson Data Fill Worker started');
|
|
}
|