// job-queue.js // Copyright (C) 2022 DTP Technologies, LLC // License: Apache-2.0 'use strict'; const BullQueue = require('bull'); const { /*SiteError,*/ SiteService } = require('../../lib/site-lib'); class JobQueueService extends SiteService { constructor (dtp) { super(dtp, module.exports); this.queues = { }; } getJobQueue (name, defaultJobOptions) { /* * If we have a named queue, return it. */ let queue = this.queues[name]; if (queue) { return queue; } /* * Create a new named queue */ defaultJobOptions = Object.assign({ priority: 10, delay: 0, attempts: 1, removeOnComplete: true, removeOnFail: false, }, defaultJobOptions); queue = new BullQueue(name, { prefix: process.env.REDIS_KEY_PREFIX || 'dtp', redis: { host: process.env.REDIS_HOST, port: parseInt(process.env.REDIS_PORT || '6379', 10), password: process.env.REDIS_PASSWORD, keyPrefix: process.env.REDIS_KEY_PREFIX, }, defaultJobOptions, }); queue.setMaxListeners(64); this.queues[name] = queue; return queue; } async discoverJobQueues (pattern) { const { cache: cacheService } = this.dtp.services; let bullQueues = await cacheService.getKeys(pattern); return bullQueues .map((queue) => queue.split(':')[1]) .sort() ; } } module.exports = { slug: 'job-queue', name: 'jobQueue', create: (dtp) => { return new JobQueueService(dtp); }, };