Worker threads are available in Node.js through the built-in `worker_threads` module. Before we dive into my example use of worker threads in Node.js, we will look at what worker threads are and why we "could" need them in Node.js.
When a Node.js process is launched, it runs:
In other words, Node.js runs on a single thread, and there is just one process happening at a time in the event loop. The code is NOT executed in parallel and you do not need to worry about concurrency issues.
There is a downside, though: if you have CPU-intensive code, like complex calculations on a large dataset taking place in memory, it can block other work from being executed.
The golden rule: don’t block the event loop, try to keep it running and pay attention and avoid anything that could
block the thread like synchronous network calls or infinite loops.
It’s important to differentiate between CPU operations and I/O (input/output) operations. As mentioned earlier, the code of Node.js is NOT executed in parallel. Only I/O operations are run in parallel, because they are executed asynchronously.
So Worker Threads will not help much with I/O-intensive work because asynchronous I/O operations are more efficient than Workers can be. The main goal of Workers is to improve the performance on CPU-intensive operations not I/O operations.
The best solution for CPU performance is Worker Threads. Browsers have had the concept of Workers for a long time.
Instead of having:
Worker threads have:
In my example I have an external service that can be "considered" to perform CPU-intensive calculations. It is a REST API endpoint built for long polling: if 15 seconds pass without an event, the service responds with status 204 and the client has to make a new request. In normal cases the external service responds with event data from trot races: a start or finish event, plus the id and time for a horse in the race.
The external service will respond with event data for the start time and finish time of horses in trot races:
// Start data
{
event: "start",
horse: {
id: 1,
name: 'Spin Scorpion'
},
time: 0
}
// Goal data
{
event: "finish",
horse: {
id: 1,
name: 'Spin Scorpion'
},
time: 13230
}
First we start by implementing the Node.js server using Express.js, which is going to run the worker threads that subscribe to trot race data and store it in the database.
// server.js
const config = require("./config");
const http = require("http");
const express = require("express");
const run = require("./services");
// Express application wrapped in a plain Node.js HTTP server
const app = express();
const server = http.createServer(app);

/**
 * Start the worker threads that subscribe to race events and save them.
 * A failure while starting them is only logged; the server still listens.
 */
run().catch(startupError => console.error(startupError));

server.listen(config.port, () => {
  const address = `http://localhost:${config.port}`;
  console.info(`${config.appName} listening on: ${address}`);
});

module.exports = server;
The module responsible for all communication between the main thread and the worker threads exports only one async function, called run and in this little experiment it starts everything.
// .server/services/index.js
/**
* We import the Node.js module for worker threads
*/
const { Worker } = require("worker_threads");
/**
 * Starts the two worker threads and relays every message between them;
 * the workers never talk to each other directly.
 *
 * Message flow handled here:
 * - subscribeWorker -> { status: 'subscribe' }   : start a new subscription
 * - subscribeWorker -> { status: 'save', data }  : forward data to the dbWorker
 * - dbWorker        -> { status: 'subscribe' }   : start a new subscription
 *
 * @returns {Promise<void>}
 */
async function runService() {
  // Create two new worker threads,
  // - one for subscription of trot events,
  // - one to store event data in a MongoDb
  // NOTE: relative worker paths are resolved against the process's current
  // working directory, not against this file.
  const subscribeWorker = new Worker("./server/services/subscribeWorker.js");
  const dbWorker = new Worker("./server/services/dbWorker.js");

  // Tell the database worker thread to connect to the MongoDb.
  dbWorker.postMessage({ status: 'connect' });

  // Feedback messages from the subscribeWorker
  subscribeWorker.on("message", ({ status, data }) => {
    if (status === 'subscribe') {
      // tell the subscribeWorker to re-start
      subscribeWorker.postMessage('subscribe');
    } else if (status === 'save') {
      // hand the received event data over to the dbWorker
      dbWorker.postMessage({ status: 'save', data });
    }
  });

  // The 'error' event delivers the uncaught error itself (not an exit code).
  // Log it, so that a crashing worker does not go unnoticed.
  subscribeWorker.on("error", error =>
    console.error("Subscribe Worker error:", error)
  );
  subscribeWorker.on("exit", code =>
    console.log(`Subscribe Worker stopped with exit code ${code}`)
  );

  // The dbWorker only ever sends 'subscribe': once after a successful
  // connection, and again after each save.
  dbWorker.on("message", ({ status }) => {
    if (status === 'subscribe') {
      subscribeWorker.postMessage('subscribe');
    }
  });

  dbWorker.on("error", error =>
    console.error("Save Worker error:", error)
  );
  dbWorker.on("exit", code =>
    console.log(`Save Worker stopped with exit code ${code}`)
  );
}
/**
 * Public entry point of the services module: starts the worker threads.
 * @returns {Promise<void>}
 */
async function run() {
  await runService();
}

module.exports = run;
The implementation of the subscriberWorker:
// .server/services/subscribeWorker.js
/**
* Subscriber worker thread module
*/
const { parentPort } = require("worker_threads")
/**
* XHR wrapper to call end point api
* @type {getEventData}
*/
const getEventData = require("./getEventData")
/**
 * Messages from the parent thread.
 * Only the plain string 'subscribe' is understood; it starts one
 * subscription round. Anything else is ignored.
 */
parentPort.on("message", message => {
  if (message !== 'subscribe') {
    return
  }
  subscribe()
})
/**
 * Runs one subscription round: requests event data through getEventData()
 * and reports the outcome to the parent thread.
 *
 * - status 200        -> posts { status: 'save', data } so the data is stored
 * - any other status  -> posts { status: 'subscribe' } to start a new round
 * - request rejected  -> logs the error, waits `retryDelayMs`, then posts
 *                        { status: 'subscribe' }. The delay keeps a failing
 *                        endpoint from being hit in a tight loop.
 *
 * @param {number} [retryDelayMs=1000] pause before retrying after a failure
 * @returns {Promise<void>}
 */
async function subscribe(retryDelayMs = 1000) {
  let response;
  try {
    response = await getEventData();
  } catch (error) {
    // Without this catch the rejection would be unhandled and polling
    // would stop after the first failed request.
    console.error("Fetching event data failed: " + error);
    await new Promise(resolve => setTimeout(resolve, retryDelayMs));
    parentPort.postMessage({ status: 'subscribe' });
    return;
  }

  // No content - tell the parent thread to start a new subscription
  if (response.status !== 200) {
    parentPort.postMessage({ status: 'subscribe' });
    return;
  }

  // Data has been retrieved - tell the parent to deal with saving
  parentPort.postMessage({ status: 'save', data: response.data });
}
The implementation of the dbWorker:
// .server/services/dbWorker.js
/**
* Database worker thread module
*/
const { parentPort } = require("worker_threads")
// Mongoose stuff
const config = require('../config')
const mongoose = require("mongoose")
const Schema = mongoose.Schema;
mongoose.Promise = global.Promise;
// Shape of one stored race event; mirrors the payload of the external service.
const schema = new Schema({
  event: { type: String },
  // The payload carries the horse's `id` and `name`. The field must be called
  // `name` here, otherwise the name is not stored (Mongoose schemas drop
  // undeclared paths by default).
  horse: { id: Number, name: String },
  time: { type: Number }
});
const Model = mongoose.model("Result", schema);
/**
 * Messages from the parent thread, dispatched on their `status`:
 * - "connect": connect to the database
 * - "save":    store the accompanying event data
 * Any other status is ignored.
 */
parentPort.on("message", ({ status, data }) => {
  switch (status) {
    case "connect":
      mongoose
        .connect(config.mongoUri, config.mongoOptions)
        .then(() => {
          console.info("Mongoose connected successfully", config.mongoUri)
          // Connected - ask the parent thread to start a new subscription
          parentPort.postMessage({ status: "subscribe" });
        })
        .catch(error => {
          console.error("Mongoose connected failed: " + error);
        })
      break;
    case "save":
      // Event data handed over by the parent thread
      saveData(data);
      break;
    default:
      break;
  }
});
/**
 * Saves one event to the database and then asks the parent thread to start
 * a new subscription.
 *
 * A failed save is logged and does not reject: the 'subscribe' message is
 * posted in either case, so one failed write does not stall the
 * subscribe/save cycle.
 *
 * @param {object} data event data received from the parent thread
 * @returns {Promise<void>}
 */
async function saveData(data) {
  try {
    const document = new Model(data);
    await document.save();
  } catch (error) {
    console.error("Saving event data failed: " + error);
  } finally {
    // Saved or not - tell the parent to start a new subscription
    parentPort.postMessage({ status: "subscribe" });
  }
}
It was really fun to experiment with worker threads, and I think they will be used quite often. Use them with care, though, because I think the postMessage API can be slow in itself; however, that may have changed by now.
Possible use cases?
graph queries