Node Worker Threads With Async Parallel

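worker_threads is a Node module that allows multithreaded operations in Node. It was introduced in Node 10.5 behind the --experimental-worker flag and has been available without the flag since Node 11.7. It is especially useful for CPU-intensive workloads where you want to make use of every core on a multicore machine.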
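For readers new to the module, here is a minimal sketch of spawning a single worker and getting a result back. To keep everything in one file, it passes the worker's source as a string with the `eval: true` option instead of using a separate worker.js. The helper name `runFibonacciInWorker` is hypothetical and not part of any library.

```javascript
const { Worker } = require('worker_threads')

// Source code for the worker thread, evaluated as a CommonJS script (eval: true)
const workerSource = `
  const { parentPort, workerData } = require('worker_threads')
  function fibonacci(n) {
    return n < 2 ? 1 : fibonacci(n - 2) + fibonacci(n - 1)
  }
  // workerData holds the value passed in the Worker constructor options
  parentPort.postMessage(fibonacci(workerData))
`

// Hypothetical helper: run fibonacci(n) on a separate thread and resolve with the result
function runFibonacciInWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: n })
    worker.once('message', resolve)
    worker.once('error', reject)
    // reject if the worker stops abnormally without posting a result
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`))
    })
  })
}

// prints "fibonacci(20) = 10946"
runFibonacciInWorker(20).then((result) => console.log('fibonacci(20) =', result))
```

The main thread stays free while the worker computes. The result comes back as a message, which is why the call is wrapped in a Promise.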

async is a utility module for Node that provides a framework for running and coordinating asynchronous operations. It can speed up IO-bound (disk, network) workloads by letting those operations overlap, but since all of your JavaScript still runs on a single thread, it doesn’t help much for CPU-intensive workloads.
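To see why a single thread is enough for IO-bound work, consider the sketch below. It uses plain Promises and timers rather than the async module, and the timers stand in for disk or network requests. The helper `fakeIo` is hypothetical. While one task is waiting, the event loop is free to start the others, so the waits overlap.

```javascript
// Hypothetical stand-in for an IO operation: resolves with `label` after `ms` milliseconds
function fakeIo(label, ms) {
  return new Promise((resolve) => setTimeout(() => resolve(label), ms))
}

async function runIoTasks() {
  const started = Date.now()
  const finishOrder = []
  const record = (label) => { finishOrder.push(label); return label }
  // All three "requests" start at once and wait concurrently on one thread
  const results = await Promise.all([
    fakeIo('slow', 300).then(record),
    fakeIo('fast', 100).then(record),
    fakeIo('medium', 200).then(record),
  ])
  const elapsed = Date.now() - started
  return { results, finishOrder, elapsed }
}

runIoTasks().then(({ results, finishOrder, elapsed }) => {
  // elapsed is close to the longest wait (about 300 ms), not the sum (600 ms)
  console.log(results, finishOrder, `${elapsed} ms`)
})
```

`Promise.all` keeps results in input order, but the tasks finish in order of their delays. This shows that they were all in flight at the same time.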

Using async/parallel

Let’s have a look at how we’d traditionally run parallel tasks with async:

const os = require('os')
const parallelLimit = require('async/parallelLimit')

// our CPU intensive operation
function fibonacci(n) {
  if (n < 2) {
    return 1
  } else {
    return fibonacci(n - 2) + fibonacci(n - 1)
  }
}

// number of jobs to run will be the number of CPU cores
const limit = os.cpus().length

const fibonacciSize = 40

// build a set of tasks for parallelLimit to run
const tasks = Array.from({ length: limit }, (v, k) => k + 1).map((task) => {
  return cb => {
    const result = fibonacci(fibonacciSize)
    cb(null, result)
  }
})

// run tasks with parallelLimit
parallelLimit(tasks, limit, (err, results) => {
  console.log('Finished with', err, results)
})

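Although parallelLimit is asked to run these tasks in parallel, each fibonacci call is synchronous and blocks the single JavaScript thread. The tasks therefore run one after another, and the total time is the same as running them sequentially.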
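One way to confirm this is to record when each task starts and finishes. The sketch below skips the async module and simply wraps each synchronous fibonacci call in a Promise. The names `timedTask` and `runCpuTasks` are hypothetical. Because the computation never yields to the event loop, each task finishes before the next one starts, even though they are all "started" together.

```javascript
const { performance } = require('perf_hooks')

// our CPU intensive operation (same definition as above)
function fibonacci(n) {
  return n < 2 ? 1 : fibonacci(n - 2) + fibonacci(n - 1)
}

// Hypothetical helper: runs fibonacci(n) and records when it started and finished
function timedTask(n) {
  return new Promise((resolve) => {
    const start = performance.now()
    const result = fibonacci(n) // blocks the only JavaScript thread until done
    resolve({ result, start, end: performance.now() })
  })
}

// "Start" several tasks at once; the Promise executors still run one after another
async function runCpuTasks(count, n) {
  const timings = await Promise.all(Array.from({ length: count }, () => timedTask(n)))
  // true if every task started only after the previous one had finished
  const sequential = timings.every((t, i) => i === 0 || t.start >= timings[i - 1].end)
  return { timings, sequential }
}

runCpuTasks(4, 25).then(({ sequential }) => console.log('ran back to back:', sequential))
```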

Using worker_threads

Now let’s look at how we’d adapt this to use worker_threads. We now need two files: one for the task (worker.js) and one to manage the whole operation (index.js).

index.js

const { Worker } = require('worker_threads')
const path = require('path')
const os = require('os')
const parallelLimit = require('async/parallelLimit')

// number of jobs to run will be the number of CPU cores
const limit = os.cpus().length

const workerScript = path.join(__dirname, './worker.js')

// build a set of tasks for parallelLimit to run
const tasks = Array.from({ length: limit }, (v, k) => k + 1).map((task) => {
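  return cb => {
    const worker = new Worker(workerScript, { workerData: task })
    // settle each task exactly once, whichever event arrives first
    let settled = false
    const done = (err, result) => {
      if (settled) return
      settled = true
      cb(err, result)
    }
    worker.on('message', (result) => { done(null, result) })
    worker.on('error', (err) => { done(err, null) })
    // a worker can also stop without posting a message, e.g. via process.exit()
    worker.on('exit', (code) => {
      if (code !== 0) done(new Error(`Worker stopped with exit code ${code}`), null)
    })
  }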
})

// run tasks with parallelLimit
parallelLimit(tasks, limit, (err, results) => {
  console.log('Finished with', err, results)
})

worker.js

const { parentPort, workerData, isMainThread } = require('worker_threads')

// our CPU intensive operation
function fibonacci(n) {
  if (n < 2) {
    return 1
  } else {
    return fibonacci(n - 2) + fibonacci(n - 1)
  }
}

const fibonacciSize = 40

if (!isMainThread) {
  const result = fibonacci(fibonacciSize)
  parentPort.postMessage(result)
}

This time each task runs in its own worker thread, so on a multicore machine the tasks genuinely execute in parallel and the whole job completes much faster than the single-threaded version.
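To check the speedup on your own hardware, you could use a rough timing harness like the one below. It compares running the calls one after another on the main thread with running each call in its own worker. This is a sketch with a few simplifications:

- It spawns one worker per call, with no pool.
- It inlines the worker source with `eval: true`.
- The helper names `runInWorker` and `compare` are hypothetical.

Actual timings depend on your core count and on worker startup cost, which can dominate for small inputs.

```javascript
const { Worker } = require('worker_threads')
const { performance } = require('perf_hooks')
const os = require('os')

function fibonacci(n) {
  return n < 2 ? 1 : fibonacci(n - 2) + fibonacci(n - 1)
}

// Worker source evaluated as a CommonJS script; workerData is the fibonacci size
const workerSource = `
  const { parentPort, workerData } = require('worker_threads')
  function fibonacci(n) {
    return n < 2 ? 1 : fibonacci(n - 2) + fibonacci(n - 1)
  }
  parentPort.postMessage(fibonacci(workerData))
`

// Hypothetical helper: one call = one worker, settled by its message, error or bad exit
function runInWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: n })
    worker.once('message', resolve)
    worker.once('error', reject)
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`))
    })
  })
}

// Hypothetical harness: same workload on the main thread vs. one worker per call
async function compare(count, n) {
  let t0 = performance.now()
  const sequential = Array.from({ length: count }, () => fibonacci(n))
  const sequentialMs = performance.now() - t0

  t0 = performance.now()
  const threaded = await Promise.all(Array.from({ length: count }, () => runInWorker(n)))
  const threadedMs = performance.now() - t0

  return { sequential, threaded, sequentialMs, threadedMs }
}

// raise n (e.g. to 40, as in the examples above) for a clearer difference
const count = Math.max(1, os.cpus().length)
compare(count, 30).then(({ sequentialMs, threadedMs }) => {
  console.log(`main thread: ${sequentialMs.toFixed(0)} ms, workers: ${threadedMs.toFixed(0)} ms`)
})
```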

Check out the full code repository at https://gitlab.com/beyondtracks/async-parallel-worker-threads.
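If you would rather not depend on the async module, the concurrency limit that index.js gets from parallelLimit can be sketched with Promises alone. This is an alternative approach, not how the repository above does it. The worker source is inlined with `eval: true` to keep the example in one file, and `runInWorker` and `runWithLimit` are hypothetical names.

```javascript
const { Worker } = require('worker_threads')
const os = require('os')

// Worker source evaluated as a CommonJS script; workerData is the fibonacci size
const workerSource = `
  const { parentPort, workerData } = require('worker_threads')
  function fibonacci(n) {
    return n < 2 ? 1 : fibonacci(n - 2) + fibonacci(n - 1)
  }
  parentPort.postMessage(fibonacci(workerData))
`

// Hypothetical helper: one task = one worker, settled by its message, error or bad exit
function runInWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: n })
    worker.once('message', resolve)
    worker.once('error', reject)
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`))
    })
  })
}

// Hypothetical helper: run all inputs with at most `limit` workers alive at once,
// returning results in input order (like the results array from parallelLimit)
async function runWithLimit(inputs, limit) {
  const results = new Array(inputs.length)
  let next = 0
  // each "lane" keeps claiming the next unprocessed input until none remain
  async function lane() {
    while (next < inputs.length) {
      const index = next++
      results[index] = await runInWorker(inputs[index])
    }
  }
  const laneCount = Math.min(Math.max(1, limit), inputs.length)
  await Promise.all(Array.from({ length: laneCount }, lane))
  return results
}

// number of concurrent workers will be the number of CPU cores (at least one)
const limit = Math.max(1, os.cpus().length)
runWithLimit([20, 21, 22, 23], limit).then((results) => console.log('Finished with', results))
```

Incrementing `next` from several lanes is safe here because all lanes run on the main thread. Only the fibonacci work itself moves to other threads.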