// chat.js // Copyright (C) 2022 DTP Technologies, LLC // License: Apache-2.0 'use strict'; const mongoose = require('mongoose'); const ChatRoom = mongoose.model('ChatRoom'); const ChatRoomInvite = mongoose.model('ChatRoomInvite'); const ChatMessage = mongoose.model('ChatMessage'); const EmojiReaction = mongoose.model('EmojiReaction'); const ioEmitter = require('socket.io-emitter'); const moment = require('moment'); const marked = require('marked'); const hljs = require('highlight.js'); const striptags = require('striptags'); const unzalgo = require('unzalgo'); const stringSimilarity = require('string-similarity'); const { SiteService, SiteError } = require('../../lib/site-lib'); class ChatService extends SiteService { constructor (dtp) { super(dtp, module.exports); } async start ( ) { const { jobQueue: jobQueueService, user: userService, limiter: limiterService, } = this.dtp.services; await super.start(); this.populateChatMessage = [ { path: 'channel', }, { path: 'author', select: userService.USER_SELECT, }, { path: 'stickers', }, ]; this.populateChatRoom = [ { path: 'owner', select: userService.USER_SELECT, }, { path: 'members.member', select: userService.USER_SELECT, }, ]; this.populateChatRoomInvite = [ { path: 'room', populate: [ { path: 'owner', select: userService.USER_SELECT, }, ], }, { path: 'member', select: userService.USER_SELECT, }, ]; this.templates = { chatMessage: this.loadViewTemplate('chat/components/message-standalone.pug'), }; this.markedRenderer = new marked.Renderer(); this.markedRenderer.link = (href, title, text) => { return text; }; this.markedRenderer.image = (href, title, text) => { return text; }; this.markedConfig = { renderer: this.markedRenderer, highlight: function(code, lang) { const language = hljs.getLanguage(lang) ? lang : 'plaintext'; return hljs.highlight(code, { language }).value; }, langPrefix: 'hljs language-', // highlight.js css expects a top-level 'hljs' class. pedantic: false, gfm: true, breaks: false, sanitize: false, smartLists: true, smartypants: false, xhtml: false, }; this.chatMessageLimiter = limiterService.createRateLimiter({ points: 20, duration: 60, blockDuration: 60 * 3, execEvenly: false, keyPrefix: 'rl:chatmsg', }); this.reactionLimiter = limiterService.createRateLimiter({ points: 60, duration: 60, blockDuration: 60 * 3, execEvenly: false, keyPrefix: 'rl:react', }); /* * The Redis Emitter is a Socket.io-compatible message emitter that operates * with greater efficiency than using Socket.io itself. */ this.emitter = ioEmitter(this.dtp.redis); this.queues = { media: jobQueueService.getJobQueue('media', this.dtp.config.jobQueues.media), reeeper: jobQueueService.getJobQueue('reeeper', this.dtp.config.jobQueues.reeeper), }; } async renderTemplate (which, viewModel) { if (!this.templates || !this.templates[which]) { throw new Error('Chat service template does not exist'); } viewModel = Object.assign(viewModel, this.dtp.app.locals); return this.templates[which](viewModel); } middleware (options) { options = Object.assign({ maxOwnedRooms: 10, maxJoinedRooms: 10, }); return async (req, res, next) => { try { res.locals.ownedChatRooms = await this.getRoomsForOwner(req.user, { skip: 0, cpp: options.maxOwnedRooms, }); res.locals.joinedChatRooms = await this.getRoomsForMember(req.user, { skip: 0, cpp: options.maxJoinedRooms, }); return next(); } catch (error) { this.log.error('failed to execute chat middleware', { error }); return next(error); } }; } async createRoom (owner, roomDefinition) { const NOW = new Date(); const room = new ChatRoom(); room.created = NOW; room.lastActivity = NOW; room.ownerType = owner.type; room.owner = owner._id; if (!roomDefinition.name || !roomDefinition.name.length) { throw new SiteError(400, 'Must provide room name'); } room.name = this.filterText(roomDefinition.name); if (roomDefinition.description && (roomDefinition.description.length > 0)) { room.description = this.filterText(roomDefinition.description); } if (roomDefinition.policy && (roomDefinition.policy.length > 0)) { room.policy = this.filterText(roomDefinition.policy); } room.visibility = roomDefinition.visibility; room.membershipPolicy = roomDefinition.membershipPolicy; room.members = [ ]; await room.save(); return room.toObject(); } async updateRoom (room, roomDefinition) { const NOW = new Date(); const updateOp = { $set: { lastActivity: NOW, }, $unset: { }, }; if (!roomDefinition.name && !roomDefinition.name.length) { throw new SiteError(400, 'Must provide room name'); } updateOp.$set.name = this.filterText(roomDefinition.name); if (roomDefinition.description && (roomDefinition.description.length > 0)) { updateOp.$set.description = this.filterText(roomDefinition.description); } else { updateOp.$unset.description = 1; } if (roomDefinition.policy && (roomDefinition.policy.length > 0)) { updateOp.$set.policy = this.filterText(roomDefinition.policy); } else { updateOp.$unset.policy = 1; } if (!roomDefinition.visibility || !roomDefinition.visibility.length) { throw new SiteError(400, 'Must specify room visibility'); } updateOp.$set.visibility = roomDefinition.visibility.trim(); if (!roomDefinition.membershipPolicy || !roomDefinition.membershipPolicy.length) { throw new SiteError(400, 'Must specify room membership policy'); } updateOp.$set.membershipPolicy = roomDefinition.membershipPolicy.trim(); const response = await ChatRoom.findOneAndUpdate({ _id: room._id }, updateOp, { new: true }); this.log.debug('chat room update', { response }); return response; } async getRoomsForOwner (owner, pagination) { pagination = Object.assign({ skip: 0, cpp: 50 }, pagination); const rooms = await ChatRoom .find({ owner: owner._id }) .sort({ lastActivity: -1, created: -1 }) .skip(pagination.skip) .limit(pagination.cpp) .populate(this.populateChatRoom) .lean(); return rooms; } async getRoomsForMember (member, pagination) { pagination = Object.assign({ skip: 0, cpp: 50 }, pagination); const rooms = await ChatRoom .find({ 'members.member': member._id }) .sort({ lastActivity: -1, created: -1 }) .skip(pagination.skip) .limit(pagination.cpp) .populate(this.populateChatRoom) .lean(); return rooms; } async getPublicRooms (pagination) { pagination = Object.assign({ skip: 0, cpp: 50 }, pagination); const totalPublicRooms = await ChatRoom.countDocuments({ visibility: 'public' }); const rooms = await ChatRoom .find({ visibility: 'public' }) .sort({ lastActivity: -1, created: -1 }) .skip(pagination.skip) .limit(pagination.cpp) .populate(this.populateChatRoom) .lean(); return { rooms, totalPublicRooms }; } async getRoomById (roomId) { const room = await ChatRoom .findById(roomId) .populate(this.populateChatRoom) .lean(); return room; } async deleteRoom (room) { return this.queues.reeeper.add('chat-room-delete', { roomId: room._id }); } async joinRoom (room, member) { if (room.membershipPolicy !== 'open') { throw new SiteError(403, 'The room is not open'); } await ChatRoom.updateOne( { _id: room._id }, { $addToSet: { members: { memberType: member.type, member: member._id, }, }, }, ); } async leaveRoom (room, memberId) { await ChatRoom.updateOne( { _id: room._id }, { $pull: { members: { _id: memberId } }, }, ); } async sendRoomInvite (room, member, inviteDefinition) { const { coreNode: coreNodeService } = this.dtp.services; const NOW = new Date(); if (await this.isRoomMember(room, member)) { throw new SiteError(400, `${member.username} is already a member of ${room.name}`); } let invite = await ChatRoomInvite .findOne({ room: room._id, member: member._id }) .populate(this.populateChatRoomInvite) .lean(); if (invite) { switch (invite.status) { case 'new': throw new SiteError(400, `${member.displayName || member.username} was invited to join ${moment(invite.created).fromNow()}, but has not yet responded.`); case 'rejected': throw new SiteError(400, `${member.displayName || member.username} rejected your invitation to join.`); default: this.log.alert('deleting damaged ChatRoomInvite document', { _id: invite._id }); await ChatRoomInvite.deleteOne({ _id: invite._id }); break; // create a new one by proceeding } } invite = new ChatRoomInvite(); invite.created = NOW; invite.room = room._id; invite.memberType = member.type; invite.member = member._id; invite.status = 'new'; if (inviteDefinition && inviteDefinition.message) { invite.message = this.filterText(inviteDefinition.message); } await invite.save(); invite = invite.toObject(); this.log.info('chat room invite created', { roomId: room._id, memberId: member._id, inviteId: invite._id, }); /* * Send the invite notification using DTP Core services. It will figure out * who needs to receive the event and how to get it to them. */ room.owner.type = room.ownerType; const event = { recipientType: member.type, recipient: member._id, action: 'room-invite-create', emitter: room.owner, label: 'Chat Room Invitation', content: invite.message || `Join my chat room on ${this.dtp.config.site.name}!`, href: coreNodeService.getLocalUrl(`/chat/room/${room._id}/invite/${invite._id}`), }; await coreNodeService.sendKaleidoscopeEvent(event, member); return invite; } async getRoomInvites (room, status) { const invites = await ChatRoomInvite .find({ room: room._id, status }) .sort({ created: 1 }) .populate(this.populateChatRoomInvite) .lean(); return invites; } async getRoomInviteById (inviteId) { const invite = await ChatRoomInvite .findById(inviteId) .populate(this.populateChatRoomInvite) .lean(); return invite; } async acceptRoomInvite (invite) { if ((invite.status === 'accepted') || (invite.room.members.find((member) => member.member._id.equals(invite.member._id)))) { throw SiteError(400, "You have already accepted membership in this room."); } this.log.debug('updating chat invite', { inviteId: invite._id, status: 'accepted' }); await ChatRoomInvite.updateOne( { _id: invite._id }, { $set: { status: 'accepted' }, }, ); this.log.info('accepting invite to chat room', { roomId: invite.room._id, memberId: invite.member._id, }); await ChatRoom.updateOne( { _id: invite.room._id }, { $push: { members: { memberType: invite.memberType, member: invite.member._id, }, }, }, ); } async rejectRoomInvite (invite) { this.log.info('rejecting chat room invite', { inviteId: invite._id, roomId: invite.room._id, memberId: invite.member._id, }); await ChatRoomInvite.updateOne( { _id: invite._id }, { $set: { status: 'rejected' }, }, ); } /** * Marks an invitation as deleted, but does not physically remove it from the * database. They expire after 30 days and will self-delete. This serves as a * guard against invite spam. The person asking you to change this behavior * wants to use invite spam as a form of abuse. The answer is: No. * * @param {ChatRoomInvite} invite The invitation to be marked as deleted. */ async deleteRoomInvite (invite) { if (invite.status !== 'new') { throw new SiteError(400, "Can't delete selected room invite"); } this.log.info('deleting chat room invite', { inviteId: invite._id }); await ChatRoomInvite.updateOne({ _id: invite._id }, { $set: { status: 'deleted' } }); } async isRoomMember (room, userId) { const member = await ChatRoom.findOne({ 'members.member': userId }).lean(); return !!member; } async createMessage (author, messageDefinition) { const { sticker: stickerService, user: userService } = this.dtp.services; this.log.alert('user record', { author }); if (author.type === 'User') { author = await userService.getLocalUserAccount(author._id); } else { author = await userService.getCoreUserAccount(author._id); } if (!author || !author.permissions || !author.permissions.canChat) { throw new SiteError(403, `You are not permitted to chat at all on ${this.dtp.config.site.name}`); } try { const userKey = author._id.toString(); await this.chatMessageLimiter.consume(userKey, 1); } catch (error) { throw new SiteError(429, 'You are sending chat messages too quickly'); } const NOW = new Date(); /* * Record the chat message to the database */ let message = new ChatMessage(); message.created = NOW; message.channelType = messageDefinition.channelType; message.channel = mongoose.Types.ObjectId(messageDefinition.channel._id || messageDefinition.channel); message.authorType = author.type; message.author = author._id; message.content = this.filterText(messageDefinition.content); message.analysis = await this.analyzeContent(author, message.content); if (message.analysis.similarity > 3.0) { throw new SiteError(429, 'Message rejected as spam (too repetitive)'); } const stickerSlugs = this.findStickers(message.content); stickerSlugs.forEach((sticker) => { const re = new RegExp(`:${sticker}:`, 'gi'); message.content = message.content.replace(re, '').trim(); }); const stickers = await stickerService.resolveStickerSlugs(stickerSlugs); message.stickers = stickers.map((sticker) => sticker._id); await message.save(); message = message.toObject(); /* * Update room's latest message pointer */ await ChatRoom.updateOne( { _id: message.channel }, { $set: { latestMessage: message._id } }, ); /* * Prepare a message payload that can be transmitted over sockets to clients * and rendered for display. */ const renderedContent = this.renderMessageContent(message.content); const payload = { _id: message._id, created: message.created, user: { _id: author._id, displayName: author.displayName, username: author.username, }, content: renderedContent, stickers, }; if (author.picture) { payload.user.picture = { }; if (author.picture.large) { payload.user.picture.large ={ _id: author.picture.large._id, }; } if (author.picture.small) { payload.user.picture.small = { _id: author.picture.small._id, }; } } /* * Return both things */ return { message, payload }; } renderMessageContent (content) { return marked.parse(content, this.markedConfig); } findStickers (content) { const tokens = content.split(' '); const stickers = [ ]; tokens.forEach((token) => { if ((token[0] !== ':') || (token[token.length -1 ] !== ':')) { return; } token = token.slice(1, token.length - 1 ).toLowerCase(); if (token.includes('/') || token.includes(':') || token.includes(' ')) { return; // trimmed token includes invalid characters } this.log.debug('found sticker request', { token }); if (!stickers.includes(token)) { stickers.push(striptags(token)); } }); return stickers.slice(0, 4); } async removeMessage (message) { await ChatMessage.deleteOne({ _id: message._id }); this.emitter(`site:${this.dtp.config.site.domainKey}:chat`, { command: 'removeMessage', params: { messageId: message._id }, }); } async getChannelHistory (channel, pagination) { const messages = await ChatMessage .find({ channel: channel._id }) .sort({ created: -1 }) .skip(pagination.skip) .limit(pagination.cpp) .populate(this.populateChatMessage) .lean(); return messages.reverse(); } async getUserHistory (user, pagination) { const messages = await ChatMessage .find({ author: user._id }) .sort({ created: -1 }) .skip(pagination.skip) .limit(pagination.cpp) .populate(this.populateChatMessage) .lean(); return messages.reverse(); } async getRoomMemberships (user, options) { options = Object.assign({ withPopulate: true, }, options || { }); const search = { $or: [ { owner: user._id }, { 'members.member': user._id }, ], }; let q = ChatRoom.find(search).sort({ name: 1 }); if (options.pagination) { q = q.skip(options.pagination.skip).limit(options.pagination.cpp); } if (options.withPopulate) { q = q.populate(this.populateChatRoom); } const memberships = await q.lean(); return memberships; } /** * This service is never called using user-supplied lists of room IDs. Don't * do that, there is no membership check. Instead, every request knows the * member's list of rooms owned and rooms joined. When this method was * written, those arrays are being merged to build the list of roomIds. * @param {Array} roomIds an array of Room._id values * @param {*} pagination pagination params for the timeline * @returns an array of messages in chronological order from all room IDs * specified. */ async getMultiRoomTimeline (roomIds, pagination) { const messages = await ChatMessage .find({ room: { $in: roomIds } }) .sort({ created: -1 }) .skip(pagination.skip) .limit(pagination.cpp) .populate(this.populateChatMessage) .lean(); return messages.reverse(); } /** * Filters an input string to remove "zalgo" text and to strip all HTML tags. * This prevents cross-site scripting and the malicious destruction of text * layouts. * @param {String} content The text content to be filtered. * @returns the filtered text */ filterText (content) { return striptags(unzalgo.clean(content.trim())); } /** * Analyze an input chat message against a user's history for similarity and * other abusive content. Returns a response object with various scores * allowing the caller to implement various policies and make various * decisions. * @param {User} author The author of the chat message * @param {*} content The text of the chat message as would be distributed. * @returns response object with various scores indicating the results of * analyses performed. */ async analyzeContent (author, content) { const response = { similarity: 0.0 }; /* * Compare versus their recent chat messages, score for similarity, and * block based on repetition. Spammers are redundant. This stops them. */ const history = await ChatMessage .find({ author: author._id }) .sort({ created: -1 }) .select('content') .limit(10) .lean(); history.forEach((message) => { const similarity = stringSimilarity.compareTwoStrings(content, message.content); if (similarity > 0.9) { // 90% or greater match with history entry response.similarity += similarity; } }); return response; } async sendMessage (channel, messageName, payload) { if (typeof channel !== 'string') { channel = channel.toString(); } this.emitter.to(channel).emit(messageName, payload); } async sendSystemMessage (content, options) { const NOW = new Date(); options = Object.assign({ type: 'info', }, options || { }); const payload = { created: NOW, type: options.type, content, }; if (options.channelId) { this.emitter.to(options.channelId).emit('system-message', payload); return; } if (options.userId) { this.emitter.to(options.userId).emit('system-message', payload); } } async createEmojiReaction (user, reactionDefinition) { const { user: userService } = this.dtp.services; const NOW = new Date(); let userCheck; if (user.type === '') { userCheck = await userService.getLocalUserAccount(user._id); } else { userCheck = await userService.getCoreUserAccount(user._id); } if (!userCheck || !userCheck.permissions || !userCheck.permissions.canChat) { throw new SiteError(403, 'You are not permitted to chat'); } try { const userKey = user._id.toString(); await this.reactionLimiter.consume(userKey, 1); } catch (error) { throw new SiteError(429, 'You are sending reactions too quickly'); } const reaction = new EmojiReaction(); reaction.created = NOW; reaction.subjectType = reactionDefinition.subjectType; reaction.subject = mongoose.Types.ObjectId(reactionDefinition.subject); reaction.userType = user.type; reaction.user = user._id; reaction.reaction = reactionDefinition.reaction; if (reactionDefinition.timestamp) { reaction.timestamp = reactionDefinition.timestamp; } await reaction.save(); return reaction.toObject(); } async getRecent (limit = 10) { const messages = await ChatMessage .find({ }) .sort({ created: -1 }) .limit(limit) .populate(this.populateChatMessage) .lean(); return messages; } async removeForUser (user) { const { logan: loganService } = this.dtp.services; let roomCount = 0, messageCount = 0; await ChatRoom .find({ owner: user._id }) .populate(this.populateChatRoom) .cursor() .eachAsync(async (room) => { try { await this.deleteRoom(room); ++roomCount; } catch (error) { loganService.sendEvent(module.exports, { level: 'error', event: 'removeForUser', message: `failed to remove chat room: ${error.message}`, data: { room: { _id: room._id, name: room.name, }, error, }, }); // fall through } }); await ChatMessage .find({ author: user._id }) .cursor() .eachAsync(async (message) => { try { await this.removeMessage(message); ++messageCount; } catch (error) { loganService.sendEvent(module.exports, { level: 'error', event: 'removeForUser', message: `failed to remove chat message: ${error.message}`, data: { message: { _id: message._id, }, error, }, }); // fall through } }); loganService.sendEvent(module.exports, { level: 'info', event: 'removeForUser', data: { user: { _id: user._id, username: user.username, }, roomCount, messageCount, }, }); } } module.exports = { logId: 'svc:chat', index: 'chat', className: 'ChatService', create: (dtp) => { return new ChatService(dtp); }, };