Files
CalcScheduler/src/services/calc-queue/CalcQueue.js

219 lines
5.6 KiB
JavaScript

const dotenv = require('dotenv')
const EventEmitter = require('events')
const threads = require('worker_threads')
const path = require('path')
const fs = require('fs')
const Exceptions = require('./exceptions.js')
const CalcTask = require('./CalcTask.js')
dotenv.config()
// Basic utility functions to help the queue manager
const CalcQueueUtils = {
getCalcWorker: function(calcFn) {
return path.resolve(path.join('src/workers', calcFn + '.js'))
},
isWorkerValid: function(workerPath) {
return fs.existsSync(workerPath)
},
// Start the next task if there are any queued
processNext: function() {
if (this.queue.length == 0) {
return false
}
/* TODO: work out if the process can be run in parallel with outer
* processes or must be done sequentially
*/
const task = this.queue.shift()
task._worker = new threads.Worker(
task.workerPath(),
{
stderr: true,
stdout: true,
workerData: { 'id': task._id }
}
)
task._worker.stdout.on('data', (chunk) => {
task._stdout = task._stdout.concat(chunk)
})
task._worker.stderr.on('data', (chunk) => {
task._stderr = task._stderr.concat(chunk)
})
task._worker.on('message', (msg) => {
task._messages.push(msg)
})
task._worker.on('error', (err) => {
task._error = err
task._messages.push('Error: ' + err.message)
})
task._worker.on('exit', (exitCode) => {
task._result = exitCode
task._worker = null
const idx = this.inProgress.findIndex((t) => {
return t.id() == task.id() && t.calcFn() == task.calcFn()
})
this.inProgress.splice(idx, 1)
this.finished.push(task)
console.log(task)
})
this.inProgress.push(task)
}
}
// A calculation task. Only one of these per app
/* Allows calculations to be queued as tasks. Currently tasks are ryun
* synchronously.
* TODO: Add async tasks.
*/
const CalcQueue = function() {
if (!(this instanceof CalcQueue)) {
return new CalcQueue()
}
this.queue = []
this.inProgress = []
this.finished = []
return this
}
CalcQueue.prototype = {
isBusy: function() {
return this.inProgress.length > 0;
},
// Report if a task or collection of tasks by id are finished
isFinished: function(id, calcFn = null) {
if (calcFn == null) {
const fin = this.finished.filter((el) => {
return el.id() == id
}).reduce((agg, el) => {
return agg && el.result() != null
}, true)
return fin
} else {
const idx = this.finished.findIndex((el) => {
return el.id() == id && (calcFn == null || el.calcFn() == calcFn)
})
return idx != -1
}
},
// Get the list of finished tasks
getFinished: function() {
return this.finished.map((q) => {
return {
'id': q.id(),
'calcFn': q.calcFn(),
'result': q.result(),
'stdout': q.stdout().trim().split('\n'),
'stderr': q.stderr().trim().split('\n'),
'messages': q.messages(),
'error': (
q.error() == null
? null
: q.error().name + ': ' + q.error().message
),
'errorStack': (
process.env.NODE_ENV !== 'development' || q.error() == null
? null
: q.error().stack.split('\n')
)
}
})
},
// Report if a task or collection of tasks by id are in progress
isInProgress: function(id, calcFn = null) {
const idx = this.inProgress.findIndex((el) => {
return el.id() == id && (calcFn == null || el.calcFn() == calcFn)
})
return idx != -1
},
// Get the list of in-progress tasks
getInProgress: function() {
return this.inProgress.map((q) => {
return { 'id': q.id(), 'calcFn': q.calcFn() }
})
},
// Report if a task or collection of tasks by id are queued
isQueued: function(id, calcFn = null) {
return (this.queuePosition(id, calcFn) != -1)
},
// Report the position of a task in the queue
queuePosition: function(id, calcFn = null) {
return this.queue.findIndex((el) => {
return el.id() == id && (calcFn == null || el.calcFn() == calcFn)
})
},
// Get the list of queued tasks
getQueue: function() {
return this.queue.map((q) => {
return { 'id': q.id(), 'calcFn': q.calcFn() }
})
},
// Remove all tasks from the queue, does not affect running tasks
clearQueue: function() {
var okay = true
while (this.queue.length > 0) {
okay |= this.dequeue(this.queue[0].id)
this.queue.splice(0, 1)
}
return okay
},
// Add a new task to the queue if it does not already exist
enqueue: function(id, calcFn) {
if (this.isQueued(id, calcFn)) {
throw new Exceptions.AlreadyQueued()
}
if (this.isInProgress(id, calcFn)) {
throw new Exceptions.AlreadyInProgress()
}
const workerPath = CalcQueueUtils.getCalcWorker(calcFn)
if (!CalcQueueUtils.isWorkerValid(workerPath)) {
throw new Exceptions.CalculationUnknown()
}
this.queue.push(new CalcTask(id, calcFn, workerPath))
if (!this.isBusy()) {
CalcQueueUtils.processNext.call(this)
}
return true
},
// Remove a task from the queue if it is queued
dequeue: function(id, calcFn) {
var idx = this.queuePosition(id, calcFn)
if (idx == -1) {
return false
}
// TODO: if item is in process, wait for it to finish
while (idx != -1) {
this.queue.splice(idx, 1)
idx = this.queuePosition(id, calcFn)
}
return true
}
// TODO: find, store & list worker scripts on start up instead of on request?
}
module.exports = new CalcQueue()