Core synchronous queue with an example worker

This commit is contained in:
2020-11-02 10:51:08 +00:00
parent d6b5b033f2
commit 0a30f2d466
16 changed files with 1103 additions and 1 deletions

View File

@@ -0,0 +1,191 @@
const EventEmitter = require('events')
const threads = require('worker_threads')
const path = require('path')
const fs = require('fs')
const Exceptions = require('./exceptions.js')
const CalcTask = require('./CalcTask.js')
// Basic utility functions to help the queue manager
const CalcQueueUtils = {
getCalcWorker: function(calcFn) {
return path.resolve(path.join('src/workers', calcFn + '.js'))
},
isWorkerValid: function(workerPath) {
return fs.existsSync(workerPath)
},
// Start the next task if there are any queued
processNext: function() {
if (this.queue.length == 0) {
return false
}
/* TODO: work out if the process can be run in parallel with outer
* processes or must be done sequentially
*/
const task = this.queue.shift()
task._worker = new threads.Worker(task.workerPath())
task._worker.on('message', (msg) => {
task._messages.push(msg)
})
task._worker.on('error', (err) => {
task._error = err
task._messages.push('Error: ' + err.message)
})
task._worker.on('exit', (exitCode) => {
task._result = exitCode
task._worker = null
const idx = this.inProgress.findIndex((t) => {
return t.id() == task.id() && t.calcFn() == task.calcFn()
})
this.inProgress.splice(idx, 1)
this.finished.push(task)
console.log(task)
})
this.inProgress.push(task)
}
}
// A calculation task. Only one of these per app
/* Allows calculations to be queued as tasks. Currently tasks are ryun
* synchronously.
* TODO: Add async tasks.
*/
const CalcQueue = function() {
if (!(this instanceof CalcQueue)) {
return new CalcQueue()
}
this.queue = []
this.inProgress = []
this.finished = []
return this
}
CalcQueue.prototype = {
isBusy: function() {
return this.inProgress.length > 0;
},
// Report if a task or collection of tasks by id are finished
isFinished: function(id, calcFn = null) {
if (calcFn == null) {
const fin = this.finished.filter((el) => {
return el.id() == id
}).reduce((agg, el) => {
return agg && el.result() != null
}, true)
return fin
} else {
const idx = this.finished.findIndex((el) => {
return el.id() == id && (calcFn == null || el.calcFn() == calcFn)
})
return idx != -1
}
},
// Get the list of finished tasks
getFinished: function() {
return this.finished.map((q) => {
return {
'id': q.id(),
'calcFn': q.calcFn(),
'result': q.result(),
'messages': q.messages(),
'error': (q.error() == null ? null : q.error().message)
}
})
},
// Report if a task or collection of tasks by id are in progress
isInProgress: function(id, calcFn = null) {
const idx = this.inProgress.findIndex((el) => {
return el.id() == id && (calcFn == null || el.calcFn() == calcFn)
})
return idx != -1
},
// Get the list of in-progress tasks
getInProgress: function() {
return this.inProgress.map((q) => {
return { 'id': q.id(), 'calcFn': q.calcFn() }
})
},
// Report if a task or collection of tasks by id are queued
isQueued: function(id, calcFn = null) {
return (this.queuePosition(id, calcFn) != -1)
},
// Report the position of a task in the queue
queuePosition: function(id, calcFn = null) {
return this.queue.findIndex((el) => {
return el.id() == id && (calcFn == null || el.calcFn() == calcFn)
})
},
// Get the list of queued tasks
getQueue: function() {
return this.queue.map((q) => {
return { 'id': q.id(), 'calcFn': q.calcFn() }
})
},
// Remove all tasks from the queue, does not affect running tasks
clearQueue: function() {
var okay = true
while (this.queue.length > 0) {
okay |= this.dequeue(this.queue[0].id)
this.queue.splice(0, 1)
}
return okay
},
// Add a new task to the queue if it does not already exist
enqueue: function(id, calcFn) {
if (this.isQueued(id, calcFn)) {
throw new Exceptions.AlreadyQueued()
}
if (this.isInProgress(id, calcFn)) {
throw new Exceptions.AlreadyInProgress()
}
const workerPath = CalcQueueUtils.getCalcWorker(calcFn)
if (!CalcQueueUtils.isWorkerValid(workerPath)) {
throw new Exceptions.CalculationUnknown()
}
this.queue.push(new CalcTask(id, calcFn, workerPath))
if (!this.isBusy()) {
CalcQueueUtils.processNext.call(this)
}
return true
},
// Remove a task from the queue if it is queued
dequeue: function(id, calcFn) {
var idx = this.queuePosition(id, calcFn)
if (idx == -1) {
return false
}
// TODO: if item is in process, wait for it to finish
while (idx != -1) {
this.queue.splice(idx, 1)
idx = this.queuePosition(id, calcFn)
}
return true
}
// TODO: find, store & list worker scripts on start up instead of on request?
}
module.exports = new CalcQueue()

View File

@@ -0,0 +1,50 @@
// A calculation task.
/* Contains the queued calculation and the results post calculation as well as
* any output messages from the task
*/
const CalcTask = function(id, calcFn, workerPath) {
if (!(this instanceof CalcTask)) {
return new CalcTask(id, calcFn, workerPath);
}
this._id = id
this._calcFn = calcFn
this._workerPath = workerPath
this._result = null
this._messages = []
this._error = null
this._worker = null
return this
}
CalcTask.prototype = {
id: function() {
return this._id
},
calcFn: function() {
return this._calcFn
},
workerPath: function() {
return this._workerPath
},
isInProgress: function() {
return this._worker != null
},
isFinished: function() {
return this._result != null
},
messages: function() {
return this._messages
},
error: function() {
return this._error
},
result: function() {
return this._result
}
}
module.exports = CalcTask

View File

@@ -0,0 +1,4 @@
module.exports.AlreadyQueued = require('./exceptions/AlreadyQueued.js')
module.exports.AlreadyInProgress = require('./exceptions/AlreadyInProgress.js')
module.exports.CalculationUnknown = require('./exceptions/CalculationUnknown.js')

View File

@@ -0,0 +1,11 @@
module.exports = function AlreadyInProgress(extra) {
Error.captureStackTrace(this, this.constructor)
this.name = this.constructor.name
this.message = 'calculation request is already in progress'
if (typeof extra !== 'undefined') {
this.extra = extra
}
}
require('util').inherits(module.exports, Error)

View File

@@ -0,0 +1,11 @@
module.exports = function AlreadyQueued(extra) {
Error.captureStackTrace(this, this.constructor)
this.name = this.constructor.name
this.message = 'calculation request is already queued'
if (typeof extra !== 'undefined') {
this.extra = extra
}
};
require('util').inherits(module.exports, Error)

View File

@@ -0,0 +1,11 @@
module.exports = function CalculationUnknown(extra) {
Error.captureStackTrace(this, this.constructor)
this.name = this.constructor.name
this.message = 'calculation requested is unknown'
if (typeof extra !== 'undefined') {
this.extra = extra
}
};
require('util').inherits(module.exports, Error)

View File

@@ -0,0 +1,3 @@
module.exports = require('./CalcQueue.js')
module.exports.CalcQueueExceptions = require('./exceptions.js')

4
src/services/index.js Normal file
View File

@@ -0,0 +1,4 @@
module.exports = {
'calcQueue': require('./calc-queue')
}