259 lines
7.0 KiB
JavaScript
259 lines
7.0 KiB
JavaScript
const { Worker } = require('bullmq');
|
|
const { connectionOptions } = require('../config/bullmq');
|
|
const models = require('../models');
|
|
|
|
/**
|
|
* Process Data Worker
|
|
* Worker này xử lý các task từ queue 'process-data'
|
|
* - Nhận lesson data với danh sách vocabulary và grammar IDs
|
|
* - Thực hiện các xử lý nghiệp vụ với dữ liệu
|
|
*/
|
|
class ProcessDataWorker {
|
|
constructor() {
|
|
this.worker = new Worker(
|
|
'process-data',
|
|
async (job) => await this.processJob(job),
|
|
{
|
|
connection: connectionOptions,
|
|
prefix: process.env.BULLMQ_PREFIX || 'vcb',
|
|
concurrency: 5, // Process 5 jobs đồng thời
|
|
limiter: {
|
|
max: 20,
|
|
duration: 1000, // 20 jobs/second
|
|
},
|
|
}
|
|
);
|
|
|
|
this.setupEventHandlers();
|
|
}
|
|
|
|
/**
|
|
* Process job - Xử lý task process_data
|
|
*/
|
|
async processJob(job) {
|
|
const { type, lesson, timestamp } = job.data;
|
|
|
|
console.log(`[ProcessDataWorker] Processing job ${job.id} - Type: ${type}`);
|
|
console.log(`[ProcessDataWorker] Lesson: ${lesson.lesson_title} (${lesson.lesson_id})`);
|
|
|
|
try {
|
|
if (type === 'lesson_data') {
|
|
const result = await this.processLessonData(lesson);
|
|
|
|
console.log(`[ProcessDataWorker] ✅ Completed job ${job.id}`);
|
|
return result;
|
|
} else {
|
|
console.log(`[ProcessDataWorker] ⚠️ Unknown type: ${type}`);
|
|
return { success: false, error: 'Unknown type' };
|
|
}
|
|
|
|
} catch (error) {
|
|
console.error(`[ProcessDataWorker] ❌ Error processing job ${job.id}:`, error.message);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Xử lý lesson data
|
|
*/
|
|
async processLessonData(lesson) {
|
|
const {
|
|
lesson_id,
|
|
lesson_title,
|
|
vocabularies,
|
|
grammars,
|
|
chapter_id,
|
|
} = lesson;
|
|
|
|
console.log(`[ProcessDataWorker] Processing lesson: ${lesson_title}`);
|
|
console.log(`[ProcessDataWorker] - Vocabularies: ${vocabularies.length}`);
|
|
console.log(`[ProcessDataWorker] - Grammars: ${grammars.length}`);
|
|
|
|
const result = {
|
|
lesson_id,
|
|
processed: {
|
|
vocabularies: 0,
|
|
grammars: 0,
|
|
},
|
|
details: {
|
|
vocabularies: [],
|
|
grammars: [],
|
|
},
|
|
};
|
|
|
|
// Xử lý vocabularies
|
|
if (vocabularies.length > 0) {
|
|
const vocabResult = await this.processVocabularies(vocabularies, lesson_id);
|
|
result.processed.vocabularies = vocabResult.count;
|
|
result.details.vocabularies = vocabResult.items;
|
|
}
|
|
|
|
// Xử lý grammars
|
|
if (grammars.length > 0) {
|
|
const grammarResult = await this.processGrammars(grammars, lesson_id);
|
|
result.processed.grammars = grammarResult.count;
|
|
result.details.grammars = grammarResult.items;
|
|
}
|
|
|
|
console.log(`[ProcessDataWorker] ✅ Processed: ${result.processed.vocabularies} vocabs, ${result.processed.grammars} grammars`);
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Xử lý vocabularies
|
|
* TODO: Implement logic xử lý vocabularies theo yêu cầu nghiệp vụ
|
|
*/
|
|
async processVocabularies(vocabularyIds, lessonId) {
|
|
const items = [];
|
|
|
|
// Lấy thông tin vocabularies từ database
|
|
const vocabs = await models.Vocab.findAll({
|
|
where: {
|
|
vocab_id: vocabularyIds,
|
|
},
|
|
include: [
|
|
{
|
|
model: models.VocabMapping,
|
|
as: 'mappings',
|
|
},
|
|
{
|
|
model: models.VocabForm,
|
|
as: 'forms',
|
|
},
|
|
],
|
|
});
|
|
|
|
for (const vocab of vocabs) {
|
|
console.log(`[ProcessDataWorker] - Vocab: ${vocab.word} (${vocab.vocab_id})`);
|
|
|
|
// TODO: Thực hiện xử lý nghiệp vụ ở đây
|
|
// Ví dụ:
|
|
// - Cập nhật thống kê
|
|
// - Tạo cache
|
|
// - Sync với hệ thống khác
|
|
// - Tạo notification
|
|
// - Log audit
|
|
|
|
items.push({
|
|
vocab_id: vocab.vocab_id,
|
|
word: vocab.word,
|
|
word_type: vocab.word_type,
|
|
mappings_count: vocab.mappings?.length || 0,
|
|
forms_count: vocab.forms?.length || 0,
|
|
});
|
|
}
|
|
|
|
return {
|
|
count: vocabs.length,
|
|
items,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Xử lý grammars
|
|
* TODO: Implement logic xử lý grammars theo yêu cầu nghiệp vụ
|
|
*/
|
|
async processGrammars(grammarIds, lessonId) {
|
|
const items = [];
|
|
|
|
// Lấy thông tin grammars từ database
|
|
const grammars = await models.Grammar.findAll({
|
|
where: {
|
|
id: grammarIds,
|
|
},
|
|
include: [
|
|
{
|
|
model: models.GrammarMapping,
|
|
as: 'mappings',
|
|
},
|
|
],
|
|
});
|
|
|
|
for (const grammar of grammars) {
|
|
console.log(`[ProcessDataWorker] - Grammar: ${grammar.title} (${grammar.id})`);
|
|
|
|
// TODO: Thực hiện xử lý nghiệp vụ ở đây
|
|
// Ví dụ:
|
|
// - Cập nhật thống kê
|
|
// - Tạo cache
|
|
// - Sync với hệ thống khác
|
|
// - Tạo notification
|
|
// - Log audit
|
|
|
|
items.push({
|
|
grammar_id: grammar.id,
|
|
title: grammar.title,
|
|
grammar_type: grammar.grammar_type,
|
|
mappings_count: grammar.mappings?.length || 0,
|
|
});
|
|
}
|
|
|
|
return {
|
|
count: grammars.length,
|
|
items,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Setup event handlers
|
|
*/
|
|
setupEventHandlers() {
|
|
this.worker.on('completed', (job, returnvalue) => {
|
|
console.log(`[ProcessDataWorker] ✅ Job ${job.id} completed`);
|
|
});
|
|
|
|
this.worker.on('failed', (job, err) => {
|
|
console.error(`[ProcessDataWorker] ❌ Job ${job?.id} failed:`, err.message);
|
|
});
|
|
|
|
this.worker.on('error', (err) => {
|
|
console.error(`[ProcessDataWorker] ❌ Worker error:`, err.message);
|
|
});
|
|
|
|
this.worker.on('ready', () => {
|
|
console.log('[ProcessDataWorker] 🚀 Worker ready and listening for jobs...');
|
|
});
|
|
|
|
this.worker.on('active', (job) => {
|
|
console.log(`[ProcessDataWorker] 🔄 Processing job ${job.id}...`);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Graceful shutdown
|
|
*/
|
|
async close() {
|
|
console.log('[ProcessDataWorker] Closing worker...');
|
|
await this.worker.close();
|
|
console.log('[ProcessDataWorker] Worker closed');
|
|
}
|
|
}
|
|
|
|
// Export class
|
|
module.exports = ProcessDataWorker;
|
|
|
|
// Nếu chạy trực tiếp file này
|
|
if (require.main === module) {
|
|
const worker = new ProcessDataWorker();
|
|
|
|
// Graceful shutdown
|
|
process.on('SIGTERM', async () => {
|
|
console.log('SIGTERM received, closing worker...');
|
|
await worker.close();
|
|
process.exit(0);
|
|
});
|
|
|
|
process.on('SIGINT', async () => {
|
|
console.log('SIGINT received, closing worker...');
|
|
await worker.close();
|
|
process.exit(0);
|
|
});
|
|
|
|
console.log('═══════════════════════════════════════════════════════');
|
|
console.log(' Process Data Worker');
|
|
console.log(' Queue: process-data');
|
|
console.log(' Concurrency: 5');
|
|
console.log('═══════════════════════════════════════════════════════\n');
|
|
}
|