UserSubscription and Kaleidoscope event processing

master
rob 2 years ago
parent 5d186c2100
commit 5fef5a0eff

@ -49,9 +49,14 @@ class HiveKaleidoscopeController extends SiteController {
}
async postEvent (req, res) {
this.log.debug('kaleidoscope event received', { event: req.body.event });
this.emit('kaleidoscope:event', req, res);
res.status(200).json({ success: true });
const { hive: hiveService } = this.dtp.services;
try {
this.log.debug('kaleidoscope event received', { event: req.body.event });
const response = await hiveService.processKaleidoscopeEvent(req.body.event);
res.status(200).json({ success: true, response });
} catch (error) {
this.log.error('failed to process Kaleidoscope event', { })
}
}
async getKaleidoscopeRoot (req, res) {

@ -10,9 +10,33 @@ const Schema = mongoose.Schema;
const UserNotificationSchema = new Schema({
created: { type: Date, default: Date.now, required: true, index: -1, expires: '7d' },
user: { type: Schema.ObjectId, required: true, index: 1, ref: 'User' },
source: { type: String, required: true },
message: { type: String, required: true },
status: { type: String, enum: ['new', 'seen'], default: 'new', required: true },
action: { type: String, required: true, lowercase: true },
label: { type: String, required: true },
content: { type: String },
href: { type: String },
source: {
pkg: {
name: { type: String, required: true },
version: { type: String, required: true },
},
site: {
name: { type: String, required: true },
description: { type: String },
domain: { type: String, lowercase: true, required: true },
domainKey: { type: String, lowercase: true, required: true },
company: { type: String },
coreAuth: {
scopes: { type: [String] },
},
},
author: {
userId: { type: Schema.ObjectId, required: true },
displayName: { type: String },
username: { type: String },
href: { type: String },
},
},
attachmentType: { type: String },
attachment: { type: Schema.ObjectId, refPath: 'attachmentType' },
});

@ -0,0 +1,27 @@
// user-subscription.js
// Copyright (C) 2022 DTP Technologies, LLC
// License: Apache-2.0
'use strict';
const mongoose = require('mongoose');
const Schema = mongoose.Schema;
const SubscriptionSchema = new Schema({
client: { type: Schema.ObjectId, required: true, ref: 'OAuth2Client' },
emitterId: { type: Schema.ObjectId },
});
SubscriptionSchema.index({
client: 1,
emitterId: 1,
}, {
name: 'subscription_client_emitter',
});
const UserSubscriptionSchema = new Schema({
user: { type: Schema.ObjectId, required: true, unique: true, ref: 'User' },
subscriptions: { type: [SubscriptionSchema] },
});
module.exports = mongoose.model('UserSubscription', UserSubscriptionSchema);

@ -0,0 +1,105 @@
// hive.js
// Copyright (C) 2022 DTP Technologies, LLC
// License: Apache-2.0
'use strict';
const mongoose = require('mongoose');
const UserSubscription = mongoose.model('UserSubscription');
const slug = require('slug');
const { SiteService, SiteError } = require('../../lib/site-lib');
class HiveService extends SiteService {
constructor (dtp) {
super(dtp, module.exports);
}
async subscribe (user, client, emitterId) {
await UserSubscription.updateOne(
{ user: user._id },
{
$addToSet: {
subscriptions: {
client: client._id,
emitterId,
},
},
},
{
upsert: true,
},
);
}
async unsubscribe (user, subscription) {
await UserSubscription.updateOne(
{ user: user._id },
{ $pull: { subscriptions: subscription } },
);
}
extractHashtags (content) {
const hashtags = content
.split(/ \r\n/g)
.filter((tag) => tag[0] === '#')
.map((tag) => slug(tag.slice(1)))
;
return hashtags;
}
extractLinks (content) {
let links = content
.split(/( |\r|\n)/g)
.filter((tag) => {
const test = tag.trim().toLowerCase();
return test.startsWith('http://') || test.startsWith('https://');
});
return links;
}
async resolveLink (author, url) {
const jobData = {
authorType: author.type,
author: author._id,
url,
};
this.log.info('creating job to resolve link', { jobData });
await this.resolver.add('resolve-link', jobData);
}
async processKaleidoscopeEvent (event) {
const {
userNotification: userNotificationService,
oauth2: oauth2Service,
} = this.dtp.services;
const client = await oauth2Service.getClientByDomainKey(event.source.site.domainKey);
if (!client) {
throw new SiteError(403, 'Unknown client domain key');
}
await UserSubscription
.find({
'subscriptions.client': client._id,
'subscriptions.emitterId': event.source.emitter._id,
})
.select('-subscriptions')
.cursor()
.eachAsync(async (subscription) => {
await userNotificationService.create(subscription.user, event);
}, 3);
this.emit('kaleidoscope:event', event, client);
}
}
module.exports = {
slug: 'hive',
name: 'hive',
create: (dtp) => { return new HiveService(dtp); },
};

@ -10,20 +10,21 @@ const mongoose = require('mongoose');
const UserNotification = mongoose.model('UserNotification');
const pug = require('pug');
const striptags = require('striptags');
const { SiteService } = require('../../lib/site-lib');
const { SiteService, SiteError } = require('../../lib/site-lib');
class UserNotificationService extends SiteService {
constructor (dtp) {
super(dtp, module.exports);
this.populateComment = [
this.populateUserNotification = [
{
path: 'author',
path: 'user',
select: '_id username username_lc displayName picture',
},
{
path: 'replyTo',
path: 'attachment',
},
];
}
@ -35,13 +36,133 @@ class UserNotificationService extends SiteService {
async create (user, notificationDefinition) {
const NOW = new Date();
const notification = new UserNotification();
/*
* Validate general notification data
*/
if (!notificationDefinition.action) {
throw new SiteError(406, 'Missing action');
}
if (!notificationDefinition.label) {
throw new SiteError(406, 'Missing label');
}
if (!notificationDefinition.content) {
throw new SiteError(406, 'Missing content');
}
if (!notificationDefinition.href) {
throw new SiteError(406, 'Missing href');
}
/*
* Validate source data
*/
if (!notificationDefinition.source) {
throw new SiteError(406, 'Missing source information');
}
/*
* Validate source site
*/
if (!notificationDefinition.source.site) {
throw new SiteError(406, 'Missing source site information');
}
if (!notificationDefinition.source.site.name) {
throw new SiteError(406, 'Missing source site name');
}
if (!notificationDefinition.source.site.description) {
throw new SiteError(406, 'Missing source site description');
}
if (!notificationDefinition.source.site.domain) {
throw new SiteError(406, 'Missing source site domain');
}
if (!notificationDefinition.source.site.domainKey) {
throw new SiteError(406, 'Missing source site domain key');
}
if (!notificationDefinition.source.site.company) {
throw new SiteError(406, 'Missing source site company name');
}
if (!notificationDefinition.source.site.coreAuth ||
!notificationDefinition.source.site.coreAuth.scopes ||
!Array.isArray(notificationDefinition.source.site.coreAuth.scopes) ||
(notificationDefinition.source.site.coreAuth.scopes.length === 0)) {
throw new SiteError(406, 'Missing source site Core auth or scope information');
}
/*
* Validate source package
*/
if (!notificationDefinition.source.pkg) {
throw new SiteError(406, 'Missing source package information');
}
if (!notificationDefinition.source.pkg.name) {
throw new SiteError(406, 'Missing source package name');
}
if (!notificationDefinition.source.pkg.version) {
throw new SiteError(406, 'Missing source package version');
}
/*
* Validate source author
*/
if (!notificationDefinition.source.author) {
throw new SiteError(406, 'Missing source author information');
}
if (!notificationDefinition.source.author.userId) {
throw new SiteError(406, 'Missing source author userId');
}
if (!notificationDefinition.source.author.username) {
throw new SiteError(406, 'Missing source author username');
}
if (!notificationDefinition.source.author.href) {
throw new SiteError(406, 'Missing source author href');
}
const notification = new UserNotification();
notification.created = NOW;
notification.user = user._id;
notification.source = notificationDefinition.source;
notification.message = notificationDefinition.message;
notification.status = 'new';
notification.action = striptags(notificationDefinition.action.trim().toLowerCase());
notification.label = striptags(notificationDefinition.label.trim());
notification.content = striptags(notificationDefinition.content.trim());
notification.href = striptags(notificationDefinition.href.trim());
notification.source = {
pkg: {
name: striptags(notificationDefinition.source.pkt.name.trim().toLowerCase()),
version: striptags(notificationDefinition.source.pkt.version.trim()),
},
site: {
name: striptags(notificationDefinition.source.site.name.trim()),
domain: striptags(notificationDefinition.source.site.domain.trim().toLowerCase()),
domainKey: striptags(notificationDefinition.source.site.domainKey.trim().toLowerCase()),
coreAuth: {
scopes: notificationDefinition.source.site.coreAuth.scopes.map((scope) => scope.trim().toLowerCase()),
},
},
author: {
userId: mongoose.Types.ObjectId(notificationDefinition.source.author.userId),
username: striptags(notificationDefinition.source.author.username.trim()),
href: striptags(notificationDefinition.source.author.href.trim()),
},
};
if (notificationDefinition.source.site.company) {
notification.source.site.company = striptags(notificationDefinition.source.site.company.trim());
}
if (notificationDefinition.source.site.description) {
notification.source.site.description = striptags(notificationDefinition.source.site.description.trim());
}
if (notificationDefinition.source.author.displayName) {
notification.source.author.displayName = striptags(notificationDefinition.source.author.displayName);
}
notification.attachmentType = notificationDefinition.attachmentType;
notification.attachment = notificationDefinition.attachment;
@ -81,6 +202,7 @@ class UserNotificationService extends SiteService {
.sort({ created: -1 })
.skip(pagination.skip)
.limit(pagination.cpp)
.populate(this.populateUserNotification)
.lean();
const newNotifications = notifications.map((notif) => notif.status === 'new');
if (newNotifications.length > 0) {

Loading…
Cancel
Save