// core-node.js // Copyright (C) 2022 DTP Technologies, LLC // License: Apache-2.0 'use strict'; const uuidv4 = require('uuid').v4; const fetch = require('node-fetch'); // jshint ignore:line const mongoose = require('mongoose'); const CoreNode = mongoose.model('CoreNode'); const CoreUser = mongoose.model('CoreUser'); const CoreNodeConnect = mongoose.model('CoreNodeConnect'); const CoreNodeRequest = mongoose.model('CoreNodeRequest'); const passport = require('passport'); const OAuth2Strategy = require('passport-oauth2'); const striptags = require('striptags'); const { SiteService, SiteError, SiteAsync } = require('../../lib/site-lib'); const { ResourceStatsDefaults } = require('../models/lib/resource-stats'); class CoreAddress { constructor (host, port) { this.host = host; this.port = port; } parse (host) { const tokens = host.split(':'); this.host = tokens[0]; if (tokens[1]) { this.port = parseInt(tokens[1], 10); } else { this.port = 443; } return this; } } class CoreNodeService extends SiteService { constructor (dtp) { super(dtp, module.exports); this.populateCoreUser = [ { path: 'core' }, ]; this.populateCoreNodeRequest = [ { path: 'core', }, ]; } async start ( ) { await super.start(); const cores = await this.getConnectedCores(null, true); this.log.info('Core Node service starting', { connectedCoreCount: cores.length }); cores.forEach((core) => this.registerPassportCoreOAuth2(core)); } async attachExpressRoutes (router) { const cores = await this.getConnectedCores(null, true); cores.forEach((core) => { const coreAuthStrategyName = this.getCoreAuthStrategyName(core); const coreAuthUri = `/core/${core._id}`; const coreAuthCallbackUri = `/core/${core._id}/callback`; this.log.info('attach Core Auth route', { coreId: core._id, name: core.meta.name, strategy: coreAuthStrategyName, auth: coreAuthUri, callback: coreAuthCallbackUri, }); router.get( coreAuthUri, (req, res, next) => { this.log.debug('Core auth request', { coreAuthStrategyName, clientId: core.oauth.clientId }); return next(); }, passport.authenticate(coreAuthStrategyName), ); router.get( coreAuthCallbackUri, (req, res, next) => { this.log.debug('Core auth callback', { strategy: coreAuthStrategyName }); return next(); }, passport.authenticate(coreAuthStrategyName, { failureRedirect: '/' }), async (req, res, next) => { req.session.userType = 'Core'; req.session.coreId = core._id; req.login(req.user, (error) => { if (error) { return next(error); } req.session.userType = 'Core'; req.session.coreId = core._id; return res.redirect('/'); }); }, ); }); } parseCoreAddress (host) { const address = new CoreAddress(); return address.parse(host); } async getCoreById (coreNodeId) { const core = await CoreNode .findOne({ _id: coreNodeId }) .lean(); return core; } async getCoreByAddress (address) { const core = await CoreNode .findOne({ 'address.host': address.host, 'address.port': address.port, }) .lean(); return core; } getCoreAuthStrategyName (core) { return `dtp:${core.meta.domainKey}`; } getCoreRequestScheme ( ) { return process.env.DTP_CORE_AUTH_SCHEME || 'https'; } getCoreRequestUrl (core, requestUrl) { const coreScheme = this.getCoreRequestScheme(); return `${coreScheme}://${core.address.host}:${core.address.port}${requestUrl}`; } getLocalUrl (url) { const CORE_SCHEME = this.getCoreRequestScheme(); const { site } = this.dtp.config; return `${CORE_SCHEME}://${site.domain}${url}`; } /** * First ensures that a record exists in the local database for the Core node. * Then, calls the node's info services to resolve more metadata about the * node, it's operation, policies, and available services. * * @param {String} host hostname and optional port number of Core node to be * resolved. * @returns CoreNode document describing the Core node. */ async resolveCore (host) { const NOW = new Date(); this.log.info('resolving Core node', { host }); const address = this.parseCoreAddress(host); let core = await this.getCoreByAddress(address); if (!core) { core = new CoreNode(); core.created = NOW; core.address.host = address.host; core.address.port = address.port; await core.save(); core = core.toObject(); } const txSite = await this.sendRequest(core, { method: 'GET', url: '/core/info/site', }); const txPackage = await this.sendRequest(core, { method: 'GET', url: '/core/info/package', }); await CoreNode.updateOne( { _id: core._id }, { $set: { updated: NOW, 'meta.name': txSite.response.site.name, 'meta.description': txSite.response.site.description, 'meta.domain': txSite.response.site.domain, 'meta.domainKey': txSite.response.site.domainKey, 'meta.version': txPackage.response.package.version, 'meta.admin': txSite.response.site.admin, 'meta.supportEmail': txSite.response.site.supportEmail, }, }, ); core = await CoreNode.findOne({ _id: core._id }).lean(); this.log.info('resolved Core node', { core }); return { core, networkPolicy: txSite.response.site.networkPolicy }; } async getConstellationStats ( ) { const connectedCount = await CoreNode.countDocuments({ 'flags.isConnected': true }); const pendingCount = await CoreNode.countDocuments({ 'flags.isConnected': false }); const potentialReach = Math.round(Math.random() * 6000000); return { connectedCount, pendingCount, potentialReach }; } /** * Sends a Kaleidoscope event to an array of recipients, a single recipient, * or no recipients (undefined, a broadcast). * @param {Object} event The event to be sent * @param {Array} recipients Array of CoreUser to receive the event. Leave * undefined to broadcast the event to all connected Core nodes. * @returns Array of results, one per recipient. */ async sendKaleidoscopeEvent (event, recipients) { const { hive: hiveService, userNotification: userNotificationService } = this.dtp.services; const { pkg } = this.dtp; const { site } = this.dtp.config; const CORE_SCHEME = this.getCoreRequestScheme(); if (recipients && !Array.isArray(recipients)) { recipients = [recipients]; } event.source = Object.assign({ pkg: { name: pkg.name, version: pkg.version }, site, }, event.source); if (event.emitter) { let emitterUrl = event.emitter.coreUserId ? `/user/core/${event.emitter._id}` : `/user/${event.emitter._id}`; event.source.emitter = { emitterType: event.emitter.type, emitterId: event.emitter._id.toString(), displayName: event.emitter.displayName, username: event.emitter.username, href: `${CORE_SCHEME}://${site.domain}${emitterUrl}`, }; } const request = { tokenized: false, method: 'POST', url: '/hive/kaleidoscope/event', body: { event }, }; if (!recipients) { return this.broadcast(request); } let localEvent; // will be created if any local recipients const results = [ ]; await SiteAsync.each(recipients, async (recipient) => { switch (recipient.type) { case 'CoreUser': try { const response = await this.sendRequest(recipient.core, request); results.push({ success: true, recipient, request, response }); } catch (error) { this.log.error('failed to deliver request to Core node', { coreId: recipient.core._id, request, error, }); results.push({ success: false, recipient, request, error }); } break; case 'User': try { if (!localEvent) { localEvent = await hiveService.createKaleidoscopeEvent(event); } await userNotificationService.create(recipient, localEvent); results.push({ success: true, recipient, localEvent }); } catch (error) { this.log.error('failed to deliver Kaleidoscope event to local user', { recipient, event, error }); results.push({ success: false, error }); } break; default: results.push({ recipient, error: new SiteError(400, 'Recipient does not have a valid type')}); break; } }, 4); return results; } async broadcast (request) { const results = [ ]; await CoreNode .find({ 'flags.isConnected': true, 'flags.isBlocked': false, }) .cursor() .eachAsync(async (core) => { try { if (!core.kaleidoscope || !core.kaleidoscope.token) { throw new Error('Core has not provided a Kaleidoscope token'); } const response = await this.sendRequest(core, request); results.push({ coreId: core._id, request, response }); } catch (error) { this.log.error('failed to send Core Node request', { core: core._id, request: request.url, error }); } }, 2); return results; } async sendRequest (core, request) { try { const req = new CoreNodeRequest(); const options = { headers: { 'Content-Type': 'application/json', }, }; if (core.kaleidoscope && core.kaleidoscope.token) { options.headers.Authorization = `Bearer ${core.kaleidoscope.token}`; } req.created = new Date(); req.core = core._id; if (request.tokenized) { req.token = { value: uuidv4(), claimed: false, }; options.headers['X-DTP-Core-Token'] = req.token.value; } options.method = req.method = request.method || 'GET'; req.url = request.url; await req.save(); if (request.body) { options.body = JSON.stringify(request.body); } this.log.info('sending Core node request', { request: req }); const requestUrl = this.getCoreRequestUrl(core, request.url); const response = await fetch(requestUrl, options); if (!response.ok) { let json; try { json = await response.json(); } catch (error) { await this.setRequestResponse(req, response); throw new SiteError(response.status, response.statusText); } this.log.debug('received failure response', { json }); await this.setRequestResponse(req, response, json); throw new SiteError(response.status, json.message || response.statusText); } const json = await response.json(); await this.setRequestResponse(req, response, json); this.log.info('Core node request complete', { request: req }); return { request: req.toObject(), response: json }; } catch (error) { this.log.error('failed to send Core Node request', { core: core._id, request: request.url, error }); throw error; } } async setRequestResponse (request, response, json) { const DONE = new Date(); const ELAPSED = DONE.valueOf() - request.created.valueOf(); const updateOp = { $set: { 'response.received': DONE, 'response.elapsed': ELAPSED, 'response.statusCode': response.status, }, }; if (json) { updateOp.$set['response.success'] = json.success; } await CoreNodeRequest.updateOne({ _id: request._id }, updateOp); } async getCoreRequestHistory (core, pagination) { const requests = await CoreNodeRequest .find({ core: core._id }) .sort({ created: -1 }) .skip(pagination.skip) .limit(pagination.cpp) .lean(); for (const req of requests) { req.core = core; } const totalRequestCount = await CoreNodeRequest.countDocuments({ core: core._id }); return { requests, totalRequestCount }; } async connect (response) { const request = await CoreNodeRequest .findOne({ 'token.value': response.token }) .populate(this.populateCoreNodeRequest) .lean(); if (!request || request.token.claimed) { throw new SiteError(403, 'Unauthorized request'); } this.log.info('enabling Core community', { coreId: request.core._id, domain: request.core.meta.domain, }); await this.setCoreOAuth2Credentials(request.core, response.credentials); await CoreNodeRequest.updateOne( { _id: request._id }, { $set: { 'token.claimed': true, }, }, ); } async queueServiceNodeConnect (requestToken, appNode) { const NOW = new Date(); appNode.site.domain = striptags(appNode.site.domain.trim().toLowerCase()); appNode.site.domainKey = striptags(appNode.site.domainKey.trim().toLowerCase()); appNode.site.name = striptags(appNode.site.name.trim()); appNode.site.description = striptags(appNode.site.description.trim()); appNode.site.company = striptags(appNode.site.company.trim()); appNode.site.coreAuth.scopes = appNode.site.coreAuth.scopes.map((scope) => scope.trim().toLowerCase()); appNode.site.coreAuth.callbackUrl = striptags(appNode.site.coreAuth.callbackUrl.trim()); let request = await CoreNodeConnect.findOne({ $or: [ { domain: appNode.site.domain }, { domainKey: appNode.site.domainKey }, ], }); if (request) { throw new SiteError(406, 'You already have a pending Core Connect request'); } request = new CoreNodeConnect(); request.created = NOW; request.token = requestToken; request.status = 'pending'; request.pkg = { name: appNode.pkg.name, version: appNode.pkg.version, }; request.site = { domain: appNode.site.domain, domainKey: appNode.site.domainKey, name: appNode.site.name, description: appNode.site.description, company: appNode.site.company, coreAuth: { scopes: appNode.site.coreAuth.scopes, callbackUrl: appNode.site.coreAuth.callbackUrl, }, }; await request.save(); return request.toObject(); } async getServiceNodeQueue (pagination) { const queue = await CoreNodeConnect .find({ status: 'pending' }) .sort({ created: -1 }) .skip(pagination.skip) .limit(pagination.cpp) .lean(); return queue; } async getCoreConnectRequest (requestId) { const request = await CoreNodeConnect .findOne({ _id: requestId }) .lean(); return request; } async acceptServiceNode (requestToken, appNode) { const { oauth2: oauth2Service } = this.dtp.services; const response = { token: requestToken }; this.log.info('accepting app node', { requestToken, appNode }); response.client = await oauth2Service.createClient(appNode.site); return response; } async setCoreOAuth2Credentials (core, credentials) { const { client } = credentials; this.log.info('updating Core Connect credentials', { core, client }); await CoreNode.updateOne( { _id: core._id }, { $set: { 'flags.isConnected': true, 'oauth.clientId': client._id, 'oauth.clientSecret': client.secret, 'oauth.scopes': client.scopes, 'oauth.redirectUri': client.redirectUri, 'kaleidoscope.token': client.kaleidoscope.token, }, }, ); } registerPassportCoreOAuth2 (core) { const { coreNode: coreNodeService } = this.dtp.services; const AUTH_SCHEME = coreNodeService.getCoreRequestScheme(); const coreAuthStrategyName = this.getCoreAuthStrategyName(core); const authorizationHost = `${core.address.host}:${core.address.port}`; const authorizationURL = `${AUTH_SCHEME}://${authorizationHost}/oauth2/authorize`; const tokenURL = `${AUTH_SCHEME}://${authorizationHost}/oauth2/token`; const callbackURL = `${AUTH_SCHEME}://${process.env.DTP_SITE_DOMAIN}/auth/core/${core._id}/callback`; const coreAuthStrategy = new OAuth2Strategy( { authorizationURL, tokenURL, clientID: core.oauth.clientId.toString(), clientSecret: core.oauth.clientSecret, callbackURL, }, async (accessToken, refreshToken, params, profile, cb) => { const NOW = new Date(); try { const coreUserId = mongoose.Types.ObjectId(params.coreUserId); let user = await CoreUser.findOneAndUpdate( { core: core._id, coreUserId, }, { $setOnInsert: { created: NOW, core: core._id, coreUserId, flags: { isAdmin: false, isModerator: false, }, permissions: { canLogin: true, canChat: true, canComment: true, canReport: true, }, optIn: { system: true, marketing: false, }, theme: 'dtp-light', resourceStats: ResourceStatsDefaults, }, $set: { updated: NOW, username: params.username, username_lc: params.username_lc, displayName: params.displayName, bio: params.bio, }, }, { upsert: true, new: true, }, ); user = user.toObject(); user.type = 'CoreUser'; return cb(null, user); } catch (error) { return cb(error); } }, ); this.log.info('registering Core auth strategy', { name: coreAuthStrategyName, host: core.address.host, port: core.address.port, clientID: core.oauth.clientId.toString(), callbackURL, }); passport.use(coreAuthStrategyName, coreAuthStrategy); } async getConnectedCores (pagination, withOAuth = false) { let q = CoreNode.find({ 'flags.isConnected': true }); if (!withOAuth) { q = q.select('-oauth'); } q = q.sort({ 'meta.domain': 1 }); if (pagination) { q = q .skip(pagination.skip) .limit(pagination.cpp); } const cores = await q.lean(); return cores; } async searchUsers (search, pagination) { const users = await CoreUser .find(search) .select('+flags +permissions +optIn') .sort({ username_lc: 1 }) .skip(pagination.skip) .limit(pagination.cpp) .populate(this.populateCoreUser) .lean(); return users.map((user) => { user.type = 'CoreUser'; return user; }); } async getUserByLocalId (userId) { const { user: userService } = this.dtp.services; let user = await CoreUser .findOne({ _id: userId }) .select('+flags +permissions +optIn') .populate(this.populateCoreUser) .lean(); user.type = 'CoreUser'; userService.decorateUserObject(user); return user; } async updateUserForAdmin (user, settings) { const NOW = new Date(); if (!settings.username || !settings.username.length) { throw new SiteError(406, 'Must include username'); } settings.username = striptags(settings.username.trim()); settings.username_lc = settings.username.toLowerCase(); if (settings.badges) { settings.badges = settings.badges.split(',').map((badge) => striptags(badge.trim())); } else { settings.badges = [ ]; } await CoreUser.updateOne( { _id: user._id }, { $set: { updated: NOW, username: settings.username, username_lc: settings.username_lc, displayName: striptags(settings.displayName.trim()), bio: striptags(settings.bio.trim()), badges: settings.badges, 'flags.isAdmin': settings.isAdmin === 'on', 'flags.isModerator': settings.isModerator === 'on', 'flags.isEmailVerified': settings.isEmailVerified === 'on', 'permissions.canLogin': settings.canLogin === 'on', 'permissions.canChat': settings.canChat === 'on', 'permissions.canComment': settings.canComment === 'on', 'permissions.canReport': settings.canReport === 'on', 'permissions.canAuthorPages': settings.canAuthorPages === 'on', 'permissions.canAuthorPosts': settings.canAuthorPosts === 'on', 'permissions.canPublishPages': settings.canPublishPages === 'on', 'permissions.canPublishPosts': settings.canPublishPosts === 'on', 'optIn.system': settings.optInSystem === 'on', 'optIn.marketing': settings.optInMarketing === 'on', }, }, ); } async updateUserSettings (user, settings) { await CoreUser.updateOne( { _id: user._id }, { $set: { theme: settings.theme, }, }, ); } } module.exports = { slug: 'core-node', name: 'coreNode', create: (dtp) => { return new CoreNodeService(dtp); }, };