// host-services.js // Copyright (C) 2022 DTP Technologies, LLC // License: Apache-2.0 'use strict'; const os = require('os'); const diskusage = require('diskusage-ng'); const path = require('path'); const fs = require('fs'); const si = require('systeminformation'); require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') }); const dgram = require('dgram'); const numeral = require('numeral'); const { SiteAsync, SiteLog, SiteError, SiteWorker, } = require(path.join(__dirname, '..', '..', 'lib', 'site-lib')); const { CronJob } = require('cron'); const CRON_TIMEZONE = 'America/New_York'; module.rootPath = path.resolve(__dirname, '..', '..'); module.pkg = require(path.resolve(__dirname, '..', '..', 'package.json')); module.config = { environment: process.env.NODE_ENV, root: module.rootPath, site: require(path.join(module.rootPath, 'config', 'site')), http: require(path.join(module.rootPath, 'config', 'http')), component: { logId: 'host-services-worker', index: 'hostServicesWorker', className: 'HostServicesWorker' }, }; module.log = new SiteLog(module, module.config.component); class CacheStats { constructor ( ) { this.itemCount = 0; this.dataSize = 0; this.expireCount = 0; this.expireDataSize = 0; this.hitCount = 0; this.hitDataSize = 0; this.missCount = 0; this.missDataSize = 0; } add (size) { this.itemCount += 1; this.dataSize += (size / 1024.0); } remove (size) { this.itemCount -= 1; this.dataSize -= size / 1024.0; } hit (size) { this.hitCount += 1; this.hitDataSize += (size / 1024.0); } miss (size) { this.missCount += 1; this.missDataSize += (size / 1024.0); } expire (size) { this.expireCount += 1; this.expireDataSize += size; } report ( ) { const report = { itemCount: this.itemCount, dataSize: this.dataSize, expireCount: this.expireCount, expireDataSize: this.expireDataSize, hitCount: this.hitCount, hitDataSize: this.hitDataSize, missCount: this.missCount, missDataSize: this.missDataSize, }; this.resetCounters(); return report; } resetCounters ( ) { this.expireCount = 0; this.expireDataSize = 0; this.hitCount = 0; this.hitDataSize = 0; this.missCount = 0; this.missDataSize = 0; } } class HostCacheTransaction { get tid ( ) { return this.message.tid; } get cmd ( ) { return this.message.cmd; } get params ( ) { return this.message.params; } get address ( ) { return this.rinfo.address; } get port ( ) { return this.rinfo.port; } get size ( ) { return this.rinfo.size; } constructor (dtp, message, rinfo) { this.dtp = dtp; this.created = Date.now(); // timestamp, not Date instance this.component = { name: 'Host Cache Transaction', slug: 'host-cache-transaction' }; this.log = new SiteLog(dtp, this.component); this.message = message; this.rinfo = rinfo; this.flags = { isFetched: false, isCached: false, isResolved: false, isError: false, }; } async getFile ( ) { const { minio: minioService } = this.dtp.services; const filePath = path.join( process.env.DTP_HOST_CACHE_PATH, this.params.bucket, this.params.key, ); const res = { cmd: this.cmd, success: true, message: undefined, file: { stats: undefined, path: undefined, }, }; try { res.file.stats = await fs.promises.stat(filePath); if (!res.file.stats.isFile()) { throw new SiteError(500, 'invalid object requested'); } res.file.path = filePath; this.flags.isCached = true; this.dtp.cacheStats.hit(res.file.stats.size); return this.dtp.manager.resolveTransaction(this, res); } catch (error) { if (error.code !== 'ENOENT') { this.log.error('failed to stat requested object', { transaction: this, error }); res.success = false; res.statusCode = 500; res.message = error.message; this.error = error; this.flags.isError = true; return this.dtp.manager.resolveTransaction(this, res); } // fall through to MinIO fetch since file not found in cache } try { await minioService.downloadFile({ bucket: this.params.bucket, key: this.params.key, filePath, }); res.file.path = filePath; res.file.stats = await fs.promises.stat(filePath); if (!res.file.stats.isFile()) { throw new SiteError(500, 'invalid object requested'); } this.flags.isFetched = true; this.dtp.cacheStats.add(res.file.stats.size); this.dtp.cacheStats.miss(res.file.stats.size); return this.dtp.manager.resolveTransaction(this, res); } catch (error) { if (error.code !== 'NotFound') { this.log.error('failed to fetch requested object from MinIO', { transaction: this, error }); res.success = false; res.statusCode = 500; res.message = error.message; this.error = error; this.flags.isError = true; return this.dtp.manager.resolveTransaction(this, res); } } res.success = false; res.statusCode = 404; res.message = 'Not Found'; this.error = new SiteError(404, 'Not Found'); this.flags.isError = true; return this.dtp.manager.resolveTransaction(this, res); } async cancel (reason) { const res = { res: this.cmd, success: false, message: `Operation canceled: ${reason}`, }; return this.dtp.manager.resolveTransaction(this, res); } async sendResponse (res) { const NOW = Date.now(); const duration = this.duration = (NOW - this.created) / 1000.0; const { flags } = this; flags.isResolved = true; const payload = { tid: this.tid, res, flags, duration }; const reply = Buffer.from(JSON.stringify(payload)); this.dtp.server.send(reply, this.port, this.address); } } class TransactionManager { constructor (dtp) { this.dtp = dtp; this.component = { logId: 'transaction-manager', index: 'transactionManager', className: 'TransactionManager', }; this.log = new SiteLog(dtp, this.component); this.transactions = { }; } async addTransaction (transaction) { if (this.hasPendingRequest(transaction)) { this.transactions[transaction.tid] = transaction; // queue it and be done return; } this.transactions[transaction.tid] = transaction; // queue it and process the command switch (transaction.cmd) { case 'getFile': return transaction.getFile(); default: break; // unknown/undefined command } this.log.error('invalid host-services command', { cmd: transaction.cmd, params: transaction.params, from: { address: transaction.address, port: transaction.port, }, }); await this.dtp.manager.cancelTransaction(transaction, 'Rejected'); } hasPendingRequest (transaction) { if (!transaction) { return false; } const keys = Object.keys(this.transactions); const match = keys.find((key) => { const cmp = this.transactions[key]; if (!cmp) { return false; } if (cmp.cmd !== transaction.cmd) { return false; } if (cmp.params.bucket !== transaction.params.bucket) { return false; } if (cmp.params.key !== transaction.params.key) { return false; } return true; }); return !!match; } async resolveTransaction (transaction, res) { await transaction.sendResponse(res); this.removeTransaction(transaction, 'resolved'); } async cancelTransaction (transaction, reason) { await transaction.cancel(); this.removeTransaction(transaction, reason); } removeTransaction (transaction) { if (this.transactions[transaction.tid]) { delete this.transactions[transaction.tid]; } } async expireTransactions ( ) { const NOW = Date.now(); const keys = Object.keys(this.transactions); let expired = 0; await SiteAsync.each(keys, async (key) => { const transaction = this.transactions[key]; const age = NOW - transaction.created; if (age > (1000 * 30)) { this.log.alert('expiring transaction', { transaction }); await this.cancelTransaction(transaction, 'expired'); ++expired; } }, 8); this.log.info('transaction watchdog', { expired }); } } class HostServicesWorker extends SiteWorker { constructor (dtp) { super(dtp, dtp.config.component); } async onHostCacheMessage (message, rinfo) { try { message = message.toString('utf8'); message = JSON.parse(message); const transaction = new HostCacheTransaction(this.dtp, message, rinfo); this.dtp.manager.addTransaction(transaction); } catch (error) { this.log.error('failed to receive UDP message', { message, error }); } } async startHostCache (basePath) { basePath = basePath || process.env.DTP_HOST_CACHE_PATH; const NOW = Date.now(); const dir = await fs.promises.opendir(basePath); for await (const dirent of dir) { if (dirent.isDirectory()) { this.log.debug('cache start descend into directory', { name: dirent.name }); await this.startHostCache(path.join(basePath, dirent.name)); } if (dirent.isFile()) { const filePath = path.join(basePath, dirent.name); const stats = await fs.promises.stat(filePath); const age = (NOW - stats.atime.valueOf()) / 1000.0; // seconds this.log.debug('examining file', { filePath, age: numeral(age).format('hh:mm:ss') }); if ((age / 60.0) > 60.0) { this.log.info('expiring file', { filePath, age: numeral(age).format('hh:mm:ss') }); await fs.promises.rm(filePath, { force: true }); } else { this.dtp.cacheStats.add(stats.size); } } } // await dir.close(); this.log.info('cache startup cleanup complete', { basePath }); } /** * When a file is accessed for read or otherwise, it's atime is updated. If the * atime of a file exceeds the configured max file idle time, the file is * removed. * @param {String} basePath The root of the cache store on disk. */ async cleanHostCache (basePath) { const NOW = Date.now(); // timestamp, not Date instance basePath = basePath || process.env.DTP_HOST_CACHE_PATH; this.log.info('cache directory cleanup', { path: basePath }); const dir = await fs.promises.opendir(basePath); for await (const dirent of dir) { if (dirent.isDirectory()) { this.log.debug('cache clean descend into directory', { name: dirent.name }); await this.cleanHostCache(path.join(basePath, dirent.name)); } if (dirent.isFile()) { const filePath = path.join(basePath, dirent.name); const stats = await fs.promises.stat(filePath); const age = NOW - stats.atime.valueOf(); this.log.debug('examining file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') }); if ((age / 1000.0 / 60.0) > 60.0) { this.log.info('expiring file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') }); await fs.promises.rm(filePath, { force: true }); this.dtp.cacheStats.remove(stats.size); this.dtp.cacheStats.expire(stats.size); } } } this.log.info('cache directory cleanup complete', { basePath }); } async expireTransactions ( ) { await this.dtp.manager.expireTransactions(); } async registerHost ( ) { const NOW = new Date(); const mongoose = require('mongoose'); const NetHost = mongoose.model('NetHost'); const memory = await si.mem(); this.host = new NetHost(); this.host.created = NOW; this.host.status = 'starting'; this.host.hostname = os.hostname(); this.host.arch = os.arch(); this.host.cpus = os.cpus().map((cpu) => { return { model: cpu.model, speed: cpu.speed, }; }); this.host.totalmem = memory.total; this.host.freemem = memory.available; this.host.platform = os.platform(); this.host.release = os.release(); this.host.version = os.version(); this.host.network = (await si.networkInterfaces()).map((iface) => { return { iface: iface.iface, speed: iface.speed, mac: iface.mac, ip4: iface.ip4, ip4subnet: iface.ip4subnet, ip6: iface.ip6, ip6subnet: iface.ip6subnet, flags: { internal: iface.internal, virtual: iface.virtual, }, }; }); this.log.info('host registration: network', { network: this.host.network }); await this.host.save(); this.host = this.host.toObject(); this.log.info('registered host with platform', { host: this.host._id }); return this.host; } async reportHostStats ( ) { const NOW = new Date(); const mongoose = require('mongoose'); const NetHost = mongoose.model('NetHost'); const NetHostStats = mongoose.model('NetHostStats'); const memory = await si.mem(); const network = (await si.networkStats('*')).map((iface) => { const record = { iface: iface.iface }; record.rxDropped = iface.rx_dropped; record.rxErrors = iface.rx_errors; record.txDropped = iface.tx_dropped; record.txErrors = iface.tx_errors; if (iface.ms !== 0) { record.rxPerSecond = iface.rx_sec / (iface.ms / 1000.0); record.txPerSecond = iface.tx_sec / (iface.ms / 1000.0); } else { record.rxPerSecond = 0; record.txPerSecond = 0; } return record; }); const cpus = os.cpus().map((cpu) => { return { user: cpu.times.user, nice: cpu.times.nice, sys: cpu.times.sys, idle: cpu.times.idle, irq: cpu.times.irq, }; }); const load = os.loadavg(); const cache = this.dtp.cacheStats.report(); const disk = { cache: await this.getDiskUsage(process.env.DTP_HOST_CACHE_PATH), }; const cpuDeltas = [ ]; if (this.oldCpuStats) { for (let idx = 0; idx < cpus.length; ++idx) { cpuDeltas.push({ user: cpus[idx].user - this.oldCpuStats[idx].user, nice: cpus[idx].nice - this.oldCpuStats[idx].nice, sys: cpus[idx].sys - this.oldCpuStats[idx].sys, idle: cpus[idx].idle - this.oldCpuStats[idx].idle, irq: cpus[idx].irq - this.oldCpuStats[idx].irq, }); } } else { cpus.forEach(( ) => { cpuDeltas.push({ user: 0, nice: 0, sys: 0, idle: 0, irq: 0, }); }); } this.oldCpuStats = cpus; await NetHostStats.create({ created: NOW, host: this.host._id, load, cpus: cpuDeltas, memory, cache, disk, network, }); await NetHost.updateOne( { _id: this.host._id }, { updated: NOW, freemem: memory.available, }, ); this.log.info('platform host report', { host: this.host._id, }); } getDiskUsage (pathname) { return new Promise((resolve, reject) => { diskusage(pathname, (err, usage) => { if (err) { return reject(err); } usage.pctUsed = (usage.used / usage.total) * 100.0; return resolve(usage); }); }); } async setHostStatus (status) { if (!this.host) { return; } const NOW = new Date(); const mongoose = require('mongoose'); const NetHost = mongoose.model('NetHost'); await NetHost.updateOne( { _id: this.host._id }, { $set: { updated: NOW, status, }, }, ); } async expireNetHosts ( ) { const NOW = new Date(); const OLDEST = new Date(Date.now() - 1000 * 60 * 2); const mongoose = require('mongoose'); const NetHost = mongoose.model('NetHost'); const hosts = await NetHost.find({ status: { $nin: ['inactive', 'crashed'] }, updated: { $lt: OLDEST } }); this.log.info('expired host cleanup', { hostCount: hosts.length }); await SiteAsync.each(hosts, async (host) => { try { await NetHost.updateOne( { _id: host._id }, { $set: { updated: NOW, status: 'crashed', }, }, ); } catch (error) { this.log.error('failed to clean expired host', { host, error }); } }, 4); } async start ( ) { await super.start(); this.networkStats = await si.networkStats('*'); this.log.info('starting cache service', { path: process.env.DTP_HOST_CACHE_PATH }); this.dtp.cacheStats = new CacheStats(); await this.startHostCache(process.env.DTP_HOST_CACHE_PATH); this.log.info('cache stats at start', { stats: this.dtp.cacheStats }); /* * Host Cache server socket setup */ const HOST_PORT = parseInt(process.env.DTP_HOST_CACHE_PORT || '8000', 10); this.log.info('creating server UDP socket'); this.dtp.server = dgram.createSocket('udp4', this.onHostCacheMessage.bind(this)); this.log.info('binding server UDP socket', { port: HOST_PORT }); this.dtp.server.bind(HOST_PORT); this.log.info('starting transaction manager'); this.dtp.manager = new TransactionManager(this.dtp); this.expireJob = new CronJob('*/5 * * * * *', this.expireTransactions.bind(this), null, true, CRON_TIMEZONE); /* * Worker startup */ const cleanCronJob = process.env.DTP_HOST_CACHE_CLEAN_CRON || '*/30 * * * * *'; this.log.info('starting host cache clean cron', { cleanCronJob }); this.cleanupJob = new CronJob(cleanCronJob, this.cleanHostCache.bind(this), null, true, CRON_TIMEZONE); this.log.info('starting stats report job'); this.statsReportJob = new CronJob('*/5 * * * * *', this.reportHostStats.bind(this), null, true, CRON_TIMEZONE); this.log.info('starting host expiration job'); this.expireHostsJob = new CronJob('*/20 * * * * *', this.expireNetHosts.bind(this), null, true, CRON_TIMEZONE); this.log.info('registering host with DTP Sites platform', { }); await this.registerHost(); await this.setHostStatus('active'); } } (async ( ) => { try { module.log.info('ensuring host-services path exists', { path: process.env.DTP_HOST_CACHE_PATH }); await fs.promises.mkdir(process.env.DTP_HOST_CACHE_PATH, { recursive: true }); module.worker = new HostServicesWorker(module); await module.worker.start(); module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.component.name} started`); } catch (error) { module.log.error('failed to start worker', { component: module.config.component, error }); process.exit(-1); } })();