Node.js worker threads

Worker threads are available through the built-in worker_threads module in Node.js. Before we dive into my example of using worker threads in Node.js, let's look at what worker threads are and why we "could" need them.

Understand Node.js and worker threads

When a Node.js process is launched, it runs:

  1. One process: A process is a global object that can be accessed anywhere and has information about what’s being executed at a time.
  2. One thread: Being single-threaded means that only one set of instructions is executed at a time in a given process.
  3. One event loop: This is one of the most important aspects to understand about Node. It’s what allows Node to be asynchronous and have non-blocking I/O, despite the fact that JavaScript is single-threaded, by offloading operations to the system kernel whenever possible through callbacks, promises and async/await.
  4. One JS Engine Instance: This is a computer program that executes JavaScript code.
  5. One Node.js Instance: The computer program that executes Node.js code.

In other words, Node.js runs your JavaScript on a single thread, and the event loop handles just one task at a time. The code is NOT executed in parallel, so you do not need to worry about the concurrency issues that come with shared-memory threads.

There is a downside though: if you have CPU-intensive code, like complex calculations on a large in-memory dataset, it can block everything else from being executed.

The golden rule: don’t block the event loop. Try to keep it running, and avoid anything that could block the thread, like synchronous network calls or infinite loops.
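To make the golden rule concrete, here is a minimal sketch of what blocking looks like. The blockFor helper is a name made up for this illustration; it simply spins for a while to stand in for a CPU-heavy calculation. A timer is requested with a delay of 0 ms, but it cannot fire until the synchronous work has returned control to the event loop.

```javascript
// Hypothetical helper: busy-wait for roughly `ms` milliseconds.
// It stands in for a CPU-heavy, synchronous calculation.
function blockFor(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // spin: nothing else can run on this thread meanwhile
  }
}

const scheduledAt = Date.now();
let timerDelay = null;

// Ask for a callback "as soon as possible"
setTimeout(() => {
  timerDelay = Date.now() - scheduledAt;
  console.log(`timer fired after ${timerDelay} ms`);
}, 0);

// The timer above cannot fire until this call returns
blockFor(200);
console.log(`blocked for ${Date.now() - scheduledAt} ms, timer fired yet: ${timerDelay !== null}`);
```

When this runs, the second log line appears first and reports that the timer has not fired yet; the timer callback only runs afterwards, at least 200 ms late.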

It’s important to differentiate between CPU operations and I/O (input/output) operations. As mentioned earlier, the code of Node.js is NOT executed in parallel. Only I/O operations are run in parallel, because they are executed asynchronously.

So worker threads will not help much with I/O-intensive work, because asynchronous I/O operations are more efficient than workers can be. The main goal of workers is to improve performance on CPU-intensive operations, not I/O operations.
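A small sketch of why asynchronous I/O needs no extra threads: the fakeIo function below is an assumption for this illustration and uses a timer as a stand-in for a real I/O call such as an HTTP request. Three waits of 100 ms each are started together and finish in roughly 100 ms in total, not 300 ms, even though everything runs on one thread.

```javascript
// Hypothetical stand-in for an asynchronous I/O call that takes `ms` milliseconds
function fakeIo(ms) {
  return new Promise(resolve => setTimeout(() => resolve(ms), ms));
}

async function runConcurrently() {
  const startedAt = Date.now();

  // All three waits are in flight at the same time
  const results = await Promise.all([fakeIo(100), fakeIo(100), fakeIo(100)]);

  return { results, elapsed: Date.now() - startedAt };
}

runConcurrently().then(({ elapsed }) => {
  console.log(`three 100 ms waits finished in about ${elapsed} ms`);
});
```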

When to use worker threads?

The best solution for CPU performance is Worker Threads. Browsers have had the concept of Workers for a long time.

Instead of having:

  1. One process
  2. One thread
  3. One event loop
  4. One JS Engine Instance
  5. One Node.js Instance

With worker threads, we have:

  1. One process
  2. Multiple threads
  3. One event loop per thread
  4. One JS Engine Instance per thread
  5. One Node.js Instance per thread
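As a minimal sketch of that model, the snippet below moves a CPU-heavy calculation into a worker thread and wraps the result in a promise. The names fibInWorker and workerSource are made up for this illustration, and the worker code is passed as a string with the eval option only so that the example fits in a single file; in a real project the worker would normally live in its own file.

```javascript
const { Worker } = require("worker_threads");

// Code that runs inside the worker thread, with its own event loop and JS engine instance
const workerSource = `
  const { parentPort, workerData } = require("worker_threads");

  // Deliberately naive, CPU-heavy Fibonacci
  function fib(n) {
    return n < 2 ? n : fib(n - 1) + fib(n - 2);
  }

  parentPort.postMessage(fib(workerData));
`;

// Hypothetical helper: run the calculation in a worker and resolve with its result
function fibInWorker(n) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: n });
    worker.once("message", resolve);
    worker.once("error", reject);
    worker.once("exit", code => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
    });
  });
}

// The main thread stays free to handle other work while the worker computes
fibInWorker(30).then(result => console.log(`fib(30) = ${result}`)); // fib(30) = 832040
```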

Experiment

In my example I have an external service that can be "considered" CPU intensive. It is a REST API endpoint built for long polling: if no event has occurred within 15 seconds, the service responds with status 204 and the client has to make a new request. In normal cases the service responds with event data from trot races: a start or finish event, together with the id and name of a horse in the race and its time.

The event data for a start and a finish looks like this:

// Start data
{
  event: "start",
  horse: {
    id: 1,
    name: 'Spin Scorpion' 
  },
  time: 0
}

// Finish data
{
  event: "finish",
  horse: {
    id: 1,
    name: 'Spin Scorpion' 
  },
  time: 13230
}

First we implement the Node.js server using Express.js. It is going to run the worker threads that subscribe to trot race data and store it in the database.

// server.js

const config = require("./config");
const http = require("http");
const express = require("express");
const run = require("./services");

const app = express();
const server = http.createServer(app);

/**
 * Run worker threads to subscribe and save data
 */
run().catch(err => console.error(err));

server.listen(config.port, function() {
  console.info(
    `${config.appName} listening on: http://localhost:${config.port}`
  );
});

module.exports = server;

The module responsible for all communication between the main thread and the worker threads exports only one async function, called run, and in this little experiment it starts everything.

// ./server/services/index.js

/**
 * We import the Node.js module for worker threads
 */
const { Worker } = require("worker_threads");

/**
 * Services that handles all messages between 
 * the main thread and worker threads
 * @returns {Promise<void>}
 */
async function runService() {
  // Create two new worker threads:
  // - one to subscribe to trot events
  // - one to store event data in MongoDB
  const subscribeWorker = new Worker("./server/services/subscribeWorker.js");
  const dbWorker = new Worker("./server/services/dbWorker.js");

  /**
   * Send a message to the database worker thread
   * telling it to connect to the MongoDb.
   */
  dbWorker.postMessage({status: 'connect'})

  /**
   * Listen for messages from the subscribeWorker.
   * The subscribeWorker sends two kinds of messages to the main thread:
   * - 'subscribe' - it wants the main thread to start a new subscription
   * - 'save' - it has received event data that should be saved.
   */
  subscribeWorker.on("message", incoming => {
    // destructure incoming message to read status
    // and possible data related to the status
    const { status, data } = incoming
    
    if( status === 'subscribe') {
      // send a message and tell the subscriberWorker to re-start  
      subscribeWorker.postMessage('subscribe')
    }

    if( status === 'save') {
      // send a message to the dbWorker to save the data
      dbWorker.postMessage({status: 'save', data: data})
    }

  })

  // Basic error handling for the subscribeWorker.
  // The "error" event receives the thrown Error, not an exit code.
  subscribeWorker.on("error", err => console.error("Subscribe Worker error:", err));
  subscribeWorker.on("exit", code =>
    console.log(`Subscribe Worker stopped with exit code ${code}`)
  )

  /**
   * Listen for messages from the dbWorker.
   * The dbWorker sends only one message, 'subscribe',
   * and it does so after a connection has been established
   * and after data has been saved.
   */
  dbWorker.on("message", incoming => {
    const { status } = incoming

    // Call subscription worker after data has been saved by the save worker
    if( status === 'subscribe') {
      subscribeWorker.postMessage('subscribe')
    }

  })

  // Basic error handling for the dbWorker.
  // The "error" event receives the thrown Error, not an exit code.
  dbWorker.on("error", err => console.error("Save Worker error:", err));
  dbWorker.on("exit", code =>
    console.log(`Save Worker stopped with exit code ${code}`)
  )
}

/**
 * Entry point to start everything 
 */
module.exports = async function run() {
  await runService()
};

The implementation of the subscriberWorker:

// ./server/services/subscribeWorker.js

/**
 * Subscriber worker thread module
 */
const { parentPort } = require("worker_threads")

/**
 * XHR wrapper to call end point api
 * @type {getEventData}
 */
const getEventData = require("./getEventData")

/**
 * Listening on messages from parent thread
 * It handles only one message that will tell it
 * to start subscribing
 */
parentPort.on("message", message => {
  if( message === 'subscribe') {
    subscribe()
  }
})

/**
 * Subscription function, using axios to get data from the polling-enabled API endpoint
 * @returns {Promise<void>}
 */
async function subscribe() {
  const response = await getEventData()

  // No content (e.g. a 204 after the long poll timed out):
  // tell the parent thread to start a new subscription
  if (response.status !== 200) {
    parentPort.postMessage({status: 'subscribe'})
  } else {
    // Data has been retrieved:
    // tell the parent to deal with saving it.
    // The response is already resolved, so no extra await is needed
    const { data } = response
    parentPort.postMessage({status: 'save', data: data})
  }
}
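The getEventData helper itself is not shown in this post. As a rough sketch of what such a wrapper could look like, the version below returns an object with the status and data fields that the subscribe function reads. Note the assumptions: it uses the fetch function built into Node.js 18+ instead of axios, the EVENTS_URL value is a made-up placeholder, and the fetchImpl parameter exists only so the helper can be tried out with a stub.

```javascript
// Hypothetical endpoint; the real URL is not given in this post
const EVENTS_URL = "http://localhost:3000/events";

/**
 * Sketch of a long-polling request wrapper (assumed shape, not the original helper).
 * Resolves with { status, data } so the caller can tell a 200 from a 204.
 */
async function getEventData(url = EVENTS_URL, fetchImpl = fetch) {
  const response = await fetchImpl(url);

  // A 204 has no body, so only parse JSON when the service sent an event
  const data = response.status === 200 ? await response.json() : null;

  return { status: response.status, data };
}

module.exports = getEventData;
```

With a real service this would be called as getEventData() from the worker; the request stays open until an event arrives or the 15 second limit is reached.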

The implementation of the dbWorker:

// ./server/services/dbWorker.js

/**
 * Database worker thread module
 */
const { parentPort } = require("worker_threads")

// Mongoose stuff
const config = require('../config')
const mongoose = require("mongoose")
const Schema = mongoose.Schema;
mongoose.Promise = global.Promise;
const schema = new Schema({
  event: { type: String },
  horse: { id: Number, name: String },
  time: { type: Number }
});
const Model = mongoose.model("Result", schema);

/**
 * Listen for messages from the parent thread.
 * The parent thread sends two messages:
 * 'connect' to connect to the database
 * 'save' to save the event data to the database
 */
parentPort.on("message", incoming => {
  const { status, data } = incoming;

  // Parent issued a connect message
  // After connection is successful - send a message 
  // to the parent thread to start subscribing
  if (status === "connect") {
    mongoose
      .connect(config.mongoUri, config.mongoOptions)
      .then(() => {
        console.info("Mongoose connected successfully", config.mongoUri)

        // Tell parent thread to start a new subscription
        parentPort.postMessage({ status: "subscribe" });
      })
      .catch(error => {
        console.error("Mongoose connection failed: " + error);
      })
  }

  // Parent thread sent us a 'save' message
  if (status === "save") {
    saveData(data);
  }
});

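/**
 * Save data to MongoDB
 * @param data
 * @returns {Promise<void>}
 */
async function saveData(data) {
  try {
    // save data
    const document = new Model(data)
    await document.save()

    // When data is saved - tell parent to start a new subscription
    parentPort.postMessage({ status: "subscribe" })
  } catch (error) {
    // saveData() is called without await, so failures are caught here
    // instead of ending up as an unhandled promise rejection
    console.error("Saving event data failed: " + error)
  }
}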

Summary

It was really fun to experiment with worker threads, and I think they will be used quite often. Use them with care, though: I think the postMessage API can be slow in itself, although that may have changed by now.
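One reason postMessage can be costly is that, by default, the value is copied using the structured clone algorithm. For large binary data, ownership of the underlying ArrayBuffer can be transferred instead of copied. The sketch below shows the difference on a MessageChannel inside a single thread; the postMessage method on a Worker accepts the same transfer list. The variable names and the 1 MB size are chosen only for this illustration.

```javascript
const { MessageChannel } = require("worker_threads");

const { port1 } = new MessageChannel();

// 1) Default: the value is copied with the structured clone algorithm
const copied = new Uint8Array(1024 * 1024);
port1.postMessage(copied);
console.log(copied.byteLength); // 1048576: the sender still has its own copy

// 2) Transfer: ownership of the underlying ArrayBuffer moves to the receiver
const moved = new Uint8Array(1024 * 1024);
port1.postMessage(moved, [moved.buffer]);
console.log(moved.byteLength); // 0: the buffer is detached on the sending side

port1.close();
```

Transferring only helps for transferable objects such as an ArrayBuffer; plain objects like the event data in this experiment are still cloned.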

Possible use cases?

  • Serializing to JSON large responses to API requests, such as those resulting from GraphQL or objection.js relation graph queries

  • Evaluating complex business logic rules on large data volumes (e.g. dynamic pricing of many products)
  • Collecting and transforming larger bunches of data from the database for e.g. generating some single report or document
  • Parsing large complex documents to an easily queryable representation, e.g. when web scraping

Published: 2019-11-14
Author: Henrik Grönvall
Copyright © 2022 Henrik Grönvall Consulting AB