349 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
			
		
		
	
	
			349 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
| 'use strict'
 | |
| 
 | |
| const DEFAULT_OPTIONS = {
 | |
|           workerOptions               : {}
 | |
|         , maxCallsPerWorker           : Infinity
 | |
|         , maxConcurrentWorkers        : (require('os').cpus() || { length: 1 }).length
 | |
|         , maxConcurrentCallsPerWorker : 10
 | |
|         , maxConcurrentCalls          : Infinity
 | |
|         , maxCallTime                 : Infinity // exceed this and the whole worker is terminated
 | |
|         , maxRetries                  : Infinity
 | |
|         , forcedKillTime              : 100
 | |
|         , autoStart                   : false
 | |
|         , onChild                     : function() {}
 | |
|       }
 | |
| 
 | |
| const fork                    = require('./fork')
 | |
|     , TimeoutError            = require('errno').create('TimeoutError')
 | |
|     , ProcessTerminatedError  = require('errno').create('ProcessTerminatedError')
 | |
|     , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
 | |
| 
 | |
| 
 | |
| function Farm (options, path) {
 | |
|   this.options     = Object.assign({}, DEFAULT_OPTIONS, options)
 | |
|   this.path        = path
 | |
|   this.activeCalls = 0
 | |
| }
 | |
| 
 | |
| 
 | |
| // make a handle to pass back in the form of an external API
 | |
| Farm.prototype.mkhandle = function (method) {
 | |
|   return function () {
 | |
|     let args = Array.prototype.slice.call(arguments)
 | |
|     if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) {
 | |
|       let err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')')
 | |
|       if (typeof args[args.length - 1] == 'function')
 | |
|         return process.nextTick(args[args.length - 1].bind(null, err))
 | |
|       throw err
 | |
|     }
 | |
|     this.addCall({
 | |
|         method   : method
 | |
|       , callback : args.pop()
 | |
|       , args     : args
 | |
|       , retries  : 0
 | |
|     })
 | |
|   }.bind(this)
 | |
| }
 | |
| 
 | |
| 
 | |
| // a constructor of sorts
 | |
| Farm.prototype.setup = function (methods) {
 | |
|   let iface
 | |
|   if (!methods) { // single-function export
 | |
|     iface = this.mkhandle()
 | |
|   } else { // multiple functions on the export
 | |
|     iface = {}
 | |
|     methods.forEach(function (m) {
 | |
|       iface[m] = this.mkhandle(m)
 | |
|     }.bind(this))
 | |
|   }
 | |
| 
 | |
|   this.searchStart    = -1
 | |
|   this.childId        = -1
 | |
|   this.children       = {}
 | |
|   this.activeChildren = 0
 | |
|   this.callQueue      = []
 | |
| 
 | |
|   if (this.options.autoStart) {
 | |
|     while (this.activeChildren < this.options.maxConcurrentWorkers)
 | |
|       this.startChild()
 | |
|   }
 | |
| 
 | |
|   return iface
 | |
| }
 | |
| 
 | |
| 
 | |
| // when a child exits, check if there are any outstanding jobs and requeue them
 | |
| Farm.prototype.onExit = function (childId) {
 | |
|   // delay this to give any sends a chance to finish
 | |
|   setTimeout(function () {
 | |
|     let doQueue = false
 | |
|     if (this.children[childId] && this.children[childId].activeCalls) {
 | |
|       this.children[childId].calls.forEach(function (call, i) {
 | |
|         if (!call) return
 | |
|         else if (call.retries >= this.options.maxRetries) {
 | |
|           this.receive({
 | |
|               idx   : i
 | |
|             , child : childId
 | |
|             , args  : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
 | |
|           })
 | |
|         } else {
 | |
|           call.retries++
 | |
|           this.callQueue.unshift(call)
 | |
|           doQueue = true
 | |
|         }
 | |
|       }.bind(this))
 | |
|     }
 | |
|     this.stopChild(childId)
 | |
|     doQueue && this.processQueue()
 | |
|   }.bind(this), 10)
 | |
| }
 | |
| 
 | |
| 
 | |
| // start a new worker
 | |
| Farm.prototype.startChild = function () {
 | |
|   this.childId++
 | |
| 
 | |
|   let forked = fork(this.path, this.options.workerOptions)
 | |
|     , id     = this.childId
 | |
|     , c      = {
 | |
|           send        : forked.send
 | |
|         , child       : forked.child
 | |
|         , calls       : []
 | |
|         , activeCalls : 0
 | |
|         , exitCode    : null
 | |
|       }
 | |
| 
 | |
|   this.options.onChild(forked.child);
 | |
| 
 | |
|   forked.child.on('message', function(data) {
 | |
|     if (data.owner !== 'farm') {
 | |
|       return;
 | |
|     }
 | |
|     this.receive(data);
 | |
|   }.bind(this))
 | |
|   forked.child.once('exit', function (code) {
 | |
|     c.exitCode = code
 | |
|     this.onExit(id)
 | |
|   }.bind(this))
 | |
| 
 | |
|   this.activeChildren++
 | |
|   this.children[id] = c
 | |
| }
 | |
| 
 | |
| 
 | |
| // stop a worker, identified by id
 | |
| Farm.prototype.stopChild = function (childId) {
 | |
|   let child = this.children[childId]
 | |
|   if (child) {
 | |
|     child.send({owner: 'farm', event: 'die'})
 | |
|     setTimeout(function () {
 | |
|       if (child.exitCode === null)
 | |
|         child.child.kill('SIGKILL')
 | |
|     }, this.options.forcedKillTime).unref()
 | |
|     ;delete this.children[childId]
 | |
|     this.activeChildren--
 | |
|   }
 | |
| }
 | |
| 
 | |
| 
 | |
| // called from a child process, the data contains information needed to
 | |
| // look up the child and the original call so we can invoke the callback
 | |
| Farm.prototype.receive = function (data) {
 | |
|   let idx     = data.idx
 | |
|     , childId = data.child
 | |
|     , args    = data.args
 | |
|     , child   = this.children[childId]
 | |
|     , call
 | |
| 
 | |
|   if (!child) {
 | |
|     return console.error(
 | |
|         'Worker Farm: Received message for unknown child. '
 | |
|       + 'This is likely as a result of premature child death, '
 | |
|       + 'the operation will have been re-queued.'
 | |
|     )
 | |
|   }
 | |
| 
 | |
|   call = child.calls[idx]
 | |
|   if (!call) {
 | |
|     return console.error(
 | |
|         'Worker Farm: Received message for unknown index for existing child. '
 | |
|       + 'This should not happen!'
 | |
|     )
 | |
|   }
 | |
| 
 | |
|   if (this.options.maxCallTime !== Infinity)
 | |
|     clearTimeout(call.timer)
 | |
| 
 | |
|   if (args[0] && args[0].$error == '$error') {
 | |
|     let e = args[0]
 | |
|     switch (e.type) {
 | |
|       case 'TypeError': args[0] = new TypeError(e.message); break
 | |
|       case 'RangeError': args[0] = new RangeError(e.message); break
 | |
|       case 'EvalError': args[0] = new EvalError(e.message); break
 | |
|       case 'ReferenceError': args[0] = new ReferenceError(e.message); break
 | |
|       case 'SyntaxError': args[0] = new SyntaxError(e.message); break
 | |
|       case 'URIError': args[0] = new URIError(e.message); break
 | |
|       default: args[0] = new Error(e.message)
 | |
|     }
 | |
|     args[0].type = e.type
 | |
|     args[0].stack = e.stack
 | |
| 
 | |
|     // Copy any custom properties to pass it on.
 | |
|     Object.keys(e).forEach(function(key) {
 | |
|       args[0][key] = e[key];
 | |
|     });
 | |
|   }
 | |
| 
 | |
|   process.nextTick(function () {
 | |
|     call.callback.apply(null, args)
 | |
|   })
 | |
| 
 | |
|   ;delete child.calls[idx]
 | |
|   child.activeCalls--
 | |
|   this.activeCalls--
 | |
| 
 | |
|   if (child.calls.length >= this.options.maxCallsPerWorker
 | |
|       && !Object.keys(child.calls).length) {
 | |
|     // this child has finished its run, kill it
 | |
|     this.stopChild(childId)
 | |
|   }
 | |
| 
 | |
|   // allow any outstanding calls to be processed
 | |
|   this.processQueue()
 | |
| }
 | |
| 
 | |
| 
 | |
| Farm.prototype.childTimeout = function (childId) {
 | |
|   let child = this.children[childId]
 | |
|     , i
 | |
| 
 | |
|   if (!child)
 | |
|     return
 | |
| 
 | |
|   for (i in child.calls) {
 | |
|     this.receive({
 | |
|         idx   : i
 | |
|       , child : childId
 | |
|       , args  : [ new TimeoutError('worker call timed out!') ]
 | |
|     })
 | |
|   }
 | |
|   this.stopChild(childId)
 | |
| }
 | |
| 
 | |
| 
 | |
| // send a call to a worker, identified by id
 | |
| Farm.prototype.send = function (childId, call) {
 | |
|   let child = this.children[childId]
 | |
|     , idx   = child.calls.length
 | |
| 
 | |
|   child.calls.push(call)
 | |
|   child.activeCalls++
 | |
|   this.activeCalls++
 | |
| 
 | |
|   child.send({
 | |
|       owner  : 'farm'
 | |
|     , idx    : idx
 | |
|     , child  : childId
 | |
|     , method : call.method
 | |
|     , args   : call.args
 | |
|   })
 | |
| 
 | |
|   if (this.options.maxCallTime !== Infinity) {
 | |
|     call.timer =
 | |
|       setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime)
 | |
|   }
 | |
| }
 | |
| 
 | |
| 
 | |
| // a list of active worker ids, in order, but the starting offset is
 | |
| // shifted each time this method is called, so we work our way through
 | |
| // all workers when handing out jobs
 | |
| Farm.prototype.childKeys = function () {
 | |
|   let cka = Object.keys(this.children)
 | |
|     , cks
 | |
| 
 | |
|   if (this.searchStart >= cka.length - 1)
 | |
|     this.searchStart = 0
 | |
|   else
 | |
|     this.searchStart++
 | |
| 
 | |
|   cks = cka.splice(0, this.searchStart)
 | |
| 
 | |
|   return cka.concat(cks)
 | |
| }
 | |
| 
 | |
| 
 | |
| // Calls are added to a queue, this processes the queue and is called
 | |
| // whenever there might be a chance to send more calls to the workers.
 | |
| // The various options all impact on when we're able to send calls,
 | |
| // they may need to be kept in a queue until a worker is ready.
 | |
| Farm.prototype.processQueue = function () {
 | |
|   let cka, i = 0, childId
 | |
| 
 | |
|   if (!this.callQueue.length)
 | |
|     return this.ending && this.end()
 | |
| 
 | |
|   if (this.activeChildren < this.options.maxConcurrentWorkers)
 | |
|     this.startChild()
 | |
| 
 | |
|   for (cka = this.childKeys(); i < cka.length; i++) {
 | |
|     childId = +cka[i]
 | |
|     if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
 | |
|         && this.children[childId].calls.length < this.options.maxCallsPerWorker) {
 | |
| 
 | |
|       this.send(childId, this.callQueue.shift())
 | |
|       if (!this.callQueue.length)
 | |
|         return this.ending && this.end()
 | |
|     } /*else {
 | |
|       console.log(
 | |
|         , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
 | |
|         , this.children[childId].calls.length < this.options.maxCallsPerWorker
 | |
|         , this.children[childId].calls.length , this.options.maxCallsPerWorker)
 | |
|     }*/
 | |
|   }
 | |
| 
 | |
|   if (this.ending)
 | |
|     this.end()
 | |
| }
 | |
| 
 | |
| 
 | |
| // add a new call to the call queue, then trigger a process of the queue
 | |
| Farm.prototype.addCall = function (call) {
 | |
|   if (this.ending)
 | |
|     return this.end() // don't add anything new to the queue
 | |
|   this.callQueue.push(call)
 | |
|   this.processQueue()
 | |
| }
 | |
| 
 | |
| 
 | |
| // kills child workers when they're all done
 | |
| Farm.prototype.end = function (callback) {
 | |
|   let complete = true
 | |
|   if (this.ending === false)
 | |
|     return
 | |
|   if (callback)
 | |
|     this.ending = callback
 | |
|   else if (this.ending == null)
 | |
|     this.ending = true
 | |
|   Object.keys(this.children).forEach(function (child) {
 | |
|     if (!this.children[child])
 | |
|       return
 | |
|     if (!this.children[child].activeCalls)
 | |
|       this.stopChild(child)
 | |
|     else
 | |
|       complete = false
 | |
|   }.bind(this))
 | |
| 
 | |
|   if (complete && typeof this.ending == 'function') {
 | |
|     process.nextTick(function () {
 | |
|       this.ending()
 | |
|       this.ending = false
 | |
|     }.bind(this))
 | |
|   }
 | |
| }
 | |
| 
 | |
| 
 | |
| module.exports              = Farm
 | |
| module.exports.TimeoutError = TimeoutError
 |