// media/job/attachment-ingest.js // Copyright (C) 2022 DTP Technologies, LLC // License: Apache-2.0 'use strict'; const path = require('path'); const fs = require('fs'); const sharp = require('sharp'); const ATTACHMENT_IMAGE_HEIGHT = 540; const mongoose = require('mongoose'); const { response } = require('express'); const Attachment = mongoose.model('Attachment'); const { SiteWorkerProcess } = require(path.join(__dirname, '..', '..', '..', '..', 'lib', 'site-lib')); class AttachmentIngestJob extends SiteWorkerProcess { static get COMPONENT ( ) { return { name: 'attachmentIngestJob', slug: 'attachment-ingest-job', }; } constructor (worker) { super(worker, AttachmentIngestJob.COMPONENT); this.processors = { processAttachmentSharp: this.processAttachmentSharp.bind(this), processAttachmentFFMPEG: this.processAttachmentFFMPEG.bind(this), }; } async start ( ) { await super.start(); this.queue = await this.getJobQueue('media'); this.log.info('registering job processor', { queue: this.queue.name, name: 'attachment-ingest' }); this.queue.process('attachment-ingest', 1, this.processAttachmentIngest.bind(this)); } async stop ( ) { await super.stop(); } async processAttachmentIngest (job) { const { attachment: attachmentService } = this.dtp.services; const { attachmentId } = job.data; this.log.info('received attachment-ingest job', { id: job.id, attachmentId }); try { job.data.attachment = await attachmentService.getById(attachmentId, { withOriginal: true }); await this.resetAttachment(job); await this.fetchAttachmentFile(job); await this.processors[job.data.processor](job); //TODO: emit a completion event which should cause a refresh of the // creator's view to display the processed attachment } catch (error) { this.log.error('failed to process attachment for ingest', { attachmentId: job.data.attachmentId, error }); throw error; } finally { if (job.data.workPath) { this.log.info('removing attachment work path'); await fs.promises.rmdir(job.data.workPath, { recursive: true, force: true }); delete job.data.workPath; } } } async fetchAttachmentFile (job) { const { minio: minioService } = this.dtp.services; try { const { attachment } = job.data; job.data.workPath = path.join( process.env.DTP_ATTACHMENT_WORK_PATH, AttachmentIngestJob.COMPONENT.slug, attachment._id.toString(), ); this.jobLog(job, 'creating work directory', { worthPath: job.data.workPath }); await fs.promises.mkdir(job.data.workPath, { recursive: true }); switch (attachment.original.mime) { case 'image/jpeg': job.data.origFilePath = path.join(job.data.workPath, `${attachment._id}.jpg`); job.data.procFilePath = path.join(job.data.workPath, `${attachment._id}.proc.jpg`); job.data.processor = 'processAttachmentSharp'; job.data.sharpFormat = 'jpeg'; job.data.sharpFormatParameters = { quality: 85 }; break; case 'image/png': job.data.origFilePath = path.join(job.data.workPath, `${attachment._id}.png`); job.data.procFilePath = path.join(job.data.workPath, `${attachment._id}.proc.png`); job.data.processor = 'processAttachmentSharp'; job.data.sharpFormat = 'png'; job.data.sharpFormatParameters = { compression: 9 }; break; case 'image/gif': job.data.origFilePath = path.join(job.data.workPath, `${attachment._id}.gif`); job.data.procFilePath = path.join(job.data.workPath, `${attachment._id}.proc.mp4`); job.data.processor = 'processAttachmentFFMPEG'; break; case 'image/webp': // process as PNG job.data.origFilePath = path.join(job.data.workPath, `${attachment._id}.webp`); job.data.procFilePath = path.join(job.data.workPath, `${attachment._id}.proc.png`); job.data.processor = 'processAttachmentSharp'; job.data.sharpFormat = 'png'; job.data.sharpFormatParameters = { compression: 9 }; break; default: throw new Error(`unsupported attachment type: ${attachment.original.mime}`); } this.jobLog(job, 'fetching attachment original file', { attachmentId: attachment._id, mime: attachment.original.mime, size: attachment.original.size, worthPath: job.data.origFilePath, }); await minioService.downloadFile({ bucket: attachment.original.bucket, key: attachment.original.key, filePath: job.data.origFilePath, }); } catch (error) { this.log.error('failed to fetch attachment file', { attachmentId: job.data.attachmentId, error }); throw error; } } async resetAttachment (job) { const { minio: minioService } = this.dtp.services; const { attachment } = job.data; const updateOp = { $set: { status: 'processing' } }; if (attachment.encoded) { this.log.info('removing existing encoded attachment file', { file: attachment.encoded }); await minioService.removeObject(attachment.encoded.bucket, attachment.encoded.key); delete attachment.encoded; updateOp.$unset = { encoded: '' }; } await Attachment.updateOne({ _id: attachment._id }, updateOp); } async processAttachmentSharp (job) { const { attachment: attachmentService, minio: minioService } = this.dtp.services; const { attachment } = job.data; const attachmentId = attachment._id; const sharpImage = sharp(job.data.origFilePath); const metadata = await sharpImage.metadata(); this.log.info('attachment metadata from Sharp', { attachmentId, metadata }); let chain = sharpImage .clone() .toColorspace('srgb') .resize({ height: ATTACHMENT_IMAGE_HEIGHT }); chain = chain[job.data.sharpFormat](job.data.sharpFormatParameters); await chain.toFile(job.data.procFilePath); job.data.outFileStat = await fs.promises.stat(job.data.procFilePath); const bucket = process.env.MINIO_ATTACHMENT_BUCKET; const key = attachmentService.getAttachmentKey(attachment, 'processed'); const response = await minioService.uploadFile({ bucket, key, filePath: job.data.procFilePath, metadata: { 'Content-Type': `image/${job.data.sharpFormat}`, 'Content-Length': job.data.outFileStat.size, }, }); await Attachment.updateOne( { _id: job.data.attachment._id }, { $set: { status: 'live', encoded: { bucket, key, mime: `image/${job.data.sharpFormat}`, size: job.data.outFileStat.size, etag: response.etag, }, }, }, ); } async processAttachmentFFMPEG (job) { const { attachment: attachmentService, media: mediaService, minio: minioService, } = this.dtp.services; const { attachment } = job.data; const codecVideo = (process.env.DTP_GPU_ACCELERATION === 'enabled') ? 'h264_nvenc' : 'libx264'; // generate the encoded attachment // Output height is 100 lines by [aspect] width with width and height being // padded to be divisible by 2. The video stream is given a bit rate of // 128Kbps, and the media is flagged for +faststart. Audio is stripped if // present. const ffmpegArgs = [ '-y', '-i', job.data.origFilePath, '-vf', `scale=-1:${ATTACHMENT_IMAGE_HEIGHT},pad=ceil(iw/2)*2:ceil(ih/2)*2`, '-pix_fmt', 'yuv420p', '-c:v', codecVideo, '-b:v', '128k', '-movflags', '+faststart', '-an', job.data.procFilePath, ]; this.log.debug('transcoding attachment', { ffmpegArgs }); await mediaService.ffmpeg(ffmpegArgs); job.data.outFileStat = await fs.promises.stat(job.data.procFilePath); const bucket = process.env.MINIO_VIDEO_BUCKET; const key = attachmentService.getAttachmentKey(attachment, 'processed'); this.jobLog(job, 'uploading processed media file'); const response = await minioService.uploadFile({ bucket, key, filePath: job.data.procFilePath, metadata: { 'Content-Type': 'video/mp4', 'Content-Length': job.data.outFileStat.size, }, }); await Attachment.updateOne( { _id: attachment._id }, { $set: { status: 'live', encoded: { bucket, key, mime: 'video/mp4', size: job.data.outFileStat.size, etag: response.etag, }, }, }, ); } } module.exports = AttachmentIngestJob;