From 4e1377241f38424de7f3fa59a6aa5e94bbf25918 Mon Sep 17 00:00:00 2001 From: avsdev-cw Date: Mon, 2 Nov 2020 12:03:44 +0000 Subject: [PATCH] Record stdout and stderr from the worker. Add a more detailed example --- src/services/calc-queue/CalcQueue.js | 31 +++++++++++++- src/services/calc-queue/CalcTask.js | 8 ++++ src/workers/example.js | 60 +++++++++++++++++++++++----- 3 files changed, 88 insertions(+), 11 deletions(-) diff --git a/src/services/calc-queue/CalcQueue.js b/src/services/calc-queue/CalcQueue.js index 37e9614..5cb6780 100644 --- a/src/services/calc-queue/CalcQueue.js +++ b/src/services/calc-queue/CalcQueue.js @@ -1,3 +1,4 @@ +const dotenv = require('dotenv') const EventEmitter = require('events') const threads = require('worker_threads') const path = require('path') @@ -6,6 +7,8 @@ const fs = require('fs') const Exceptions = require('./exceptions.js') const CalcTask = require('./CalcTask.js') +dotenv.config() + // Basic utility functions to help the queue manager const CalcQueueUtils = { getCalcWorker: function(calcFn) { @@ -28,8 +31,21 @@ const CalcQueueUtils = { const task = this.queue.shift() - task._worker = new threads.Worker(task.workerPath()) + task._worker = new threads.Worker( + task.workerPath(), + { + stderr: true, + stdout: true, + workerData: { 'id': task._id } + } + ) + task._worker.stdout.on('data', (chunk) => { + task._stdout = task._stdout.concat(chunk) + }) + task._worker.stderr.on('data', (chunk) => { + task._stderr = task._stderr.concat(chunk) + }) task._worker.on('message', (msg) => { task._messages.push(msg) }) @@ -97,8 +113,19 @@ CalcQueue.prototype = { 'id': q.id(), 'calcFn': q.calcFn(), 'result': q.result(), + 'stdout': q.stdout().trim().split('\n'), + 'stderr': q.stderr().trim().split('\n'), 'messages': q.messages(), - 'error': (q.error() == null ? null : q.error().message) + 'error': ( + q.error() == null + ? null + : q.error().name + ': ' + q.error().message + ), + 'errorStack': ( + process.env.NODE_ENV !== 'development' || q.error() == null + ? null + : q.error().stack.split('\n') + ) } }) }, diff --git a/src/services/calc-queue/CalcTask.js b/src/services/calc-queue/CalcTask.js index 1ffd555..3608a13 100644 --- a/src/services/calc-queue/CalcTask.js +++ b/src/services/calc-queue/CalcTask.js @@ -12,6 +12,8 @@ const CalcTask = function(id, calcFn, workerPath) { this._calcFn = calcFn this._workerPath = workerPath this._result = null + this._stderr = '' + this._stdout = '' this._messages = [] this._error = null this._worker = null @@ -36,6 +38,12 @@ CalcTask.prototype = { return this._result != null }, + stdout: function() { + return this._stdout + }, + stderr: function() { + return this._stderr + }, messages: function() { return this._messages }, diff --git a/src/workers/example.js b/src/workers/example.js index 4ddfcce..84abcb8 100644 --- a/src/workers/example.js +++ b/src/workers/example.js @@ -1,15 +1,57 @@ +const threads = require('worker_threads') -console.log('This is an example worker') +// Worker task info +/* maxParallel: [0,1,n] 0 = auto (usually nproc), 1 = single instance, + * n = max 'n' instances + * shareParallel: [true/false] indicates if the worker can be run in parallel + * with other workers. + * usesGPU: [true/false] indicates if the worker requires the GPU + */ +const taskInfo = { + maxParallel: 0, + shareParallel: true, + usesGPU: false +} -new Promise((resolve, reject) => { - setTimeout((() => { - console.log('I did something (nothing) for 5 seconds!'); - resolve() - }).bind(this), 5000) -}) +// The task to be run +const task = function() { + console.log('This is an example worker') + + console.log('This is the data from the calling thread:') + console.log(threads.workerData) + + threads.parentPort.postMessage('This is a message') + threads.parentPort.postMessage({ + color: 'red', + text: '$color is my favourite' + }) + + new Promise((resolve, reject) => { + setTimeout((() => { + console.log('I did something (nothing) for 5 seconds!'); + resolve() + }).bind(this), 5000) + }) + + new Promise((resolve, reject) => { + setTimeout((() => { + console.error('This is an error message after 6 seconds!'); + resolve() + }).bind(this), 6000) + }) + + new Promise((resolve, reject) => { + setTimeout((() => { + throw new Error('Exceptions in the worker are logged in the parent') + }).bind(this), 7000) + }) +} if (!module.parent) { - console.log('not a module') + if (!threads.isMainThread) { + task() + } } else { - console.log('loaded as a module') + module.exports = taskInfo + module.task = task }