From 5fd0b22f3f0a12afafb60a7c3fe3a9b9427f04bc Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 4 Aug 2022 04:45:06 -0400 Subject: [PATCH] added chat worker; all workers start --- app/workers/chat.js | 66 ++++++++++++ app/workers/chat/job/chat-room-clear.js | 16 +-- app/workers/chat/job/chat-room-delete.js | 6 +- app/workers/newsletter.js | 97 +---------------- app/workers/newsletter/job/email-send.js | 63 +++++++++++ app/workers/newsletter/job/transmit.js | 100 ++++++++++++++++++ .../reeeper/cron/expire-crashed-hosts.js | 1 + config/job-queues.js | 4 + lib/site-common.js | 4 + lib/site-worker.js | 30 +++++- 10 files changed, 274 insertions(+), 113 deletions(-) create mode 100644 app/workers/chat.js create mode 100644 app/workers/newsletter/job/email-send.js create mode 100644 app/workers/newsletter/job/transmit.js diff --git a/app/workers/chat.js b/app/workers/chat.js new file mode 100644 index 0000000..0b5a2e7 --- /dev/null +++ b/app/workers/chat.js @@ -0,0 +1,66 @@ +// chat.js +// Copyright (C) 2022 DTP Technologies, LLC +// License: Apache-2.0 + +'use strict'; + +const path = require('path'); +require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') }); + +const mongoose = require('mongoose'); + +const { SiteLog, SiteWorker, SiteAsync } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib')); + +module.rootPath = path.resolve(__dirname, '..', '..'); +module.pkg = require(path.resolve(__dirname, '..', '..', 'package.json')); + +module.config = { + environment: process.env.NODE_ENV, + root: module.rootPath, + component: { name: 'chatWorker', slug: 'chat-worker' }, +}; + +module.config.site = require(path.join(module.rootPath, 'config', 'site')); + +class ChatWorker extends SiteWorker { + + constructor (dtp) { + super(dtp, dtp.config.component); + } + + async start ( ) { + await super.start(); + + await this.loadProcessor(path.join(__dirname, 'chat', 'job', 'chat-room-clear.js')); + await this.loadProcessor(path.join(__dirname, 'chat', 'job', 'chat-room-delete.js')); + + await this.startProcessors(); + } + + async stop ( ) { + await super.stop(); + } + + async deleteChatMessage (message) { + const { attachment: attachmentService } = this.dtp.services; + const ChatMessage = mongoose.model('ChatMessage'); + + await SiteAsync.each(message.attachments, attachmentService.remove.bind(attachmentService), 2); + await ChatMessage.deleteOne({ _id: message._id }); + } +} + +(async ( ) => { + try { + module.log = new SiteLog(module, module.config.component); + + module.worker = new ChatWorker(module); + await module.worker.start(); + + module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.component.name} started`); + } catch (error) { + module.log.error('failed to start worker', { component: module.config.component, error }); + process.exit(-1); + } + +})(); \ No newline at end of file diff --git a/app/workers/chat/job/chat-room-clear.js b/app/workers/chat/job/chat-room-clear.js index d51b2a7..1598c4e 100644 --- a/app/workers/chat/job/chat-room-clear.js +++ b/app/workers/chat/job/chat-room-clear.js @@ -8,19 +8,10 @@ const path = require('path'); const mongoose = require('mongoose'); -const ChatRoom = mongoose.model('ChatRoom'); -const ChatRoomInvite = mongoose.model('ChatRoomInvite'); const ChatMessage = mongoose.model('ChatMessage'); -const EmojiReaction = mongoose.model('EmojiReaction'); -const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib')); +const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib')); -/** - * DTP Core Chat sticker processor can receive requests to ingest and delete - * stickers to be executed as background jobs in a queue. This processor - * attaches to the `media` queue and registers processors for `sticker-ingest` - * and `sticker-delete`. - */ class ChatRoomClearJob extends SiteWorkerProcess { static get COMPONENT ( ) { @@ -37,10 +28,10 @@ class ChatRoomClearJob extends SiteWorkerProcess { async start ( ) { await super.start(); - const queue = this.getJobQueue('chat'); + this.queue = await this.getJobQueue('chat'); this.log.info('registering job processor', { queue: this.queue.name, name: 'chat-room-clear' }); - queue.process('chat-room-clear', this.processChatRoomClear.bind(this)); + this.queue.process('chat-room-clear', this.processChatRoomClear.bind(this)); } async stop ( ) { @@ -50,7 +41,6 @@ class ChatRoomClearJob extends SiteWorkerProcess { async processChatRoomClear (job) { const { roomId } = job.data; this.log.info('received chat room clear job', { id: job.id, roomId }); - await ChatMessage .find({ room: roomId }) .cursor() diff --git a/app/workers/chat/job/chat-room-delete.js b/app/workers/chat/job/chat-room-delete.js index 586496e..7c06a00 100644 --- a/app/workers/chat/job/chat-room-delete.js +++ b/app/workers/chat/job/chat-room-delete.js @@ -13,7 +13,7 @@ const ChatRoomInvite = mongoose.model('ChatRoomInvite'); const ChatMessage = mongoose.model('ChatMessage'); const EmojiReaction = mongoose.model('EmojiReaction'); -const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib')); +const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib')); /** * DTP Core Chat sticker processor can receive requests to ingest and delete @@ -37,10 +37,10 @@ class ChatRoomDeleteJob extends SiteWorkerProcess { async start ( ) { await super.start(); - const queue = this.getJobQueue('chat'); + this.queue = await this.getJobQueue('chat'); this.log.info('registering job processor', { queue: this.queue.name, name: 'chat-room-delete' }); - queue.process('chat-room-delete', this.processChatRoomDelete.bind(this)); + this.queue.process('chat-room-delete', this.processChatRoomDelete.bind(this)); } async stop ( ) { diff --git a/app/workers/newsletter.js b/app/workers/newsletter.js index 9efb0a3..25b201a 100644 --- a/app/workers/newsletter.js +++ b/app/workers/newsletter.js @@ -7,8 +7,6 @@ const path = require('path'); require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') }); -const mongoose = require('mongoose'); - const { SiteWorker, SiteLog } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib')); module.pkg = require(path.resolve(__dirname, '..', '..', 'package.json')); @@ -21,18 +19,16 @@ class NewsletterWorker extends SiteWorker { constructor (dtp) { super(dtp, dtp.config.component); - this.newsletters = this.newsletters || { }; + this.newsletters = { }; } async start ( ) { await super.start(); - const { jobQueue: jobQueueService } = this.dtp.services; - this.jobQueue = await jobQueueService.getJobQueue('newsletter', { - attempts: 3, - }); - this.jobQueue.process('transmit', this.transmitNewsletter.bind(this)); - this.jobQueue.process('email-send', this.sendNewsletterEmail.bind(this)); + await this.loadProcessor(path.join(__dirname, 'newsletter', 'job', 'transmit.js')); + await this.loadProcessor(path.join(__dirname, 'newsletter', 'job', 'email-send.js')); + + await this.startProcessors(); } async stop ( ) { @@ -53,89 +49,6 @@ class NewsletterWorker extends SiteWorker { } return newsletter; } - - async transmitNewsletter (job) { - const User = mongoose.model('User'); - const NewsletterRecipient = mongoose.model('NewsletterRecipient'); - this.log.info('newsletter email job received', { data: job.data }); - try { - /* - * Transmit first to all local user accounts with verified email who've - * opted in for receiving marketing email. - */ - await User - .find({ - 'flags.isEmailVerified': true, - 'optIn.marketing': true, - }) - .select('email displayName username username_lc') - .lean() - .cursor() - .eachAsync(async (user) => { - try { - const jobData = { - newsletterId: job.data.newsletterId, - recipient: user.email, - recipientName: user.displayName || user.username, - }; - const jobOptions = { attempts: 3 }; - await this.jobQueue.add('email-send', jobData, jobOptions); - } catch (error) { - this.log.error('failed to create newsletter email job', { error }); - } - }, { parallel: 4 }); - - /* - * Transmit to all newsletter recipients on file who've joined through the - * widget on the site w/o signing up for an account. - */ - await NewsletterRecipient - .find({ 'flags.isVerified': true, 'flags.isOptIn': true, 'flags.isRejected': false }) - .lean() - .cursor() - .eachAsync(async (recipient) => { - try { - const jobData = { - newsletterId: job.data.newsletterId, - recipient: recipient.address, - }; - const jobOptions = { attempts: 3 }; - await this.jobQueue.add('email-send', jobData, jobOptions); - } catch (error) { - this.log.error('failed to create newsletter email job', { error }); - } - }, { parallel: 4 }); - } catch (error) { - this.log.error('failed to send newsletter', { newsletterId: job.data.newsletterId, error }); - throw error; - } - } - - async sendNewsletterEmail (job) { - const { email: emailService } = this.dtp.services; - const { newsletterId, recipient } = job.data; - - try { - let newsletter = await this.loadNewsletter(newsletterId); - if (!newsletter) { - throw new Error('newsletter not found'); - } - - const result = await emailService.send({ - from: process.env.DTP_EMAIL_SMTP_FROM || `noreply@${this.dtp.config.site.domainKey}`, - to: recipient, - subject: newsletter.title, - html: newsletter.content.html, - text: newsletter.content.text, - }); - - job.log(`newsletter email sent: ${result}`); - this.log.info('newsletter email sent', { recipient, result }); - } catch (error) { - this.log.error('failed to send newsletter email', { newsletterId, recipient, error }); - throw error; // throw error to Bull so it can report in job reports - } - } } (async ( ) => { diff --git a/app/workers/newsletter/job/email-send.js b/app/workers/newsletter/job/email-send.js new file mode 100644 index 0000000..e88343a --- /dev/null +++ b/app/workers/newsletter/job/email-send.js @@ -0,0 +1,63 @@ +// newsletter/job/email-send.js +// Copyright (C) 2022 DTP Technologies, LLC +// License: Apache-2.0 + +'use strict'; + +const path = require('path'); + +const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib')); + +class NewsletterEmailSendJob extends SiteWorkerProcess { + + static get COMPONENT ( ) { + return { + name: 'newsletterEmailSendJob', + slug: 'newsletter-email-send-job', + }; + } + + constructor (worker) { + super(worker, NewsletterEmailSendJob.COMPONENT); + } + + async start ( ) { + await super.start(); + + this.queue = await this.getJobQueue('newsletter'); + + this.log.info('registering job processor', { queue: this.queue.name, name: 'email-send' }); + this.queue.process('email-send', this.processEmailSend.bind(this)); + } + + async stop ( ) { + await super.stop(); + } + + async processEmailSend (job) { + const { email: emailService } = this.dtp.services; + const { newsletterId, recipient } = job.data; + + try { + let newsletter = await this.worker.loadNewsletter(newsletterId); + if (!newsletter) { + throw new Error('newsletter not found'); + } + + const result = await emailService.send({ + from: process.env.DTP_EMAIL_SMTP_FROM || `noreply@${this.dtp.config.site.domainKey}`, + to: recipient, + subject: newsletter.title, + html: newsletter.content.html, + text: newsletter.content.text, + }); + + this.jobLog(job, 'newsletter email sent', { result }); + } catch (error) { + this.log.error('failed to send newsletter email', { newsletterId, recipient, error }); + throw error; // throw error to Bull so it can report in job reports + } + } +} + +module.exports = NewsletterEmailSendJob; \ No newline at end of file diff --git a/app/workers/newsletter/job/transmit.js b/app/workers/newsletter/job/transmit.js new file mode 100644 index 0000000..3b171d7 --- /dev/null +++ b/app/workers/newsletter/job/transmit.js @@ -0,0 +1,100 @@ +// newsletter/job/transmit.js +// Copyright (C) 2022 DTP Technologies, LLC +// License: Apache-2.0 + +'use strict'; + +const path = require('path'); + +const mongoose = require('mongoose'); + +const User = mongoose.model('User'); +const NewsletterRecipient = mongoose.model('NewsletterRecipient'); + +const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib')); + +class NewsletterTransmitJob extends SiteWorkerProcess { + + static get COMPONENT ( ) { + return { + name: 'newsletterTransmitJob', + slug: 'newsletter-transmit-job', + }; + } + + constructor (worker) { + super(worker, NewsletterTransmitJob.COMPONENT); + } + + async start ( ) { + await super.start(); + + this.queue = await this.getJobQueue('newsletter'); + + this.log.info('registering job processor', { queue: this.queue.name, name: 'transmit' }); + this.queue.process('transmit', this.processTransmit.bind(this)); + } + + async stop ( ) { + await super.stop(); + } + + async processTransmit (job) { + const { newsletterId } = job.data; + this.log.info('newsletter email job received', { id: job.id, newsletterId }); + + try { + /* + * Transmit first to all local user accounts with verified email who've + * opted in for receiving marketing email. + */ + await User + .find({ + 'flags.isEmailVerified': true, + 'optIn.marketing': true, + }) + .select('email displayName username username_lc') + .lean() + .cursor() + .eachAsync(async (user) => { + try { + const jobData = { + newsletterId: newsletterId, + recipient: user.email, + recipientName: user.displayName || user.username, + }; + const jobOptions = { attempts: 3 }; + await this.queue.add('email-send', jobData, jobOptions); + } catch (error) { + this.log.error('failed to create newsletter email job', { error }); + } + }, { parallel: 4 }); + + /* + * Transmit to all newsletter recipients on file who've joined through the + * widget on the site w/o signing up for an account. + */ + await NewsletterRecipient + .find({ 'flags.isVerified': true, 'flags.isOptIn': true, 'flags.isRejected': false }) + .lean() + .cursor() + .eachAsync(async (recipient) => { + try { + const jobData = { + newsletterId: newsletterId, + recipient: recipient.address, + }; + const jobOptions = { attempts: 3 }; + await this.queue.add('email-send', jobData, jobOptions); + } catch (error) { + this.log.error('failed to create newsletter email job', { error }); + } + }, { parallel: 4 }); + } catch (error) { + this.log.error('failed to send newsletter', { newsletterId, error }); + throw error; + } + } +} + +module.exports = NewsletterTransmitJob; \ No newline at end of file diff --git a/app/workers/reeeper/cron/expire-crashed-hosts.js b/app/workers/reeeper/cron/expire-crashed-hosts.js index e85940f..793b0b5 100644 --- a/app/workers/reeeper/cron/expire-crashed-hosts.js +++ b/app/workers/reeeper/cron/expire-crashed-hosts.js @@ -57,6 +57,7 @@ class CrashedHostsCron extends SiteWorkerProcess { async expireCrashedHosts ( ) { try { + this.log.debug('expiring crashed hosts'); await NetHost .find({ status: 'crashed' }) .select('_id hostname') diff --git a/config/job-queues.js b/config/job-queues.js index 1fb3b31..0c55228 100644 --- a/config/job-queues.js +++ b/config/job-queues.js @@ -5,6 +5,10 @@ 'use strict'; module.exports = { + 'chat': { + attempts: 5, + removeOnComplete: true, + }, 'media': { attempts: 3, removeOnComplete: true, diff --git a/lib/site-common.js b/lib/site-common.js index fd363ff..c3bce95 100644 --- a/lib/site-common.js +++ b/lib/site-common.js @@ -42,6 +42,10 @@ class SiteCommon extends Events { } async getJobQueue (name) { + if (this.jobQueues[name]) { + return this.jobQueues[name]; + } + const { jobQueue: jobQueueService } = this.dtp.services; const config = this.dtp.config.jobQueues[name]; diff --git a/lib/site-worker.js b/lib/site-worker.js index 9b79951..3ad0cce 100644 --- a/lib/site-worker.js +++ b/lib/site-worker.js @@ -70,7 +70,7 @@ class SiteWorker extends SiteCommon { const processor = new ProcessorClass(this); const { COMPONENT } = ProcessorClass; - this.log.info('registering worker processor', { component: COMPONENT }); + this.log.info('loading worker processor', { component: COMPONENT.name }); this.processors[COMPONENT.name] = processor; return processor; @@ -84,8 +84,18 @@ class SiteWorker extends SiteCommon { async startProcessors ( ) { const slugs = Object.keys(this.processors); await SiteAsync.each(slugs, async (slug) => { - this.log.info('starting worker processor', { slug }); - await this.processors[slug].start(); + const processor = this.processors[slug]; + try { + this.log.info('starting worker processor', { + component: processor.component.name, + }); + await processor.start(); + } catch (error) { + this.log.error('failed to start processor', { + component: processor.component.name, + error, + }); + } }, 1); } @@ -95,8 +105,18 @@ class SiteWorker extends SiteCommon { async stop ( ) { const slugs = Object.keys(this.processors); await SiteAsync.each(slugs, async (slug) => { - this.log.info('stopping worker processor', { slug }); - await this.processors[slug].stop(); + const processor = this.processors[slug]; + try { + this.log.info('stopping worker processor', { + component: processor.component.name, + }); + await processor.stop(); + } catch (error) { + this.log.error('failed to stop processor', { + component: processor.component.name, + error, + }); + } }, 1); } }