You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

337 lines
11 KiB

// site-ioserver.js
// Copyright (C) 2022 DTP Technologies, LLC
// License: Apache-2.0
'use strict';
// Component descriptor handed to SiteLog when the server creates its logger.
const DTP_COMPONENT = { name: 'I/O Server', slug: 'ioserver', prefix: 'srv' };
const path = require('path');
// Redis client used for the Socket.IO adapter and for the rate limiters.
const Redis = require('ioredis');
const mongoose = require('mongoose');
// Retrieves a model by name only; presumably the ConnectToken schema has been
// registered with mongoose before this module is loaded — TODO confirm.
const ConnectToken = mongoose.model('ConnectToken');
// Used by findStickers() to strip markup from sticker slugs.
const striptags = require('striptags');
// Markdown renderer configured in start().
const marked = require('marked');
const { SiteLog } = require(path.join(__dirname, 'site-log'));
// Base class for SiteIoServer.
const Events = require('events');
/**
 * Socket.IO server for the site's realtime features.
 *
 * Sockets authenticate with a single-use ConnectToken, after which they may
 * join/leave rooms ("channels"), post chat messages and post reactions. Chat
 * and reactions are rate limited per user through Redis-backed limiters.
 */
class SiteIoServer extends Events {

  /**
   * @param {object} dtp Platform context. This class reads `dtp.services`
   * (chat and user services) and `dtp.config.site.name`, and passes `dtp` to
   * SiteLog.
   */
  constructor (dtp) {
    super();
    this.dtp = dtp;
    this.log = new SiteLog(dtp, DTP_COMPONENT);
  }

  /**
   * Formats a millisecond duration as seconds with one decimal place and
   * thousands separators (e.g. 1500 -> "1.5", 1234567 -> "1,234.6").
   *
   * @param {number} ms Duration in milliseconds. Non-finite values format as 0.
   * @returns {string} The formatted number of seconds.
   */
  static formatSeconds (ms) {
    const seconds = Number.isFinite(ms) ? (ms / 1000) : 0;
    return seconds.toLocaleString('en-US', {
      minimumFractionDigits: 1,
      maximumFractionDigits: 1,
    });
  }

  /**
   * Creates the rate limiters, the Markdown configuration and the Socket.IO
   * server (with a Redis adapter), then starts accepting connections.
   *
   * @param {object} httpServer HTTP server instance handed to Socket.IO.
   */
  async start (httpServer) {
    const { Server } = require('socket.io');
    const { createAdapter } = require('@socket.io/redis-adapter');

    this.createRateLimiters();

    // Links and images are reduced to their text so that rendered chat
    // content carries no anchors or embedded media.
    this.markedRenderer = new marked.Renderer();
    this.markedRenderer.link = (href, title, text) => { return text; };
    this.markedRenderer.image = (href, title, text) => { return text; };

    const hljs = require('highlight.js');
    this.markedConfig = {
      renderer: this.markedRenderer,
      highlight: function(code, lang) {
        // Unknown languages fall back to plain text instead of throwing.
        const language = hljs.getLanguage(lang) ? lang : 'plaintext';
        return hljs.highlight(code, { language }).value;
      },
      langPrefix: 'hljs language-', // highlight.js css expects a top-level 'hljs' class.
      pedantic: false,
      gfm: true,
      breaks: false,
      sanitize: false,
      smartLists: true,
      smartypants: false,
      xhtml: false,
    };

    // NOTE(review): ioredis documents `keyPrefix`, not `key`, and this reads
    // REDIS_PREFIX while createRateLimiters() reads REDIS_KEY_PREFIX. Left
    // unchanged because correcting it would rename keys — confirm intent.
    const pubClient = new Redis({
      host: process.env.REDIS_HOST,
      port: process.env.REDIS_PORT,
      password: process.env.REDIS_PASSWORD,
      key: process.env.REDIS_PREFIX,
    });
    pubClient.on('error', this.onRedisError.bind(this));

    const subClient = pubClient.duplicate();
    subClient.on('error', this.onRedisError.bind(this));

    const transports = ['websocket'/*, 'polling'*/];
    const adapter = createAdapter(pubClient, subClient);

    this.io = new Server(httpServer, { adapter, transports });
    this.io.on('connection', this.onSocketConnect.bind(this));
  }

  /**
   * Builds the Redis-backed limiters used by onUserChat and onUserReact:
   * chat allows 20 points per 60 seconds, reactions 60 points per 60 seconds,
   * and both block for 180 seconds once exceeded.
   */
  createRateLimiters ( ) {
    const { RateLimiterRedis } = require('rate-limiter-flexible');

    // NOTE(review): see the note in start() about `key` vs `keyPrefix`.
    const rateLimiterRedisClient = new Redis({
      host: process.env.REDIS_HOST,
      port: process.env.REDIS_PORT,
      password: process.env.REDIS_PASSWORD,
      key: process.env.REDIS_KEY_PREFIX || 'dtp',
      enableOfflineQueue: false,
      lazyConnect: false,
    });

    this.chatMessageLimiter = new RateLimiterRedis({
      storeClient: rateLimiterRedisClient,
      points: 20,
      duration: 60,
      blockDuration: 60 * 3,
      execEvenly: false,
      keyPrefix: 'rl:chatmsg',
    });

    this.reactionLimiter = new RateLimiterRedis({
      storeClient: rateLimiterRedisClient,
      points: 60,
      duration: 60,
      blockDuration: 60 * 3,
      execEvenly: false,
      keyPrefix: 'rl:react',
    });
  }

  /**
   * Logs errors raised by the Redis pub/sub clients created in start().
   *
   * @param {Error} error The error emitted by the client.
   */
  async onRedisError (error) {
    this.log.error('Redis error', { error });
  }

  /**
   * Currently a no-op.
   * NOTE(review): the Socket.IO server and Redis clients created by start()
   * and createRateLimiters() are not released here — confirm whether shutdown
   * is handled elsewhere.
   */
  async stop ( ) {
  }

  /**
   * Logs the rejection and force-closes the underlying connection.
   *
   * @param {object} socket The Socket.IO socket being rejected.
   * @param {string} message Log message describing why it was rejected.
   */
  rejectSocket (socket, message) {
    this.log.alert(message, {
      sid: socket.id,
      handshake: socket.handshake,
    });
    // Server-side sockets expose disconnect(); passing true also closes the
    // underlying connection.
    socket.disconnect(true);
  }

  /**
   * @param {*} channelId Value received from a client.
   * @returns {boolean} True when the value is a non-empty string.
   */
  isValidChannelId (channelId) {
    return (typeof channelId === 'string') && (channelId.length > 0);
  }

  /**
   * Authenticates a new socket with its connect token, marks the token as
   * claimed, wires up the per-session event handlers and tells the client it
   * is authenticated. Sockets that fail any check are disconnected.
   *
   * @param {object} socket The newly connected Socket.IO socket.
   */
  async onSocketConnect (socket) {
    this.log.debug('socket connection', { sid: socket.id });

    // The auth payload is supplied by the client. Only a plain string may be
    // used in the query filter; an object here would be interpreted as a
    // query operator.
    const auth = socket.handshake ? socket.handshake.auth : undefined;
    const tokenValue = auth ? auth.token : undefined;
    if ((typeof tokenValue !== 'string') || (tokenValue.length === 0)) {
      this.rejectSocket(socket, 'rejecting socket with missing or malformed token');
      return;
    }

    try {
      const token = await ConnectToken.findOne({ token: tokenValue }).populate('user').lean();
      if (!token) {
        this.rejectSocket(socket, 'rejecting invalid socket token');
        return;
      }
      if (token.claimed) {
        this.rejectSocket(socket, 'rejecting use of claimed connect token');
        return;
      }
      if (!token.user) {
        // populate() yields no user when the referenced record is missing.
        this.rejectSocket(socket, 'rejecting connect token with no user');
        return;
      }

      // NOTE(review): the read above and this write are not atomic, so two
      // simultaneous connections could both pass the `claimed` check.
      await ConnectToken.updateOne(
        { _id: token._id },
        { $set: { claimed: new Date() } },
      );
      this.log.debug('token claimed', {
        sid: socket.id,
        token: tokenValue,
        user: token.user._id,
      });

      const session = {
        user: {
          _id: token.user._id,
          type: token.userType,
          created: token.user.created,
          username: token.user.username,
          displayName: token.user.displayName,
        },
        socket,
      };

      // Bound handlers are kept on the session so onSocketDisconnect can
      // detach exactly the functions registered here.
      session.onSocketDisconnect = this.onSocketDisconnect.bind(this, session);
      session.onJoinChannel = this.onJoinChannel.bind(this, session);
      session.onLeaveChannel = this.onLeaveChannel.bind(this, session);
      session.onUserChat = this.onUserChat.bind(this, session);
      session.onUserReact = this.onUserReact.bind(this, session);

      socket.on('disconnect', session.onSocketDisconnect);
      socket.on('join', session.onJoinChannel);
      socket.on('leave', session.onLeaveChannel);
      socket.on('user-chat', session.onUserChat);
      socket.on('user-react', session.onUserReact);

      socket.emit('authenticated', {
        message: 'token verified',
        user: session.user,
      });
    } catch (error) {
      // This runs as an event listener, so a rejection would otherwise go
      // unhandled.
      this.log.error('failed to authenticate socket', { sid: socket.id, error });
      socket.disconnect(true);
    }
  }

  /**
   * Detaches every handler that onSocketConnect registered for the session.
   *
   * @param {object} session Session created in onSocketConnect.
   * @param {string} reason Disconnect reason reported by Socket.IO.
   */
  async onSocketDisconnect (session, reason) {
    this.log.debug('socket disconnect', { sid: session.socket.id, user: session.user._id, reason });
    session.socket.off('disconnect', session.onSocketDisconnect);
    session.socket.off('join', session.onJoinChannel);
    session.socket.off('leave', session.onLeaveChannel);
    session.socket.off('user-chat', session.onUserChat);
    session.socket.off('user-react', session.onUserReact);
  }

  /**
   * Adds the session's socket to a room and confirms with 'join-result'.
   * NOTE(review): no per-channel authorization is performed here — confirm
   * that any authenticated user may join any channel.
   *
   * @param {object} session Session created in onSocketConnect.
   * @param {object} message Client payload; expected to carry `channelId`.
   */
  async onJoinChannel (session, message) {
    const channelId = message ? message.channelId : undefined;
    if (!this.isValidChannelId(channelId)) {
      this.log.alert('dropping join request with invalid channel', { sid: session.socket.id, user: session.user._id });
      return;
    }
    this.log.debug('socket joins channel', { sid: session.socket.id, user: session.user._id, channelId });
    try {
      // join() may return a promise depending on the adapter.
      await session.socket.join(channelId);
      session.socket.emit('join-result', { channelId });
    } catch (error) {
      this.log.error('failed to join channel', { sid: session.socket.id, channelId, error });
    }
  }

  /**
   * Removes the session's socket from a room.
   *
   * @param {object} session Session created in onSocketConnect.
   * @param {object} message Client payload; expected to carry `channelId`.
   */
  async onLeaveChannel (session, message) {
    const channelId = message ? message.channelId : undefined;
    if (!this.isValidChannelId(channelId)) {
      this.log.alert('dropping leave request with invalid channel', { sid: session.socket.id, user: session.user._id });
      return;
    }
    this.log.debug('socket leaves channel', { sid: session.socket.id, user: session.user._id, channelId });
    try {
      await session.socket.leave(channelId);
    } catch (error) {
      this.log.error('failed to leave channel', { sid: session.socket.id, channelId, error });
    }
  }

  /**
   * Handles a chat message from a client: rate limit, permission check,
   * message creation, then delivery to the channel and back to the sender.
   *
   * @param {object} session Session created in onSocketConnect.
   * @param {object} messageDefinition Client payload; `channel` and `content`
   * are read here and the whole object is passed to the chat service.
   */
  async onUserChat (session, messageDefinition) {
    const { chat: chatService, user: userService } = this.dtp.services;

    if (!messageDefinition || (typeof messageDefinition !== 'object')) {
      this.log.info('dropping malformed chat message');
      return;
    }
    const channelId = messageDefinition.channel;
    if (!messageDefinition.content || (messageDefinition.content.length === 0)) {
      this.log.info('dropping empty chat message');
      return;
    }

    /*
     * First, implement the rate limiter check. If rate-limited, abort all
     * further processing. Store nothing in the database. Send nothing to the
     * chat room.
     */
    try {
      const userKey = session.user._id.toString();
      await this.chatMessageLimiter.consume(userKey, 1);
    } catch (rateLimiter) {
      if (rateLimiter instanceof Error) {
        // The limiter itself failed (e.g. its store is unreachable); this is
        // not a limit hit. Drop the message rather than bypass the limiter.
        this.log.error('chat rate limiter failed', { userId: session.user._id, error: rateLimiter });
        return;
      }

      const NOW = new Date();
      if (!session.notifySpamMuzzle) {
        // Tell the room once per muting, not once per dropped message.
        this.log.alert('preventing chat spam', { userId: session.user._id, rateLimiter });
        session.socket.to(channelId).emit('system-message', {
          created: NOW,
          content: `${session.user.displayName || session.user.username} has been muted for a while.`,
        });
        session.notifySpamMuzzle = true;
      }
      session.socket.emit('system-message', {
        created: NOW,
        content: `You are rate limited for ${SiteIoServer.formatSeconds(rateLimiter.msBeforeNext)} seconds.`,
        rateLimiter,
      });
      return;
    }

    /*
     * Pull the author's current User record from the db and verify that they
     * have permission to chat. This read must happen with every chat message
     * until permission update notifications are implemented on Redis pub/sub.
     */
    try {
      const userCheck = await userService.getUserAccount(session.user._id);
      if (!userCheck || !userCheck.permissions || !userCheck.permissions.canChat) {
        session.socket.emit('system-message', {
          created: new Date(),
          content: `You are not allowed to chat on ${this.dtp.config.site.name}.`,
        });
        return; // permission denied
      }
      //TODO: Forked apps may want to implement channel-level moderation, and
      // this is where to implement those checks.
    } catch (error) {
      this.log.error('failed to implement user permissions check', { userId: session.user._id, error });
      return; // can't verify permissions? No chat for you.
    }

    try {
      const { message, payload } = await chatService.createMessage(session.user, messageDefinition);
      if (message.analysis.similarity > 0.9) {
        await chatService.sendSystemMessage(
          session.socket,
          "Your flow feels a little spammy, so that one didn't go through.",
          { type: 'warning' },
        );
        return;
      }

      // use chat service emitter to deliver to channel (more efficient)
      // than socket.io API
      await chatService.sendMessage(message.channel, 'user-chat', payload);

      // use the socket itself to emit back to the sender
      session.socket.emit('user-chat', payload);

      // A delivered message re-arms the one-time "muted" room notice.
      session.notifySpamMuzzle = false;
    } catch (error) {
      this.log.error('failed to process user chat message', { error });
      try {
        await chatService.sendSystemMessage(
          session.socket,
          `Failed to send chat: ${error.message}`,
          { type: 'error' },
        );
      } catch (notifyError) {
        this.log.error('failed to report chat failure to sender', { error: notifyError });
      }
    }
  }

  /**
   * Extracts sticker slugs written as `:slug:` from space-separated content.
   * Slugs are lower-cased, stripped of markup and de-duplicated; tokens whose
   * inner text is empty or contains '/' or ':' are ignored.
   *
   * @param {string} content Message text. Non-string input yields no stickers.
   * @returns {string[]} Unique slugs in order of first appearance.
   */
  findStickers (content) {
    const stickers = [ ];
    if (typeof content !== 'string') {
      return stickers;
    }

    content.split(' ').forEach((token) => {
      // Needs both delimiters plus at least one character between them.
      if ((token.length < 3) || (token[0] !== ':') || (token[token.length - 1] !== ':')) {
        return;
      }

      const inner = token.slice(1, -1).toLowerCase();
      if (inner.includes('/') || inner.includes(':')) {
        return; // trimmed token includes invalid characters
      }
      this.log.debug('found sticker request', { token: inner });

      // De-duplicate on the stored (stripped) form, not the raw token.
      const slug = striptags(inner);
      if (!slug || stickers.includes(slug)) {
        return;
      }
      stickers.push(slug);
    });

    return stickers;
  }

  /**
   * Handles a reaction from a client: rate limit, permission check, reaction
   * creation, then delivery to the subject's channel and back to the sender.
   *
   * @param {object} session Session created in onSocketConnect.
   * @param {object} message Client payload passed to the chat service.
   */
  async onUserReact (session, message) {
    const { chat: chatService, user: userService } = this.dtp.services;
    try {
      // Rate limit before touching the database, as onUserChat does.
      try {
        const userKey = session.user._id.toString();
        await this.reactionLimiter.consume(userKey, 1);
      } catch (rateLimiter) {
        if (rateLimiter instanceof Error) {
          this.log.error('reaction rate limiter failed', { userId: session.user._id, error: rateLimiter });
        }
        return; // rate-limited
      }

      const userCheck = await userService.getUserAccount(session.user._id);
      if (!userCheck || !userCheck.permissions || !userCheck.permissions.canChat) {
        session.socket.emit('system-message', {
          created: new Date(),
          content: `You are not allowed to chat on ${this.dtp.config.site.name}.`,
        });
        return; // permission denied
      }

      const reaction = await chatService.createEmojiReaction(session.user, message);
      reaction.user = session.user;

      const payload = { reaction };
      const channelId = reaction.subject.toString();
      await chatService.sendMessage(channelId, 'user-react', payload);
      session.socket.emit('user-react', payload);
    } catch (error) {
      this.log.error('failed to process reaction', { message, error });
    }
  }
}
// Named export: consumers obtain the class via `require(...).SiteIoServer`.
module.exports.SiteIoServer = SiteIoServer;