const { Queue, Worker } = require('bullmq'); const Redis = require('ioredis'); const config = require('./config.json'); /** * BullMQ Connection Configuration - Direct Redis connection */ const redisConfig = config.redis; // Connection options for BullMQ (BullMQ will create connections using these options) const connectionOptions = { host: redisConfig.cluster[0].host, port: redisConfig.cluster[0].port, password: redisConfig.password, db: redisConfig.db, maxRetriesPerRequest: null, enableReadyCheck: false, }; // Create a shared Redis connection for direct Redis operations const bullMQConnection = new Redis(connectionOptions); console.log('📊 BullMQ Redis Connection:', { host: connectionOptions.host, port: connectionOptions.port, db: connectionOptions.db, }); bullMQConnection.on('connect', () => { console.log('✅ BullMQ Redis connected'); }); bullMQConnection.on('error', (err) => { console.error('❌ BullMQ Redis error:', err.message); }); /** * Default job options */ const defaultJobOptions = { attempts: 3, backoff: { type: 'exponential', delay: 2000, }, removeOnComplete: { age: 24 * 3600, // Keep completed jobs for 24 hours count: 1000, }, removeOnFail: { age: 7 * 24 * 3600, // Keep failed jobs for 7 days }, }; /** * Queue definitions for different operations */ const QueueNames = { DATABASE_WRITE: 'database-write', NOTIFICATION: 'notification', ATTENDANCE_PROCESS: 'attendance-process', GRADE_CALCULATION: 'grade-calculation', REPORT_GENERATION: 'report-generation', }; /** * Create queues with connection options */ const queues = {}; Object.values(QueueNames).forEach(queueName => { queues[queueName] = new Queue(queueName, { connection: connectionOptions, // Use connection options, not instance prefix: process.env.BULLMQ_PREFIX || 'vcb', defaultJobOptions, }); queues[queueName].on('error', (error) => { console.error(`❌ Queue ${queueName} error:`, error.message); }); console.log(`✅ Queue ${queueName} initialized`); }); /** * Add job to database write queue * @param {string} operation - Operation type: 'create', 'update', 'delete' * @param {string} model - Model name * @param {object} data - Data to be written * @param {object} options - Additional options */ const addDatabaseWriteJob = async (operation, model, data, options = {}) => { try { const job = await queues[QueueNames.DATABASE_WRITE].add( `${operation}-${model}`, { operation, model, data, timestamp: new Date().toISOString(), userId: options.userId, ...options, }, { priority: options.priority || 5, delay: options.delay || 0, } ); console.log(`✅ Database write job added: ${job.id} (${operation} ${model})`); return job; } catch (error) { console.error(`❌ Error adding database write job:`, error.message); throw error; } }; /** * Add notification job */ const addNotificationJob = async (type, recipients, content, options = {}) => { try { const job = await queues[QueueNames.NOTIFICATION].add( `send-${type}`, { type, recipients, content, timestamp: new Date().toISOString(), ...options, }, { priority: options.priority || 5, } ); console.log(`✅ Notification job added: ${job.id}`); return job; } catch (error) { console.error(`❌ Error adding notification job:`, error.message); throw error; } }; /** * Add attendance processing job */ const addAttendanceProcessJob = async (schoolId, date, options = {}) => { try { const job = await queues[QueueNames.ATTENDANCE_PROCESS].add( 'process-attendance', { schoolId, date, timestamp: new Date().toISOString(), ...options, }, { priority: options.priority || 3, } ); console.log(`✅ Attendance process job added: ${job.id}`); return job; } catch (error) { console.error(`❌ Error adding attendance process job:`, error.message); throw error; } }; /** * Add grade calculation job */ const addGradeCalculationJob = async (studentId, academicYearId, options = {}) => { try { const job = await queues[QueueNames.GRADE_CALCULATION].add( 'calculate-grades', { studentId, academicYearId, timestamp: new Date().toISOString(), ...options, }, { priority: options.priority || 4, } ); console.log(`✅ Grade calculation job added: ${job.id}`); return job; } catch (error) { console.error(`❌ Error adding grade calculation job:`, error.message); throw error; } }; /** * Get queue metrics */ const getQueueMetrics = async (queueName) => { try { const queue = queues[queueName]; if (!queue) { throw new Error(`Queue ${queueName} not found`); } const [waiting, active, completed, failed, delayed] = await Promise.all([ queue.getWaitingCount(), queue.getActiveCount(), queue.getCompletedCount(), queue.getFailedCount(), queue.getDelayedCount(), ]); return { queueName, waiting, active, completed, failed, delayed, total: waiting + active + completed + failed + delayed, }; } catch (error) { console.error(`❌ Error getting queue metrics:`, error.message); throw error; } }; /** * Close all queues */ const closeQueues = async () => { try { await Promise.all( Object.values(queues).map(queue => queue.close()) ); console.log('✅ All BullMQ queues closed'); } catch (error) { console.error('❌ Error closing queues:', error.message); } }; module.exports = { queues, QueueNames, addDatabaseWriteJob, addNotificationJob, addAttendanceProcessJob, addGradeCalculationJob, getQueueMetrics, closeQueues, bullMQConnection, connectionOptions, defaultJobOptions, };