// stickers.js
// Copyright (C) 2022 DTP Technologies, LLC
// License: Apache-2.0

'use strict';

// Height (in pixels/lines) every encoded sticker is scaled to; the width
// follows the source aspect ratio.
const DTP_STICKER_HEIGHT = 100;

const path = require('path');
const fs = require('fs');

require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') });

const mongoose = require('mongoose');

const {
  SitePlatform,
  SiteLog,
  SiteWorker,
} = require(path.join(__dirname, '..', '..', 'lib', 'site-lib'));

const sharp = require('sharp');

module.pkg = require(path.resolve(__dirname, '..', '..', 'package.json'));
module.config = {
  environment: process.env.NODE_ENV,
  root: path.resolve(__dirname, '..', '..'),
  component: { name: 'stickersWorker', slug: 'stickers-worker' },
};

/*
 * Ingest profile per supported original media type:
 *   origExt   file extension used for the downloaded original
 *   outExt    file extension used for the locally rendered output
 *   processor name of the StickerWorker processor that renders the output
 *   sharpFormat / sharpFormatParameters
 *             Sharp output format method and its options (Sharp profiles only)
 *
 * A Map is used so that arbitrary type strings can never resolve to an
 * inherited Object property.
 */
const STICKER_INGEST_PROFILES = new Map([
  ['image/jpeg', {
    origExt: 'jpg',
    outExt: 'jpg',
    processor: 'processStickerSharp',
    sharpFormat: 'jpeg',
    sharpFormatParameters: { quality: 85 },
  }],
  ['image/png', {
    origExt: 'png',
    outExt: 'png',
    processor: 'processStickerSharp',
    sharpFormat: 'png',
    // Sharp's PNG option is `compressionLevel`; a `compression` key has no
    // effect on PNG output.
    sharpFormatParameters: { compressionLevel: 9 },
  }],
  ['image/gif', {
    origExt: 'gif',
    outExt: 'mp4',
    processor: 'processStickerFFMPEG',
  }],
  // process as PNG
  ['image/webp', {
    origExt: 'webp',
    outExt: 'png',
    processor: 'processStickerSharp',
    sharpFormat: 'png',
    sharpFormatParameters: { compressionLevel: 9 },
  }],
  // process as MP4 ('image/webm' is kept for backward compatibility with
  // records already stored using that type)
  ['image/webm', {
    origExt: 'webm',
    outExt: 'mp4',
    processor: 'processStickerFFMPEG',
  }],
  // process as MP4 (the registered media type for WebM video)
  ['video/webm', {
    origExt: 'webm',
    outExt: 'mp4',
    processor: 'processStickerFFMPEG',
  }],
  ['video/mp4', {
    origExt: 'mp4',
    outExt: 'mp4',
    processor: 'processStickerFFMPEG',
  }],
]);

/**
 * Background worker that ingests (renders) and deletes stickers.
 *
 * Ingest downloads the original media from object storage, renders it to a
 * fixed-height image (Sharp) or MP4 (FFmpeg), uploads the result, and marks
 * the Sticker document 'live'. Delete removes the stored media objects and
 * the Sticker document.
 */
class StickerWorker extends SiteWorker {

  /**
   * @param {object} dtp Platform context; `dtp.config.component` is handed to
   * the SiteWorker base class.
   */
  constructor (dtp) {
    super(dtp, dtp.config.component);

    // Dispatch table keyed by the processor name fetchSticker stores in
    // job.data.processor.
    this.processors = {
      processStickerSharp: this.processStickerSharp.bind(this),
      processStickerFFMPEG: this.processStickerFFMPEG.bind(this),
    };
  }

  /**
   * Starts the base worker, then registers the 'sticker-ingest' and
   * 'sticker-delete' processors (concurrency 1 each) on the 'sticker-ingest'
   * job queue.
   */
  async start ( ) {
    await super.start();

    const { jobQueue: jobQueueService } = this.dtp.services;

    this.log.info('registering sticker-ingest job processor', {
      config: this.dtp.config.jobQueues['sticker-ingest'],
    });
    this.stickerProcessingQueue = jobQueueService.getJobQueue(
      'sticker-ingest',
      this.dtp.config.jobQueues['sticker-ingest'],
    );

    this.stickerProcessingQueue.process('sticker-ingest', 1, this.processStickerIngest.bind(this));
    this.stickerProcessingQueue.process('sticker-delete', 1, this.processStickerDelete.bind(this));
  }

  /**
   * Closes the job queue (if one was opened) and stops the base worker. A
   * failure to close the queue is logged and does not prevent the base worker
   * from stopping.
   */
  async stop ( ) {
    if (this.stickerProcessingQueue) {
      try {
        this.log.info('closing sticker-ingest job queue');
        await this.stickerProcessingQueue.close();
        delete this.stickerProcessingQueue;
      } catch (error) {
        this.log.error('failed to close sticker ingest job queue', { error });
        // fall through
      }
    }

    await super.stop();
  }

  /**
   * Job processor for 'sticker-ingest': fetches the original, clears any
   * previous encoding, renders and publishes the sticker.
   *
   * @param {object} job Queue job; `job.data.stickerId` identifies the sticker.
   * @throws Rethrows any processing error so the queue can record the failure.
   */
  async processStickerIngest (job) {
    try {
      this.log.info('received sticker ingest job', { id: job.id, data: job.data });

      await this.fetchSticker(job); // defines job.data.processor
      await this.resetSticker(job);

      // call the chosen file processor to render the sticker for distribution
      await this.processors[job.data.processor](job);

      //TODO: emit a completion event which should cause a refresh of the
      // creator's view to display the processed sticker
    } catch (error) {
      this.log.error('failed to process sticker', { stickerId: job.data.stickerId, error });
      throw error;
    } finally {
      const { workPath } = job.data;
      if (workPath) {
        this.log.info('cleaning up sticker work path', { workPath });
        try {
          // force: the directory may not exist if the job failed before (or
          // while) creating it.
          await fs.promises.rm(workPath, { recursive: true, force: true });
        } catch (cleanupError) {
          // Cleanup is best-effort; throwing here would replace the job's
          // real outcome with the cleanup failure.
          this.log.error('failed to clean up sticker work path', {
            workPath,
            error: cleanupError,
          });
        }
      }
    }
  }

  /**
   * Loads the Sticker, creates the job's work directory, selects the ingest
   * profile for the original media type and downloads the original media.
   *
   * Populates job.data with: sticker, workPath, origFilePath, outFilePath,
   * processor and, for Sharp profiles, sharpFormat and sharpFormatParameters.
   *
   * @param {object} job Queue job carrying `data.stickerId`.
   * @throws {Error} If the sticker does not exist or its type is unsupported.
   */
  async fetchSticker (job) {
    const { minio: minioService, sticker: stickerService } = this.dtp.services;

    const sticker = await stickerService.getById(job.data.stickerId, true);
    if (!sticker) {
      throw new Error(`sticker not found: ${job.data.stickerId}`);
    }
    job.data.sticker = sticker;

    const stickerId = sticker._id.toString();
    job.data.workPath = path.join(
      process.env.DTP_STICKER_WORK_PATH,
      this.dtp.config.component.slug,
      stickerId,
    );

    await this.jobLog(job, 'creating work directory', { workPath: job.data.workPath });
    await fs.promises.mkdir(job.data.workPath, { recursive: true });

    const profile = STICKER_INGEST_PROFILES.get(sticker.original.type);
    if (!profile) {
      throw new Error(`unsupported sticker type: ${sticker.original.type}`);
    }

    job.data.origFilePath = path.join(job.data.workPath, `${stickerId}.${profile.origExt}`);
    job.data.outFilePath = path.join(job.data.workPath, `${stickerId}.proc.${profile.outExt}`);
    job.data.processor = profile.processor;
    if (profile.sharpFormat) {
      job.data.sharpFormat = profile.sharpFormat;
      // copy so a job never shares (or mutates) the profile's options object
      job.data.sharpFormatParameters = { ...profile.sharpFormatParameters };
    }

    await this.jobLog(job, 'fetching original media', {
      stickerId: sticker._id,
      slug: sticker.slug,
      type: sticker.original.type,
      origFilePath: job.data.origFilePath,
    });
    await minioService.downloadFile({
      bucket: sticker.original.bucket,
      key: sticker.original.key,
      filePath: job.data.origFilePath,
    });
  }

  /**
   * Removes a previously encoded rendition, if any, and returns the Sticker
   * to 'processing' status. Does nothing when the sticker has no encoded
   * media.
   *
   * @param {object} job Queue job with `data.sticker` already loaded.
   */
  async resetSticker (job) {
    const { minio: minioService } = this.dtp.services;
    const { sticker } = job.data;

    if (!sticker.encoded) {
      return;
    }

    this.log.info('removing existing encoded sticker media', { media: sticker.encoded });
    await minioService.removeObject(sticker.encoded.bucket, sticker.encoded.key);

    // switch sticker back to 'processing' status to prevent use in the app
    const Sticker = mongoose.model('Sticker');
    await Sticker.updateOne(
      { _id: job.data.sticker._id },
      {
        $set: {
          status: 'processing',
        },
        $unset: {
          encoded: '',
        },
      },
    );

    delete sticker.encoded;
  }

  /**
   * Renders a still-image sticker with Sharp, uploads the result and marks
   * the Sticker 'live'.
   *
   * @param {object} job Queue job prepared by fetchSticker (origFilePath,
   * outFilePath, sharpFormat, sharpFormatParameters).
   */
  async processStickerSharp (job) {
    const { minio: minioService } = this.dtp.services;

    const sharpImage = sharp(job.data.origFilePath);
    const metadata = await sharpImage.metadata();
    this.log.info('sticker metadata from Sharp', { stickerId: job.data.sticker._id, metadata });

    let chain = sharpImage
      .clone()
      .toColorspace('srgb')
      .resize({ height: DTP_STICKER_HEIGHT });
    // sharpFormat names the Sharp output method to call (jpeg or png)
    chain = chain[job.data.sharpFormat](job.data.sharpFormatParameters);
    await chain.toFile(job.data.outFilePath);

    job.data.outFileStat = await fs.promises.stat(job.data.outFilePath);

    const bucket = process.env.MINIO_VIDEO_BUCKET;
    const key = `/stickers/${job.data.sticker._id.toString().slice(0, 3)}/${job.data.sticker._id}.encoded.${job.data.sharpFormat}`;
    await minioService.uploadFile({
      bucket,
      key,
      filePath: job.data.outFilePath,
      metadata: {
        'Content-Type': `image/${job.data.sharpFormat}`,
        'Content-Length': job.data.outFileStat.size,
      },
    });

    const Sticker = mongoose.model('Sticker');
    await Sticker.updateOne(
      { _id: job.data.sticker._id },
      {
        $set: {
          status: 'live',
          encoded: {
            bucket,
            key,
            type: `image/${job.data.sharpFormat}`,
            size: job.data.outFileStat.size,
          },
        },
      },
    );
  }

  /**
   * Transcodes a motion sticker to MP4 with FFmpeg, uploads the result and
   * marks the Sticker 'live'.
   *
   * @param {object} job Queue job prepared by fetchSticker (origFilePath,
   * outFilePath).
   */
  async processStickerFFMPEG (job) {
    const { media: mediaService, minio: minioService } = this.dtp.services;

    const codecVideo = (process.env.DTP_GPU_ACCELERATION === 'enabled')
      ? 'h264_nvenc'
      : 'libx264';

    // generate the encoded sticker
    // Output height is 100 lines by [aspect] width with width and height being
    // padded to be divisible by 2. The video stream is given a bit rate of
    // 128Kbps, and the media is flagged for +faststart. Audio is stripped if
    // present.
    const ffmpegStickerArgs = [
      '-y', '-i', job.data.origFilePath,
      '-vf', `scale=-1:${DTP_STICKER_HEIGHT},pad=ceil(iw/2)*2:ceil(ih/2)*2`,
      '-pix_fmt', 'yuv420p',
      '-c:v', codecVideo,
      '-b:v', '128k',
      '-movflags', '+faststart',
      '-an',
      job.data.outFilePath,
    ];

    await this.jobLog(job, `transcoding motion sticker: ${job.data.sticker.slug}`);
    this.log.debug('transcoding motion sticker', { ffmpegStickerArgs });
    await mediaService.ffmpeg(ffmpegStickerArgs);

    job.data.outFileStat = await fs.promises.stat(job.data.outFilePath);

    const bucket = process.env.MINIO_VIDEO_BUCKET;
    const key = `/stickers/${job.data.sticker._id.toString().slice(0, 3)}/${job.data.sticker._id}.encoded.mp4`;

    await this.jobLog(job, 'uploading encoded media file');
    await minioService.uploadFile({
      bucket,
      key,
      filePath: job.data.outFilePath,
      metadata: {
        'Content-Type': 'video/mp4',
        'Content-Length': job.data.outFileStat.size,
      },
    });

    await this.jobLog(job, 'updating Sticker to live status');
    const Sticker = mongoose.model('Sticker');
    await Sticker.updateOne(
      { _id: job.data.sticker._id },
      {
        $set: {
          status: 'live',
          encoded: {
            bucket,
            key,
            type: 'video/mp4',
            size: job.data.outFileStat.size,
          },
        },
      },
    );
  }

  /**
   * Job processor for 'sticker-delete': removes the original media, any
   * encoded media, and finally the Sticker document.
   *
   * @param {object} job Queue job; `job.data.stickerId` identifies the sticker.
   * @throws Rethrows any failure so the queue can record it.
   */
  async processStickerDelete (job) {
    const { minio: minioService, sticker: stickerService } = this.dtp.services;
    const Sticker = mongoose.model('Sticker');
    try {
      const sticker = await stickerService.getById(job.data.stickerId, true);
      if (!sticker) {
        throw new Error(`sticker not found: ${job.data.stickerId}`);
      }

      this.log.info('removing original media', { stickerId: sticker._id, slug: sticker.slug });
      await minioService.removeObject(sticker.original.bucket, sticker.original.key);

      if (sticker.encoded) {
        this.log.info('removing encoded media', { stickerId: sticker._id, slug: sticker.slug });
        await minioService.removeObject(sticker.encoded.bucket, sticker.encoded.key);
      }

      this.log.info('removing sticker', { stickerId: sticker._id, slug: sticker.slug });
      await Sticker.deleteOne({ _id: sticker._id });
    } catch (error) {
      this.log.error('failed to delete sticker', { stickerId: job.data.stickerId, error });
      throw error; // for job report
    }
  }

  /**
   * Writes a message to the job's own log and to the worker log.
   *
   * Writing to the job log is best-effort: a failure there is reported to the
   * worker log instead of failing the job or leaving a rejected promise
   * unhandled.
   *
   * @param {object} job Queue job to annotate.
   * @param {string} message Message to record.
   * @param {object} [data] Extra metadata for the worker log entry.
   */
  async jobLog (job, message, data = { }) {
    try {
      await job.log(message);
    } catch (error) {
      this.log.error('failed to write job log', { jobId: job.id, error });
    }
    this.log.info(message, { jobId: job.id, ...data });
  }
}

(async ( ) => {
  try {
    // NOTE(review): module.config defines `component`, not `componentName`,
    // so the second argument is undefined here. SiteLog's signature is not
    // visible from this file - confirm whether `module.config.component` was
    // intended.
    module.log = new SiteLog(module, module.config.componentName);

    /*
     * Platform startup
     */
    await SitePlatform.startPlatform(module, module.config.component);

    module.worker = new StickerWorker(module);
    await module.worker.start();

    /*
     * Worker startup
     */
    if (process.argv[2]) {
      // `new` is required on driver versions where ObjectId is an ES class.
      const stickerId = new mongoose.Types.ObjectId(process.argv[2]);
      // `this` is not the worker (or the module) inside this arrow function;
      // the module-level logger must be used.
      module.log.info('creating sticker processing job', { stickerId });
      await module.worker.stickerProcessingQueue.add('sticker-ingest', { stickerId });
    }

    module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.component.name} started`);
  } catch (error) {
    // Fall back to console when the failure happened before (or while) the
    // logger was created.
    const logger = module.log || console;
    logger.error('failed to start worker', {
      component: module.config.component,
      error,
    });
    process.exit(-1);
  }
})();