// host-services.js
// Copyright (C) 2022 DTP Technologies, LLC
// License: Apache-2.0

'use strict';

const os = require('os');
const diskusage = require('diskusage-ng');

const path = require('path');
const fs = require('fs');

const si = require('systeminformation');

require('dotenv').config({ path: path.resolve(__dirname, '..', '..', '.env') });

const dgram = require('dgram');
const numeral = require('numeral');

const {
  SitePlatform,
  SiteAsync,
  SiteLog,
  SiteError,
} = require(path.join(__dirname, '..', '..', 'lib', 'site-lib'));

const { CronJob } = require('cron');

const CRON_TIMEZONE = 'America/New_York';

/** UDP port the host cache server socket binds to. */
const HOST_CACHE_PORT = 8000;

/** Minutes a cached file may go without access (atime) before it is removed. */
const CACHE_MAX_IDLE_MINUTES = 60.0;

/** Milliseconds a transaction may stay queued before the watchdog cancels it. */
const TRANSACTION_MAX_AGE_MS = 1000 * 30;

/** Milliseconds without an update after which a NetHost is marked crashed. */
const HOST_STALE_MS = 1000 * 60 * 2;

module.pkg = require(path.resolve(__dirname, '..', '..', 'package.json'));
module.config = {
  componentName: 'host-services',
  root: path.resolve(__dirname, '..', '..'),
};

module.log = new SiteLog(module, module.config.componentName);

/**
 * Running counters describing the on-disk cache. Callers pass sizes in bytes
 * (fs.Stats#size); every data-size counter accumulates `size / 1024.0`.
 *
 * itemCount/dataSize describe the current cache content and persist across
 * reports. The expire/hit/miss counters describe activity since the previous
 * report and are zeroed by report().
 */
class CacheStats {

  constructor ( ) {
    this.itemCount = 0;
    this.dataSize = 0;

    this.expireCount = 0;
    this.expireDataSize = 0;

    this.hitCount = 0;
    this.hitDataSize = 0;

    this.missCount = 0;
    this.missDataSize = 0;
  }

  /**
   * Records a file entering the cache.
   * @param {Number} size File size in bytes.
   */
  add (size) {
    this.itemCount += 1;
    this.dataSize += (size / 1024.0);
  }

  /**
   * Records a file leaving the cache.
   * @param {Number} size File size in bytes.
   */
  remove (size) {
    this.itemCount -= 1;
    this.dataSize -= size / 1024.0;
  }

  /**
   * Records a request served from the cache.
   * @param {Number} size File size in bytes.
   */
  hit (size) {
    this.hitCount += 1;
    this.hitDataSize += (size / 1024.0);
  }

  /**
   * Records a request that had to be fetched into the cache.
   * @param {Number} size File size in bytes.
   */
  miss (size) {
    this.missCount += 1;
    this.missDataSize += (size / 1024.0);
  }

  /**
   * Records a file expired by the cache cleaner.
   * @param {Number} size File size in bytes.
   */
  expire (size) {
    this.expireCount += 1;
    // Same unit as every other data-size counter; this one used to add raw bytes.
    this.expireDataSize += (size / 1024.0);
  }

  /**
   * Snapshots all counters, then zeroes the per-interval ones.
   * @returns {Object} Plain object holding a copy of every counter.
   */
  report ( ) {
    const report = {
      itemCount: this.itemCount,
      dataSize: this.dataSize,

      expireCount: this.expireCount,
      expireDataSize: this.expireDataSize,

      hitCount: this.hitCount,
      hitDataSize: this.hitDataSize,

      missCount: this.missCount,
      missDataSize: this.missDataSize,
    };

    this.resetCounters();

    return report;
  }

  /**
   * Zeroes the expire/hit/miss counters. itemCount and dataSize are left
   * untouched.
   */
  resetCounters ( ) {
    this.expireCount = 0;
    this.expireDataSize = 0;

    this.hitCount = 0;
    this.hitDataSize = 0;

    this.missCount = 0;
    this.missDataSize = 0;
  }
}

/**
 * One request received on the host cache UDP socket, together with the sender
 * information needed to reply.
 */
class HostCacheTransaction {

  get tid ( ) { return this.message.tid; }
  get cmd ( ) { return this.message.cmd; }
  get params ( ) { return this.message.params; }

  get address ( ) { return this.rinfo.address; }
  get port ( ) { return this.rinfo.port; }
  get size ( ) { return this.rinfo.size; }

  /**
   * @param {Object} message Parsed JSON request; tid, cmd and params are read from it.
   * @param {Object} rinfo Sender info passed to the socket message handler.
   */
  constructor (message, rinfo) {
    this.created = Date.now(); // timestamp, not Date instance

    this.message = message;
    this.rinfo = rinfo;

    this.flags = {
      isFetched: false,
      isCached: false,
      isResolved: false,
      isError: false,
    };
  }

  /**
   * Maps a bucket/key pair to a path below DTP_HOST_CACHE_PATH.
   *
   * bucket and key arrive in an unauthenticated UDP datagram, so the joined
   * path is checked to still be inside the cache root (rejects `..` escapes).
   *
   * @param {*} bucket Requested bucket name.
   * @param {*} key Requested object key.
   * @returns {String|null} The cache file path, or null when the request is
   * malformed or points outside the cache directory.
   */
  resolveCachePath (bucket, key) {
    if ((typeof bucket !== 'string') || (typeof key !== 'string')) {
      return null;
    }
    if ((bucket.length === 0) || (key.length === 0)) {
      return null;
    }

    const basePath = process.env.DTP_HOST_CACHE_PATH;
    const filePath = path.join(basePath, bucket, key);

    const root = path.resolve(basePath);
    const prefix = root.endsWith(path.sep) ? root : `${root}${path.sep}`;
    if (!path.resolve(filePath).startsWith(prefix)) {
      return null;
    }

    return filePath;
  }

  /**
   * Marks the transaction as failed and sends the failure response.
   * @param {Object} res Response object being built for the client.
   * @param {Number} statusCode Status code reported to the client.
   * @param {String} message Message reported to the client.
   * @param {Error} error Error retained on the transaction.
   * @returns {Promise} Settles once the manager has resolved the transaction.
   */
  fail (res, statusCode, message, error) {
    res.success = false;
    res.statusCode = statusCode;
    res.message = message;
    this.error = error;
    this.flags.isError = true;
    return module.manager.resolveTransaction(this, res);
  }

  /**
   * Serves the 'getFile' command. Replies with the cached file's path and
   * stats when it exists in the cache directory; otherwise downloads it from
   * MinIO into the cache first. Replies 400 for malformed requests, 404 when
   * MinIO reports 'NotFound', and 500 for any other failure.
   *
   * @returns {Promise} Settles once the response has been sent.
   */
  async getFile ( ) {
    const res = {
      cmd: this.cmd,
      success: true,
      message: undefined,
      file: {
        stats: undefined,
        path: undefined,
      },
    };

    const { bucket, key } = this.params || { };
    const filePath = this.resolveCachePath(bucket, key);
    if (!filePath) {
      const error = new SiteError(400, 'invalid object requested');
      module.log.error('rejecting invalid object request', { transaction: this, error });
      return this.fail(res, 400, 'invalid object requested', error);
    }

    // NOTE(review): module.services is not assigned in this file; presumably
    // SitePlatform.startPlatform populates it - confirm.
    const { minio: minioService } = module.services;

    try {
      res.file.stats = await fs.promises.stat(filePath);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      res.file.path = filePath;
      this.flags.isCached = true;
      module.cacheStats.hit(res.file.stats.size);
      return module.manager.resolveTransaction(this, res);
    } catch (error) {
      if (error.code !== 'ENOENT') {
        module.log.error('failed to stat requested object', { transaction: this, error });
        return this.fail(res, 500, error.message, error);
      }
      // fall through to MinIO fetch since file not found in cache
    }

    try {
      await minioService.downloadFile({ bucket, key, filePath });
      res.file.path = filePath;
      res.file.stats = await fs.promises.stat(filePath);
      if (!res.file.stats.isFile()) {
        throw new SiteError(500, 'invalid object requested');
      }
      this.flags.isFetched = true;
      module.cacheStats.add(res.file.stats.size);
      module.cacheStats.miss(res.file.stats.size);
      return module.manager.resolveTransaction(this, res);
    } catch (error) {
      if (error.code !== 'NotFound') {
        module.log.error('failed to fetch requested object from MinIO', { transaction: this, error });
        return this.fail(res, 500, error.message, error);
      }
    }

    return this.fail(res, 404, 'Not Found', new SiteError(404, 'Not Found'));
  }

  /**
   * Replies to the client that the operation was canceled.
   * @param {String} reason Text appended to the cancellation message.
   * @returns {Promise} Settles once the response has been sent.
   */
  async cancel (reason) {
    const res = {
      cmd: this.cmd, // same key getFile responses use
      res: this.cmd, // original key, kept so existing clients keep working
      success: false,
      message: `Operation canceled: ${reason}`,
    };
    return module.manager.resolveTransaction(this, res);
  }

  /**
   * Serializes the response as JSON and sends it to the requester's address
   * and port over the server socket. Also records the transaction duration
   * (seconds) and sets flags.isResolved.
   * @param {Object} res Response body to deliver.
   */
  async sendResponse (res) {
    const NOW = Date.now();
    const duration = this.duration = (NOW - this.created) / 1000.0;

    const { flags } = this;
    flags.isResolved = true;

    const payload = { tid: this.tid, res, flags, duration };
    const reply = Buffer.from(JSON.stringify(payload));
    module.server.send(reply, this.port, this.address);
  }
}

/**
 * Tracks in-flight transactions by tid and dispatches their commands.
 */
class TransactionManager {

  constructor ( ) {
    // tid comes from untrusted input; a null-prototype object keeps keys such
    // as "__proto__" from touching the prototype chain.
    this.transactions = Object.create(null);
  }

  /**
   * Registers a transaction and, unless an equivalent request is already
   * pending, executes its command. Unknown commands are logged and canceled
   * with reason 'Rejected'.
   *
   * NOTE(review): a duplicate request is only stored here; nothing in this
   * class answers it when the original completes, so it stays queued until
   * expireTransactions() cancels it. Confirm whether that is intended.
   *
   * @param {HostCacheTransaction} transaction
   */
  async addTransaction (transaction) {
    if (this.hasPendingRequest(transaction)) {
      this.transactions[transaction.tid] = transaction; // queue it and be done
      return;
    }

    this.transactions[transaction.tid] = transaction; // queue it and process the command
    switch (transaction.cmd) {
      case 'getFile':
        return transaction.getFile();

      default:
        break; // unknown/undefined command
    }

    // TransactionManager has no logger of its own; use the module logger.
    module.log.error('invalid host-services command', {
      cmd: transaction.cmd,
      params: transaction.params,
      from: {
        address: transaction.address,
        port: transaction.port,
      },
    });
    await this.cancelTransaction(transaction, 'Rejected');
  }

  /**
   * Tests whether a tracked transaction has the same cmd, params.bucket and
   * params.key as the given one.
   * @param {HostCacheTransaction} transaction
   * @returns {Boolean}
   */
  hasPendingRequest (transaction) {
    if (!transaction) {
      return false;
    }

    const wanted = transaction.params || { };
    return Object.keys(this.transactions).some((tid) => {
      const pending = this.transactions[tid];
      if (!pending) {
        return false;
      }
      const params = pending.params || { };
      return (pending.cmd === transaction.cmd) &&
        (params.bucket === wanted.bucket) &&
        (params.key === wanted.key);
    });
  }

  /**
   * Sends the response for a transaction, then stops tracking it.
   * @param {HostCacheTransaction} transaction
   * @param {Object} res Response body.
   */
  async resolveTransaction (transaction, res) {
    await transaction.sendResponse(res);
    this.removeTransaction(transaction, 'resolved');
  }

  /**
   * Cancels a transaction (the client is told why), then stops tracking it.
   * @param {HostCacheTransaction} transaction
   * @param {String} reason Reason reported to the client.
   */
  async cancelTransaction (transaction, reason) {
    await transaction.cancel(reason);
    this.removeTransaction(transaction, reason);
  }

  /**
   * Stops tracking a transaction. A no-op when it is not tracked.
   * @param {HostCacheTransaction} transaction
   */
  removeTransaction (transaction) {
    if (this.transactions[transaction.tid]) {
      delete this.transactions[transaction.tid];
    }
  }

  /**
   * Watchdog: cancels every tracked transaction older than
   * TRANSACTION_MAX_AGE_MS with reason 'expired'.
   */
  async expireTransactions ( ) {
    const NOW = Date.now();
    const keys = Object.keys(this.transactions);
    let expired = 0;

    await SiteAsync.each(keys, async (key) => {
      const transaction = this.transactions[key];
      if (!transaction) {
        return; // removed while other entries were being processed
      }
      const age = NOW - transaction.created;
      if (age > TRANSACTION_MAX_AGE_MS) {
        module.log.alert('expiring transaction', { transaction });
        await this.cancelTransaction(transaction, 'expired');
        ++expired;
      }
    }, 8);

    module.log.info('transaction watchdog', { expired });
  }
}

/**
 * UDP socket message handler: parses the datagram as JSON and hands it to the
 * transaction manager. Any failure is logged rather than thrown.
 * @param {Buffer} message Raw datagram.
 * @param {Object} rinfo Sender information.
 */
module.onHostCacheMessage = async (message, rinfo) => {
  try {
    message = message.toString('utf8');
    message = JSON.parse(message);
    if (!message || (typeof message !== 'object')) {
      throw new SiteError(400, 'invalid host-services message');
    }

    const transaction = new HostCacheTransaction(message, rinfo);
    // Awaited so processing failures land in the catch below instead of
    // surfacing as unhandled rejections.
    await module.manager.addTransaction(transaction);
  } catch (error) {
    module.log.error('failed to receive UDP message', { message, error });
  }
};

/**
 * Walks the cache directory recursively at startup. Files idle (by atime) for
 * more than CACHE_MAX_IDLE_MINUTES are removed; every other file is counted
 * into module.cacheStats.
 * @param {String} basePath Directory to scan; defaults to DTP_HOST_CACHE_PATH.
 */
module.startHostCache = async (basePath) => {
  basePath = basePath || process.env.DTP_HOST_CACHE_PATH;

  const NOW = Date.now();
  const dir = await fs.promises.opendir(basePath);

  for await (const dirent of dir) {
    if (dirent.isDirectory()) {
      module.log.debug('cache start descend into directory', { name: dirent.name });
      await module.startHostCache(path.join(basePath, dirent.name));
    }
    if (dirent.isFile()) {
      const filePath = path.join(basePath, dirent.name);
      const stats = await fs.promises.stat(filePath);
      const age = (NOW - stats.atime.valueOf()) / 1000.0; // seconds
      module.log.debug('examining file', { filePath, age: numeral(age).format('hh:mm:ss') });
      if ((age / 60.0) > CACHE_MAX_IDLE_MINUTES) {
        module.log.info('expiring file', { filePath, age: numeral(age).format('hh:mm:ss') });
        await fs.promises.rm(filePath, { force: true });
      } else {
        module.cacheStats.add(stats.size);
      }
    }
  }

  // await dir.close();

  module.log.info('cache startup cleanup complete', { basePath });
};

/**
 * When a file is accessed for read or otherwise, its atime is updated. If the
 * atime of a file is older than CACHE_MAX_IDLE_MINUTES, the file is removed
 * and the removal is recorded in module.cacheStats.
 * @param {String} basePath Directory to clean; defaults to DTP_HOST_CACHE_PATH.
 */
module.cleanHostCache = async (basePath) => {
  const NOW = Date.now(); // timestamp, not Date instance
  basePath = basePath || process.env.DTP_HOST_CACHE_PATH;

  module.log.info('cache directory cleanup', { path: basePath });

  const dir = await fs.promises.opendir(basePath);
  for await (const dirent of dir) {
    if (dirent.isDirectory()) {
      module.log.debug('cache clean descend into directory', { name: dirent.name });
      await module.cleanHostCache(path.join(basePath, dirent.name));
    }
    if (dirent.isFile()) {
      const filePath = path.join(basePath, dirent.name);
      const stats = await fs.promises.stat(filePath);
      const age = NOW - stats.atime.valueOf(); // milliseconds
      module.log.debug('examining file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') });
      if ((age / 1000.0 / 60.0) > CACHE_MAX_IDLE_MINUTES) {
        module.log.info('expiring file', { filePath, age: numeral(age / 1000.0).format('hh:mm:ss') });
        await fs.promises.rm(filePath, { force: true });
        module.cacheStats.remove(stats.size);
        module.cacheStats.expire(stats.size);
      }
    }
  }

  module.log.info('cache directory cleanup complete', { basePath });
};

/**
 * Cron entry point for the transaction watchdog.
 */
module.expireTransactions = async ( ) => {
  await module.manager.expireTransactions();
};

/**
 * Creates and saves a NetHost document describing this machine (hostname,
 * CPUs, memory, OS, network interfaces) with status 'starting', and stores
 * its plain-object form in module.host.
 * @returns {Object} The registered host as a plain object.
 */
module.registerHost = async ( ) => {
  const NOW = new Date();

  const mongoose = require('mongoose');
  const NetHost = mongoose.model('NetHost');

  const memory = await si.mem();

  module.host = new NetHost();
  module.host.created = NOW;
  module.host.status = 'starting';
  module.host.hostname = os.hostname();

  module.host.arch = os.arch();
  module.host.cpus = os.cpus().map((cpu) => {
    return {
      model: cpu.model,
      speed: cpu.speed,
    };
  });

  module.host.totalmem = memory.total;
  module.host.freemem = memory.available;

  module.host.platform = os.platform();
  module.host.release = os.release();
  module.host.version = os.version();

  module.host.network = (await si.networkInterfaces()).map((iface) => {
    return {
      iface: iface.iface,
      speed: iface.speed,
      mac: iface.mac,
      ip4: iface.ip4,
      ip4subnet: iface.ip4subnet,
      ip6: iface.ip6,
      ip6subnet: iface.ip6subnet,
      flags: {
        internal: iface.internal,
        virtual: iface.virtual,
      },
    };
  });

  module.log.info('host registration: network', { network: module.host.network });

  await module.host.save();
  module.host = module.host.toObject();

  module.log.info('registered host with platform', { host: module.host._id });
  return module.host;
};

/**
 * Collects load, per-CPU time deltas since the previous report, memory, cache
 * counters, cache disk usage and network statistics; stores them as a
 * NetHostStats document and refreshes the NetHost's updated/freemem fields.
 * Does nothing until the host has been registered.
 */
module.reportHostStats = async ( ) => {
  // The cron job can tick before registerHost() has run. Bail out before
  // cacheStats.report() so no interval counters are consumed for nothing.
  if (!module.host || !module.host._id) {
    return;
  }

  const NOW = new Date();

  const mongoose = require('mongoose');
  const NetHost = mongoose.model('NetHost');
  const NetHostStats = mongoose.model('NetHostStats');

  const memory = await si.mem();

  const network = (await si.networkStats('*')).map((iface) => {
    const record = { iface: iface.iface };

    record.rxDropped = iface.rx_dropped;
    record.rxErrors = iface.rx_errors;

    record.txDropped = iface.tx_dropped;
    record.txErrors = iface.tx_errors;

    if (iface.ms !== 0) {
      record.rxPerSecond = iface.rx_sec / (iface.ms / 1000.0);
      record.txPerSecond = iface.tx_sec / (iface.ms / 1000.0);
    } else {
      record.rxPerSecond = 0;
      record.txPerSecond = 0;
    }

    return record;
  });

  const cpus = os.cpus().map((cpu) => {
    return {
      user: cpu.times.user,
      nice: cpu.times.nice,
      sys: cpu.times.sys,
      idle: cpu.times.idle,
      irq: cpu.times.irq,
    };
  });
  const load = os.loadavg();
  const cache = module.cacheStats.report();
  const disk = {
    cache: await module.getDiskUsage(process.env.DTP_HOST_CACHE_PATH),
  };

  // The first report has no baseline, so it stores zero deltas.
  const cpuDeltas = [ ];
  if (module.oldCpuStats) {
    for (let idx = 0; idx < cpus.length; ++idx) {
      cpuDeltas.push({
        user: cpus[idx].user - module.oldCpuStats[idx].user,
        nice: cpus[idx].nice - module.oldCpuStats[idx].nice,
        sys: cpus[idx].sys - module.oldCpuStats[idx].sys,
        idle: cpus[idx].idle - module.oldCpuStats[idx].idle,
        irq: cpus[idx].irq - module.oldCpuStats[idx].irq,
      });
    }
  } else {
    cpus.forEach(( ) => {
      cpuDeltas.push({
        user: 0,
        nice: 0,
        sys: 0,
        idle: 0,
        irq: 0,
      });
    });
  }
  module.oldCpuStats = cpus;

  await NetHostStats.create({
    created: NOW,
    host: module.host._id,
    load,
    cpus: cpuDeltas,
    memory,
    cache,
    disk,
    network,
  });
  await NetHost.updateOne(
    { _id: module.host._id },
    {
      updated: NOW,
      freemem: memory.available,
    },
  );

  module.log.info('platform host report', {
    host: module.host._id,
  });
};

/**
 * Promise wrapper around diskusage-ng that also adds a pctUsed field
 * (used / total * 100) to the result.
 * @param {String} pathname Path whose disk usage is queried.
 * @returns {Promise<Object>} The usage object with pctUsed added.
 */
module.getDiskUsage = (pathname) => {
  return new Promise((resolve, reject) => {
    diskusage(pathname, (err, usage) => {
      if (err) {
        return reject(err);
      }

      usage.pctUsed = (usage.used / usage.total) * 100.0;

      return resolve(usage);
    });
  });
};

/**
 * Writes a new status (and updated timestamp) to this host's NetHost record.
 * Does nothing when the host has not been registered.
 * @param {String} status Status value to store.
 */
module.setHostStatus = async (status) => {
  if (!module.host) {
    return;
  }

  const NOW = new Date();
  const mongoose = require('mongoose');
  const NetHost = mongoose.model('NetHost');

  await NetHost.updateOne(
    { _id: module.host._id },
    {
      $set: {
        updated: NOW,
        status,
      },
    },
  );
};

/**
 * Marks as 'crashed' every NetHost that is neither 'inactive' nor 'crashed'
 * and whose updated timestamp is older than HOST_STALE_MS. Failures are
 * logged per host and do not stop the sweep.
 */
module.expireNetHosts = async ( ) => {
  const NOW = new Date();
  const OLDEST = new Date(Date.now() - HOST_STALE_MS);

  const mongoose = require('mongoose');
  const NetHost = mongoose.model('NetHost');

  const hosts = await NetHost.find({
    status: { $nin: ['inactive', 'crashed'] },
    updated: { $lt: OLDEST }
  });

  module.log.info('expired host cleanup', { hostCount: hosts.length });

  await SiteAsync.each(hosts, async (host) => {
    try {
      await NetHost.updateOne(
        { _id: host._id },
        {
          $set: {
            updated: NOW,
            status: 'crashed',
          },
        },
      );
    } catch (error) {
      module.log.error('failed to clean expired host', { host, error });
    }
  }, 4);
};

/*
 * Process entry point: installs process handlers, primes the cache, starts the
 * UDP server and the platform, schedules the cron jobs, then registers this
 * host. Any startup failure is logged and exits the process with -1.
 */
(async ( ) => {

  try {
    process.on('unhandledRejection', (error, p) => {
      // A rejection reason is not necessarily an Error (or even defined).
      module.log.error('Unhandled rejection', {
        error: error,
        promise: p,
        stack: error && error.stack
      });
    });
    process.on('warning', (error) => {
      module.log.alert('warning', { error });
    });
    process.once('SIGINT', async ( ) => {
      module.log.info('SIGINT received');
      module.log.info('requesting shutdown...');

      await module.setHostStatus('shutdown');
      const exitCode = await SitePlatform.shutdown();

      await module.setHostStatus('inactive');
      process.nextTick(( ) => {
        process.exit(exitCode);
      });
    });

    module.log.info('ensuring host-services path exists', { path: process.env.DTP_HOST_CACHE_PATH });
    await fs.promises.mkdir(process.env.DTP_HOST_CACHE_PATH, { recursive: true });

    module.networkStats = await si.networkStats('*');

    module.log.info('starting cache service', { path: process.env.DTP_HOST_CACHE_PATH });
    module.cacheStats = new CacheStats();
    await module.startHostCache(process.env.DTP_HOST_CACHE_PATH);
    module.log.info('cache stats at start', { stats: module.cacheStats });

    /*
     * The manager must exist before the socket is bound: the message handler
     * dereferences module.manager for every datagram it receives.
     */
    module.log.info('starting transaction manager');
    module.manager = new TransactionManager();

    /*
     * Host Cache server socket setup
     */

    module.log.info('creating server UDP socket');
    module.server = dgram.createSocket('udp4', module.onHostCacheMessage);
    module.log.info('binding server UDP socket');
    module.server.bind(HOST_CACHE_PORT);

    /*
     * Site Platform startup
     */

    await SitePlatform.startPlatform(module);

    module.expireJob = new CronJob('*/5 * * * * *', module.expireTransactions, null, true, CRON_TIMEZONE);

    /*
     * Worker startup
     */

    const cleanCronJob = process.env.DTP_HOST_CACHE_CLEAN_CRON || '*/30 * * * * *';
    module.log.info('starting host cache clean cron', { cleanCronJob });
    module.cleanupJob = new CronJob(cleanCronJob, module.cleanHostCache, null, true, CRON_TIMEZONE);

    module.log.info('starting stats report job');
    module.statsReportJob = new CronJob('*/5 * * * * *', module.reportHostStats, null, true, CRON_TIMEZONE);

    module.log.info('starting host expiration job');
    module.expireHostsJob = new CronJob('*/20 * * * * *', module.expireNetHosts, null, true, CRON_TIMEZONE);

    module.log.info('registering host with DTP Sites platform', { });
    await module.registerHost();
    await module.setHostStatus('active');

    module.log.info(`${module.pkg.name} v${module.pkg.version} ${module.config.componentName} started`);
  } catch (error) {
    module.log.error('failed to start Host Cache worker', { error });
    process.exit(-1);
  }

})();