Node Worker Threads With Async Parallel
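worker_threads is a relatively new Node module that allows multithreaded operations in Node. It first appeared in Node 10.5 as an experimental feature behind the --experimental-worker flag, and it is available without that flag from Node 11.7. It is especially useful for CPU-intensive workloads where you want to make use of every core on a multicore machine.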
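A minimal sketch of the basic worker_threads round trip: data goes in through workerData and the result comes back through parentPort.postMessage. To keep it in one file, the worker's code is passed as a string with the eval: true option instead of a separate worker.js; runInWorker and workerSource are hypothetical names.

```javascript
const { Worker } = require('worker_threads')

// The worker's code as a string. With eval: true the Worker constructor
// runs this source instead of loading it from a file path.
const workerSource = `
  const { parentPort, workerData } = require('worker_threads')
  // Same recursive fibonacci as the article, computed off the main thread
  function fibonacci(n) {
    return n < 2 ? 1 : fibonacci(n - 2) + fibonacci(n - 1)
  }
  // Send the result back to the thread that created this worker
  parentPort.postMessage(fibonacci(workerData))
`

// Start one worker and resolve with the first message it posts
function runInWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: n })
    worker.once('message', resolve)
    worker.once('error', reject)
  })
}

runInWorker(20).then(result => {
  console.log('fibonacci(20) =', result) // fibonacci(20) = 10946
})
```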
async is a utility module for Node that provides a framework for coordinating asynchronous operations. It can speed up IO-intensive (disk, network) workloads by letting many requests wait at the same time, but since everything still runs on a single thread, it doesn't help much with CPU-intensive workloads.
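To make the IO point concrete, here is a dependency-free sketch that uses Promise.all in place of the async module and setTimeout as a stand-in for a disk or network request (fakeIo is a hypothetical helper). Because the thread sits idle while each timer waits, the three 200 ms waits overlap and finish in roughly 200 ms rather than 600 ms.

```javascript
// setTimeout stands in for IO: nothing runs on the thread while it waits,
// so several waits can be in flight at the same time.
function fakeIo(ms, value) {
  return new Promise(resolve => setTimeout(() => resolve(value), ms))
}

const start = Date.now()

// Start all three "requests" at once and wait for every one to finish
const done = Promise.all([fakeIo(200, 'a'), fakeIo(200, 'b'), fakeIo(200, 'c')])
  .then(values => {
    const elapsed = Date.now() - start
    console.log(values, `took about ${elapsed} ms`)
    return { values, elapsed }
  })
```

A CPU-bound loop in place of the timer would get no such overlap, which is the limitation the rest of this post works around.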
Using async/parallel
Let's look at how we'd traditionally run parallel tasks with async:
const os = require('os')
const parallelLimit = require('async/parallelLimit')
// our CPU intensive operation
function fibonacci(n) {
  if (n < 2) {
    return 1
  } else {
    return fibonacci(n - 2) + fibonacci(n - 1)
  }
}
// number of jobs to run will be the number of CPU cores
const limit = os.cpus().length
const fibonacciSize = 40
// build a set of tasks for parallelLimit to run
const tasks = Array.from({ length: limit }, (v, k) => k + 1).map((task) => {
  return cb => {
    const result = fibonacci(fibonacciSize)
    cb(null, result)
  }
})
// run tasks with parallelLimit
parallelLimit(tasks, limit, (err, results) => {
  console.log('Finished with', err, results)
})
Those tasks are started in parallel, but since each one is synchronous CPU work on a single thread, they actually execute one after another, so the total time is the same as running them sequentially.
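The single-thread limitation shows up even without timing anything. This sketch uses CPU-bound tasks in the same callback style as the tasks above, with a hypothetical runParallel helper standing in for async's parallel (it is not the library's implementation). Every task is started "at once", yet each one runs to completion before the next begins.

```javascript
const events = []

// A CPU-bound task written in the cb-style the async module expects
function makeTask(id) {
  return cb => {
    events.push(`start ${id}`)
    let sum = 0
    for (let i = 0; i < 5e6; i++) sum += i // busy work, no IO to wait on
    events.push(`end ${id}`)
    cb(null, sum)
  }
}

// Hypothetical stand-in for async's parallel(): start every task, then call
// done once all callbacks have fired (or on the first error).
function runParallel(tasks, done) {
  const results = []
  let pending = tasks.length
  let failed = false
  if (pending === 0) return done(null, results)
  tasks.forEach((task, i) => {
    task((err, result) => {
      if (failed) return
      if (err) {
        failed = true
        return done(err)
      }
      results[i] = result
      if (--pending === 0) done(null, results)
    })
  })
}

runParallel([makeTask(1), makeTask(2), makeTask(3)], () => {
  // Prints: start 1, end 1, start 2, end 2, start 3, end 3
  console.log(events.join(', '))
})
```

The start/end pairs never interleave: the single thread has no way to work on task 2 while task 1 is still looping.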
Using worker_threads
Now let's look at how we'd adapt this to use worker_threads. We now need two files: one for the task and one to manage the whole operation.
index.js
const { Worker } = require('worker_threads')
const path = require('path')
const os = require('os')
const parallelLimit = require('async/parallelLimit')
// number of jobs to run will be the number of CPU cores
const limit = os.cpus().length
const workerScript = path.join(__dirname, './worker.js')
// build a set of tasks for parallelLimit to run
const tasks = Array.from({ length: limit }, (v, k) => k + 1).map((task) => {
  return cb => {
    // each task starts its own worker thread, passing the task number as workerData
    const worker = new Worker(workerScript, { workerData: task })
    worker.on('message', (result) => { cb(null, result) })
    worker.on('error', (err) => { cb(err, null) })
  }
})
// run tasks with parallelLimit
parallelLimit(tasks, limit, (err, results) => {
  console.log('Finished with', err, results)
})
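In index.js each task hands cb to both the 'message' and 'error' handlers, and nothing calls cb at all if a worker exits without posting a result, which would leave parallelLimit waiting forever. A minimal sketch of one way to settle exactly once, assuming a Promise-based wrapper; runWorker, ok and quits are hypothetical names, and the worker code is eval'd from strings only so the example fits in one file. It also assumes, as in current Node versions, that messages a worker has already posted are delivered before its 'exit' event.

```javascript
const { Worker } = require('worker_threads')

// Hypothetical helper: start a worker and settle exactly once, whether it
// posts a result, throws, or exits before posting anything.
function runWorker(source, workerData) {
  return new Promise((resolve, reject) => {
    // eval: true runs `source` as code rather than loading a file path
    const worker = new Worker(source, { eval: true, workerData })
    let settled = false
    const settle = (fn, value) => {
      if (settled) return
      settled = true
      fn(value)
    }
    worker.once('message', result => settle(resolve, result))
    worker.once('error', err => settle(reject, err))
    // Assumes already-posted messages arrive before 'exit', so still being
    // unsettled here means the worker stopped without posting a result
    worker.once('exit', code => {
      settle(reject, new Error(`worker exited with code ${code} before posting a result`))
    })
  })
}

// One worker that posts a result, and one that quits early
const ok = "require('worker_threads').parentPort.postMessage(42)"
const quits = 'process.exit(3)'

runWorker(ok).then(result => console.log('ok:', result))
runWorker(quits).catch(err => console.log('failed:', err.message))
```

Inside a worker, process.exit() stops only that thread, and its code is reported through the 'exit' event.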
worker.js
const { parentPort, workerData, isMainThread } = require('worker_threads')
// our CPU intensive operation
function fibonacci(n) {
  if (n < 2) {
    return 1
  } else {
    return fibonacci(n - 2) + fibonacci(n - 1)
  }
}
const fibonacciSize = 40
// workerData holds the task number sent from index.js (not used by this task)
if (!isMainThread) {
  // only compute when loaded as a worker, then send the result back to index.js
  const result = fibonacci(fibonacciSize)
  parentPort.postMessage(result)
}
This time each task runs in its own thread, so on a multicore machine the tasks genuinely execute at the same time and the whole run completes much faster.
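To check that each task really gets its own thread, a sketch along these lines can start one worker per CPU core and have each report the threadId exported by worker_threads. It swaps async's parallelLimit for plain Promise.all and evals the worker code from a string (runTask and workerSource are hypothetical names); fibonacci(25) keeps it quick.

```javascript
const os = require('os')
const { Worker } = require('worker_threads')

// Worker code as a string; each worker reports its own threadId with its result
const workerSource = `
  const { parentPort, workerData, threadId } = require('worker_threads')
  function fibonacci(n) {
    return n < 2 ? 1 : fibonacci(n - 2) + fibonacci(n - 1)
  }
  parentPort.postMessage({ threadId, result: fibonacci(workerData) })
`

// Start one worker for a task and resolve with the message it posts
function runTask(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: n })
    worker.once('message', resolve)
    worker.once('error', reject)
  })
}

// One task per CPU core, all started together
const limit = os.cpus().length || 1
const tasks = Array.from({ length: limit }, () => runTask(25))

const done = Promise.all(tasks).then(outputs => {
  for (const { threadId, result } of outputs) {
    console.log(`thread ${threadId}:`, result)
  }
  return outputs
})
```

Every line of output carries a different thread id, and because the workers run on separate threads, the operating system is free to schedule them on separate cores.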
Check out the full code repository at https://gitlab.com/beyondtracks/async-parallel-worker-threads.