608 lines
14 KiB
JavaScript
608 lines
14 KiB
JavaScript
'use strict'
|
|
|
|
const fastq = require('fastq')
|
|
const EE = require('node:events').EventEmitter
|
|
const inherits = require('node:util').inherits
|
|
const {
|
|
AVV_ERR_EXPOSE_ALREADY_DEFINED,
|
|
AVV_ERR_CALLBACK_NOT_FN,
|
|
AVV_ERR_ROOT_PLG_BOOTED,
|
|
AVV_ERR_READY_TIMEOUT,
|
|
AVV_ERR_ATTRIBUTE_ALREADY_DEFINED
|
|
} = require('./lib/errors')
|
|
const {
|
|
kAvvio,
|
|
kIsOnCloseHandler
|
|
} = require('./lib/symbols')
|
|
const { TimeTree } = require('./lib/time-tree')
|
|
const { Plugin } = require('./lib/plugin')
|
|
const { debug } = require('./lib/debug')
|
|
const { validatePlugin } = require('./lib/validate-plugin')
|
|
const { isBundledOrTypescriptPlugin } = require('./lib/is-bundled-or-typescript-plugin')
|
|
const { isPromiseLike } = require('./lib/is-promise-like')
|
|
const { thenify } = require('./lib/thenify')
|
|
const { executeWithThenable } = require('./lib/execute-with-thenable')
|
|
|
|
function Boot (server, opts, done) {
|
|
if (typeof server === 'function' && arguments.length === 1) {
|
|
done = server
|
|
opts = {}
|
|
server = null
|
|
}
|
|
|
|
if (typeof opts === 'function') {
|
|
done = opts
|
|
opts = {}
|
|
}
|
|
|
|
opts = opts || {}
|
|
opts.autostart = opts.autostart !== false
|
|
opts.timeout = Number(opts.timeout) || 0
|
|
opts.expose = opts.expose || {}
|
|
|
|
if (!new.target) {
|
|
return new Boot(server, opts, done)
|
|
}
|
|
|
|
this._server = server || this
|
|
this._opts = opts
|
|
|
|
if (server) {
|
|
this._expose()
|
|
}
|
|
|
|
/**
|
|
* @type {Array<Plugin>}
|
|
*/
|
|
this._current = []
|
|
|
|
this._error = null
|
|
|
|
this._lastUsed = null
|
|
|
|
this.setMaxListeners(0)
|
|
|
|
if (done) {
|
|
this.once('start', done)
|
|
}
|
|
|
|
this.started = false
|
|
this.booted = false
|
|
this.pluginTree = new TimeTree()
|
|
|
|
this._readyQ = fastq(this, callWithCbOrNextTick, 1)
|
|
this._readyQ.pause()
|
|
this._readyQ.drain = () => {
|
|
this.emit('start')
|
|
// nooping this, we want to emit start only once
|
|
this._readyQ.drain = noop
|
|
}
|
|
|
|
this._closeQ = fastq(this, closeWithCbOrNextTick, 1)
|
|
this._closeQ.pause()
|
|
this._closeQ.drain = () => {
|
|
this.emit('close')
|
|
// nooping this, we want to emit close only once
|
|
this._closeQ.drain = noop
|
|
}
|
|
|
|
this._doStart = null
|
|
|
|
const instance = this
|
|
this._root = new Plugin(fastq(this, this._loadPluginNextTick, 1), function root (server, opts, done) {
|
|
instance._doStart = done
|
|
opts.autostart && instance.start()
|
|
}, opts, false, 0)
|
|
|
|
this._trackPluginLoading(this._root)
|
|
|
|
this._loadPlugin(this._root, (err) => {
|
|
debug('root plugin ready')
|
|
try {
|
|
this.emit('preReady')
|
|
this._root = null
|
|
} catch (preReadyError) {
|
|
err = err || this._error || preReadyError
|
|
}
|
|
|
|
if (err) {
|
|
this._error = err
|
|
if (this._readyQ.length() === 0) {
|
|
throw err
|
|
}
|
|
} else {
|
|
this.booted = true
|
|
}
|
|
this._readyQ.resume()
|
|
})
|
|
}
|
|
|
|
inherits(Boot, EE)
|
|
|
|
Boot.prototype.start = function () {
|
|
this.started = true
|
|
|
|
// we need to wait any call to use() to happen
|
|
process.nextTick(this._doStart)
|
|
return this
|
|
}
|
|
|
|
// allows to override the instance of a server, given a plugin
|
|
Boot.prototype.override = function (server, func, opts) {
|
|
return server
|
|
}
|
|
|
|
Boot.prototype[kAvvio] = true
|
|
|
|
// load a plugin
|
|
Boot.prototype.use = function (plugin, opts) {
|
|
this._lastUsed = this._addPlugin(plugin, opts, false)
|
|
return this
|
|
}
|
|
|
|
Boot.prototype._loadRegistered = function () {
|
|
const plugin = this._current[0]
|
|
const weNeedToStart = !this.started && !this.booted
|
|
|
|
// if the root plugin is not loaded, let's resume that
|
|
// so one can use after() before calling ready
|
|
if (weNeedToStart) {
|
|
process.nextTick(() => this._root.queue.resume())
|
|
}
|
|
|
|
if (!plugin) {
|
|
return Promise.resolve()
|
|
}
|
|
|
|
return plugin.loadedSoFar()
|
|
}
|
|
|
|
Object.defineProperty(Boot.prototype, 'then', { get: thenify })
|
|
|
|
Boot.prototype._addPlugin = function (pluginFn, opts, isAfter) {
|
|
if (isBundledOrTypescriptPlugin(pluginFn)) {
|
|
pluginFn = pluginFn.default
|
|
}
|
|
validatePlugin(pluginFn)
|
|
opts = opts || {}
|
|
|
|
if (this.booted) {
|
|
throw new AVV_ERR_ROOT_PLG_BOOTED()
|
|
}
|
|
|
|
// we always add plugins to load at the current element
|
|
const current = this._current[0]
|
|
|
|
let timeout = this._opts.timeout
|
|
|
|
if (!current.loaded && current.timeout > 0) {
|
|
const delta = Date.now() - current.startTime
|
|
// We need to decrease it by 3ms to make sure the internal timeout
|
|
// is triggered earlier than the parent
|
|
timeout = current.timeout - (delta + 3)
|
|
}
|
|
|
|
const plugin = new Plugin(fastq(this, this._loadPluginNextTick, 1), pluginFn, opts, isAfter, timeout)
|
|
this._trackPluginLoading(plugin)
|
|
|
|
if (current.loaded) {
|
|
throw new Error(plugin.name, current.name)
|
|
}
|
|
|
|
// we add the plugin to be loaded at the end of the current queue
|
|
current.enqueue(plugin, (err) => { err && (this._error = err) })
|
|
|
|
return plugin
|
|
}
|
|
|
|
Boot.prototype._expose = function _expose () {
|
|
const instance = this
|
|
const server = instance._server
|
|
const {
|
|
use: useKey = 'use',
|
|
after: afterKey = 'after',
|
|
ready: readyKey = 'ready',
|
|
onClose: onCloseKey = 'onClose',
|
|
close: closeKey = 'close'
|
|
} = this._opts.expose
|
|
|
|
if (server[useKey]) {
|
|
throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(useKey, 'use')
|
|
}
|
|
server[useKey] = function (fn, opts) {
|
|
instance.use(fn, opts)
|
|
return this
|
|
}
|
|
|
|
if (server[afterKey]) {
|
|
throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(afterKey, 'after')
|
|
}
|
|
server[afterKey] = function (func) {
|
|
if (typeof func !== 'function') {
|
|
return instance._loadRegistered()
|
|
}
|
|
instance.after(encapsulateThreeParam(func, this))
|
|
return this
|
|
}
|
|
|
|
if (server[readyKey]) {
|
|
throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(readyKey, 'ready')
|
|
}
|
|
server[readyKey] = function (func) {
|
|
if (func && typeof func !== 'function') {
|
|
throw new AVV_ERR_CALLBACK_NOT_FN(readyKey, typeof func)
|
|
}
|
|
return instance.ready(func ? encapsulateThreeParam(func, this) : undefined)
|
|
}
|
|
|
|
if (server[onCloseKey]) {
|
|
throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(onCloseKey, 'onClose')
|
|
}
|
|
server[onCloseKey] = function (func) {
|
|
if (typeof func !== 'function') {
|
|
throw new AVV_ERR_CALLBACK_NOT_FN(onCloseKey, typeof func)
|
|
}
|
|
instance.onClose(encapsulateTwoParam(func, this))
|
|
return this
|
|
}
|
|
|
|
if (server[closeKey]) {
|
|
throw new AVV_ERR_EXPOSE_ALREADY_DEFINED(closeKey, 'close')
|
|
}
|
|
server[closeKey] = function (func) {
|
|
if (func && typeof func !== 'function') {
|
|
throw new AVV_ERR_CALLBACK_NOT_FN(closeKey, typeof func)
|
|
}
|
|
|
|
if (func) {
|
|
instance.close(encapsulateThreeParam(func, this))
|
|
return this
|
|
}
|
|
|
|
// this is a Promise
|
|
return instance.close()
|
|
}
|
|
|
|
if (server.then) {
|
|
throw new AVV_ERR_ATTRIBUTE_ALREADY_DEFINED('then')
|
|
}
|
|
Object.defineProperty(server, 'then', { get: thenify.bind(instance) })
|
|
|
|
server[kAvvio] = true
|
|
}
|
|
|
|
Boot.prototype.after = function (func) {
|
|
if (!func) {
|
|
return this._loadRegistered()
|
|
}
|
|
|
|
this._addPlugin(_after.bind(this), {}, true)
|
|
|
|
function _after (s, opts, done) {
|
|
callWithCbOrNextTick.call(this, func, done)
|
|
}
|
|
|
|
return this
|
|
}
|
|
|
|
Boot.prototype.onClose = function (func) {
|
|
// this is used to distinguish between onClose and close handlers
|
|
// because they share the same queue but must be called with different signatures
|
|
|
|
if (typeof func !== 'function') {
|
|
throw new AVV_ERR_CALLBACK_NOT_FN('onClose', typeof func)
|
|
}
|
|
|
|
func[kIsOnCloseHandler] = true
|
|
this._closeQ.unshift(func, (err) => { err && (this._error = err) })
|
|
|
|
return this
|
|
}
|
|
|
|
Boot.prototype.close = function (func) {
|
|
let promise
|
|
|
|
if (func) {
|
|
if (typeof func !== 'function') {
|
|
throw new AVV_ERR_CALLBACK_NOT_FN('close', typeof func)
|
|
}
|
|
} else {
|
|
promise = new Promise(function (resolve, reject) {
|
|
func = function (err) {
|
|
if (err) {
|
|
return reject(err)
|
|
}
|
|
resolve()
|
|
}
|
|
})
|
|
}
|
|
|
|
this.ready(() => {
|
|
this._error = null
|
|
this._closeQ.push(func)
|
|
process.nextTick(this._closeQ.resume.bind(this._closeQ))
|
|
})
|
|
|
|
return promise
|
|
}
|
|
|
|
Boot.prototype.ready = function (func) {
|
|
if (func) {
|
|
if (typeof func !== 'function') {
|
|
throw new AVV_ERR_CALLBACK_NOT_FN('ready', typeof func)
|
|
}
|
|
this._readyQ.push(func)
|
|
queueMicrotask(this.start.bind(this))
|
|
return
|
|
}
|
|
|
|
return new Promise((resolve, reject) => {
|
|
this._readyQ.push(readyPromiseCB)
|
|
this.start()
|
|
|
|
/**
|
|
* The `encapsulateThreeParam` let callback function
|
|
* bind to the right server instance.
|
|
* In promises we need to track the last server
|
|
* instance loaded, the first one in the _current queue.
|
|
*/
|
|
const relativeContext = this._current[0].server
|
|
|
|
function readyPromiseCB (err, context, done) {
|
|
// the context is always binded to the root server
|
|
if (err) {
|
|
reject(err)
|
|
} else {
|
|
resolve(relativeContext)
|
|
}
|
|
process.nextTick(done)
|
|
}
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @param {Plugin} plugin
|
|
* @returns {void}
|
|
*/
|
|
Boot.prototype._trackPluginLoading = function (plugin) {
|
|
const parentName = this._current[0]?.name || null
|
|
plugin.once('start', (serverName, funcName, time) => {
|
|
const nodeId = this.pluginTree.start(parentName || null, funcName, time)
|
|
plugin.once('loaded', (serverName, funcName, time) => {
|
|
this.pluginTree.stop(nodeId, time)
|
|
})
|
|
})
|
|
}
|
|
|
|
Boot.prototype.prettyPrint = function () {
|
|
return this.pluginTree.prettyPrint()
|
|
}
|
|
|
|
Boot.prototype.toJSON = function () {
|
|
return this.pluginTree.toJSON()
|
|
}
|
|
|
|
/**
|
|
* @callback LoadPluginCallback
|
|
* @param {Error} [err]
|
|
*/
|
|
|
|
/**
|
|
* Load a plugin
|
|
*
|
|
* @param {Plugin} plugin
|
|
* @param {LoadPluginCallback} callback
|
|
*/
|
|
Boot.prototype._loadPlugin = function (plugin, callback) {
|
|
const instance = this
|
|
if (isPromiseLike(plugin.func)) {
|
|
plugin.func.then((fn) => {
|
|
if (typeof fn.default === 'function') {
|
|
fn = fn.default
|
|
}
|
|
plugin.func = fn
|
|
this._loadPlugin(plugin, callback)
|
|
}, callback)
|
|
return
|
|
}
|
|
|
|
const last = instance._current[0]
|
|
|
|
// place the plugin at the top of _current
|
|
instance._current.unshift(plugin)
|
|
|
|
if (instance._error && !plugin.isAfter) {
|
|
debug('skipping loading of plugin as instance errored and it is not an after', plugin.name)
|
|
process.nextTick(execCallback)
|
|
return
|
|
}
|
|
|
|
let server = (last && last.server) || instance._server
|
|
|
|
if (!plugin.isAfter) {
|
|
// Skip override for after
|
|
try {
|
|
server = instance.override(server, plugin.func, plugin.options)
|
|
} catch (overrideErr) {
|
|
debug('override errored', plugin.name)
|
|
return execCallback(overrideErr)
|
|
}
|
|
}
|
|
|
|
plugin.exec(server, execCallback)
|
|
|
|
function execCallback (err) {
|
|
plugin.finish(err, (err) => {
|
|
instance._current.shift()
|
|
callback(err)
|
|
})
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Delays plugin loading until the next tick to ensure any bound `_after` callbacks have a chance
|
|
* to run prior to executing the next plugin
|
|
*/
|
|
Boot.prototype._loadPluginNextTick = function (plugin, callback) {
|
|
process.nextTick(this._loadPlugin.bind(this), plugin, callback)
|
|
}
|
|
|
|
function noop () { }
|
|
|
|
function callWithCbOrNextTick (func, cb) {
|
|
const context = this._server
|
|
const err = this._error
|
|
|
|
// with this the error will appear just in the next after/ready callback
|
|
this._error = null
|
|
if (func.length === 0) {
|
|
this._error = err
|
|
executeWithThenable(func, [], cb)
|
|
} else if (func.length === 1) {
|
|
executeWithThenable(func, [err], cb)
|
|
} else {
|
|
if (this._opts.timeout === 0) {
|
|
const wrapCb = (err) => {
|
|
this._error = err
|
|
cb(this._error)
|
|
}
|
|
|
|
if (func.length === 2) {
|
|
func(err, wrapCb)
|
|
} else {
|
|
func(err, context, wrapCb)
|
|
}
|
|
} else {
|
|
timeoutCall.call(this, func, err, context, cb)
|
|
}
|
|
}
|
|
}
|
|
|
|
function timeoutCall (func, rootErr, context, cb) {
|
|
const name = func.unwrappedName ?? func.name
|
|
debug('setting up ready timeout', name, this._opts.timeout)
|
|
let timer = setTimeout(() => {
|
|
debug('timed out', name)
|
|
timer = null
|
|
const toutErr = new AVV_ERR_READY_TIMEOUT(name)
|
|
toutErr.fn = func
|
|
this._error = toutErr
|
|
cb(toutErr)
|
|
}, this._opts.timeout)
|
|
|
|
if (func.length === 2) {
|
|
func(rootErr, timeoutCb.bind(this))
|
|
} else {
|
|
func(rootErr, context, timeoutCb.bind(this))
|
|
}
|
|
|
|
function timeoutCb (err) {
|
|
if (timer) {
|
|
clearTimeout(timer)
|
|
this._error = err
|
|
cb(this._error)
|
|
} else {
|
|
// timeout has been triggered
|
|
// can not call cb twice
|
|
}
|
|
}
|
|
}
|
|
|
|
function closeWithCbOrNextTick (func, cb) {
|
|
const context = this._server
|
|
const isOnCloseHandler = func[kIsOnCloseHandler]
|
|
if (func.length === 0 || func.length === 1) {
|
|
let promise
|
|
if (isOnCloseHandler) {
|
|
promise = func(context)
|
|
} else {
|
|
promise = func(this._error)
|
|
}
|
|
if (promise && typeof promise.then === 'function') {
|
|
debug('resolving close/onClose promise')
|
|
promise.then(
|
|
() => process.nextTick(cb),
|
|
(e) => process.nextTick(cb, e))
|
|
} else {
|
|
process.nextTick(cb)
|
|
}
|
|
} else if (func.length === 2) {
|
|
if (isOnCloseHandler) {
|
|
func(context, cb)
|
|
} else {
|
|
func(this._error, cb)
|
|
}
|
|
} else {
|
|
if (isOnCloseHandler) {
|
|
func(context, cb)
|
|
} else {
|
|
func(this._error, context, cb)
|
|
}
|
|
}
|
|
}
|
|
|
|
function encapsulateTwoParam (func, that) {
|
|
return _encapsulateTwoParam.bind(that)
|
|
function _encapsulateTwoParam (context, cb) {
|
|
let res
|
|
if (func.length === 0) {
|
|
res = func()
|
|
if (res && res.then) {
|
|
res.then(function () {
|
|
process.nextTick(cb)
|
|
}, cb)
|
|
} else {
|
|
process.nextTick(cb)
|
|
}
|
|
} else if (func.length === 1) {
|
|
res = func(this)
|
|
|
|
if (res && res.then) {
|
|
res.then(function () {
|
|
process.nextTick(cb)
|
|
}, cb)
|
|
} else {
|
|
process.nextTick(cb)
|
|
}
|
|
} else {
|
|
func(this, cb)
|
|
}
|
|
}
|
|
}
|
|
|
|
function encapsulateThreeParam (func, that) {
|
|
const wrapped = _encapsulateThreeParam.bind(that)
|
|
wrapped.unwrappedName = func.name
|
|
return wrapped
|
|
function _encapsulateThreeParam (err, cb) {
|
|
let res
|
|
if (!func) {
|
|
process.nextTick(cb)
|
|
} else if (func.length === 0) {
|
|
res = func()
|
|
if (res && res.then) {
|
|
res.then(function () {
|
|
process.nextTick(cb, err)
|
|
}, cb)
|
|
} else {
|
|
process.nextTick(cb, err)
|
|
}
|
|
} else if (func.length === 1) {
|
|
res = func(err)
|
|
if (res && res.then) {
|
|
res.then(function () {
|
|
process.nextTick(cb)
|
|
}, cb)
|
|
} else {
|
|
process.nextTick(cb)
|
|
}
|
|
} else if (func.length === 2) {
|
|
func(err, cb)
|
|
} else {
|
|
func(err, this, cb)
|
|
}
|
|
}
|
|
}
|
|
|
|
module.exports = Boot
|