Tags: Node.js » Asynchronous Programming
Processing a large dataset can be done with a simple for loop. But processing one data item at a time forfeits the efficiency gains of processing data in parallel. Unconstrained parallel processing doesn't scale either, since it can easily swamp CPU or memory resources. Worker queue packages help by constraining the number of tasks executing in parallel at any one time.
Sometimes we have a large number of tasks to perform, for instance processing hundreds of image files. The generalized idea is an input dataset where you need to apply a transformation to some or all of the data items. This is the Map pattern, but at a larger scale than normal.
One use case is in Machine Learning, during the phase of assembling training and test data before training a model. You might have a million images, and for each image you apply vision algorithms to recognize its features. Those features are turned into numbers, and those numbers are what's actually used in training the model.
In my case, I've developed a static website generation tool, AkashaCMS, whose job is rendering input files, like Markdown or Nunjucks templates, into the HTML of a website. I use AkashaCMS to publish this site (TechSparx) and several others. There are over 1500 pages on this site, meaning that regenerating the site renders all 1500+ pages. Inside AkashaCMS is a loop that first finds all the content files, then renders each one to HTML, one at a time. To improve AkashaCMS performance, I've replaced that loop with a worker queue.
Processing a large dataset can be done with a worker queue. Your program adds tasks into the queue, and the queue management module schedules task execution, delivering the results. The management module constrains simultaneous task execution to avoid swamping the system.
There's a matter of scaling. How quickly do you want to process the dataset? How many CPUs do you want to assign? A distributed task queue can spread the work over multiple computers, and with enough computers the dataset can be processed in a jiffy. But distributed task queues are more complex, and in this tutorial we'll focus on an in-memory worker queue that keeps everything within the boundary of the traditional Node.js event loop.
Dataset processing 101
The simplistic approach is writing a simple loop:
let array = []; // Fill this array with data to process
for (let item of array) {
    processItem(item);
}
It is easy to create and maintain such a loop. But is this the best approach? For example, the processItem step might have pauses, such as reading data from disk or a database, during which the work could be parallelized. Processing items in parallel might reduce the overall time to process all items.
Remember that the original inspiration for Node.js was about handling asynchronous execution. There is a moment of time during which another system is handling the asynchronous request for data. We were asked to consider this statement:
database.query('SELECT * FROM ...', function (err, result) {
    if (err) handleErrors(err);
    else handleResult(result);
});
Yes, I know that today this would be an async function and we'd use await. The point was: what happens between the moment where database.query is called and the moment the callback function is invoked? That could be several milliseconds. To a computer that's an eternity, during which the computer could be doing other things.
With the simple processing loop, that pause waiting for data from another system is wasted time. Normally Node.js returns to the event loop during such a pause so it can handle other tasks. But with the simple one-at-a-time loop we have above, there is nothing else queued, so there is no opportunity for Node.js to handle any other tasks while waiting for an asynchronous result.
Think about the static website generation task. Rendering each page is its own isolated task: it starts by reading a Markdown file, then applies several templates, and writes the result to the output directory. Therefore the pages could be rendered in parallel. While the renderer is reading a template for one file, or waiting on data from a database, the Node.js event loop could be rendering a template for another page. Page rendering can easily be parallelized, since typically the rendering of one page does not depend on the rendering of other pages.
The generalized scenario is a series of data objects, where each object can be processed independently of the others. In such cases it's possible to run processItem on each item in parallel.
For example we could rewrite the loop above like so:
let items = []; // Fill with data objects
let tasks = [];
for (let item of items) {
    tasks.push(processItemAsync(item));
}
await Promise.all(tasks);

async function processItemAsync(item) { ... }
What this does is invoke an async function on each item, and therefore the tasks array is filled with Promise objects. Those Promises start out in the pending state, and as the tasks finish they settle into either the fulfilled or rejected state. The Promise.all step waits until every Promise is fulfilled. If any task fails, Promise.all rejects immediately with that task's error, meaning an Error is thrown.
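If instead you want every task to run to completion even when some fail, the built-in Promise.allSettled function collects all the outcomes rather than rejecting on the first error. A minimal sketch, using the tasks array from above:
// Promise.allSettled never rejects; each entry records whether the task
// was fulfilled or rejected, along with its value or failure reason.
const outcomes = await Promise.allSettled(tasks);
for (const outcome of outcomes) {
    if (outcome.status === 'fulfilled') console.log(outcome.value);
    else console.error(outcome.reason);
}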
This is also a simple loop to code and to maintain. But consider the 1500+ pages on this website: is it feasible to attempt to generate 1500 pages simultaneously? No, it is not.
Instead it is best to take a middle ground. That is, to have some kind of task manager which maintains a maximum number of simultaneous processing tasks, while ensuring all tasks finish, and handling any which throw errors.
For instance we might think: we have the items array; we push the Promise from processItemAsync into a workers array; we make sure there are at most 10 entries in that array; and as soon as any worker finishes, we enqueue another. A sketch of that idea appears below. But as you try to work out the details, your eyes start to glaze over with the potential complications.
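Here is a minimal hand-rolled sketch of that idea; the processAll helper is hypothetical, and error handling is deliberately omitted, which is exactly the kind of complication that piles up:
// A hand-rolled concurrency limiter: at most `limit` tasks in flight.
// processItemAsync is the async function from the earlier example.
async function processAll(items, limit) {
    const executing = new Set();
    const results = [];
    for (let item of items) {
        const task = processItemAsync(item).then((result) => {
            executing.delete(task); // this slot is now free
            results.push(result);
        });
        executing.add(task);
        if (executing.size >= limit) {
            await Promise.race(executing); // wait for a slot to open up
        }
    }
    await Promise.all(executing); // wait for the stragglers
    return results;
}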
Fortunately our predecessor programmers have already recognized this need, and have coded up solutions. In this tutorial we'll look at two such Node.js libraries, FastQ and Better Queue.
Building a task queue using fastq
The fastq package bills itself as a "fast, in memory work queue". The writeup includes performance claims showing that it's faster than several alternatives.
One nice thing to learn is that the package is 33 kilobytes unpacked, and it has only one dependency. In other words, it won't contribute to node_modules bloat.
To set up a processing queue with FastQ we do this:
const fastq = require('fastq').promise;
let concurrency = 10;

async function worker(item) {
    ...
    return processedItem;
}

const queue = fastq(worker, concurrency);
That is, we pass in a worker function as well as a concurrency configuration. The worker function is called once per item that is pushed to the queue. The concurrency setting, as the name implies, determines how many workers will execute simultaneously.
Let's try a concrete example. We'll write this with ES6 modules so that we can use top-level await. Save this as fq.mjs and run it with Node.js 14.8 or later, which is when top-level await became available without a flag.
import { promise as fastq } from 'fastq';

let concurrency = 5;

async function worker(item) {
    return await new Promise((resolve, reject) => {
        setTimeout(() => { resolve(item * 2); }, 1000);
    });
}

const queue = fastq(worker, concurrency);

const tasks = [];
for (let item of [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]) {
    tasks.push(queue.push(item));
}
await Promise.all(tasks);
console.log(tasks);

const results = [];
for (let result of tasks) {
    results.push(await result);
}
console.log(results);
The worker function emulates a long-running task by using the setTimeout function. The functional result is a simple math calculation.
We use queue.push to add numbers to the queue to be processed. In the Promises version of FastQ, which we're using, the queue.push function returns a Promise. When the task finishes, the Promise resolves to whatever value is returned from the worker function. That means the tasks array is filled with Promise objects.
We use Promise.all to wait for every Promise to resolve. That leaves the tasks array filled with Promise objects which have been resolved. The next loop simply extracts the value from each Promise object into a new array, results.
The result looks like this:
$ node fq.mjs
[
  Promise { 2 },
  Promise { 4 },
  Promise { 6 },
  Promise { 8 },
  Promise { 10 },
  Promise { 12 },
  Promise { 14 },
  Promise { 16 },
  Promise { 18 },
  Promise { 20 }
]
[
  2, 4, 6, 8, 10,
  12, 14, 16, 18, 20
]
The first array indeed is filled with resolved Promise objects, and the second array has the values from each Promise.
Thinking about this a bit further, try commenting out the Promise.all line and the console.log line immediately following. The loop over the tasks array is equivalent to what Promise.all does, and does not require a second step.
const tasks = [];
for (let item of [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]) {
    tasks.push(queue.push(item));
}
// await Promise.all(tasks);
// console.log(tasks);

const results = [];
for (let result of tasks) {
    results.push(await result);
}
console.log(results);
This loop waits for each Promise to resolve, which is exactly what Promise.all does. Therefore it's redundant to use Promise.all to wait for the tasks to finish; this loop also waits for the tasks to finish.
That lets us directly retrieve the results in one step:
$ node fq.mjs
[
  2, 4, 6, 8, 10,
  12, 14, 16, 18, 20
]
What we've demonstrated is that FastQ is easy to use and manages processing several identical tasks. I have already coded an almost identical loop in AkashaCMS, and can verify that indeed it works in a more complex application.
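One more detail worth sketching is error handling. In the Promises version of FastQ, the Promise returned by queue.push rejects if the worker throws. Here is a minimal sketch, with a contrived worker that fails on odd numbers:
import { promise as fastq } from 'fastq';

// A contrived worker that fails on odd numbers, to demonstrate errors.
async function worker(item) {
    if (item % 2 === 1) throw new Error(`cannot process ${item}`);
    return item * 2;
}

const queue = fastq(worker, 5);

const tasks = [];
for (let item of [ 1, 2, 3, 4 ]) {
    tasks.push(queue.push(item));
}

const results = [];
for (let task of tasks) {
    try {
        results.push(await task);
    } catch (err) {
        console.error(err.message); // record the failure, keep going
    }
}
console.log(results); // [ 4, 8 ]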
Building a task queue using Better Queue
An alternative queue processor package is Better Queue. As you might imagine, it is described as being better, and as capable of handling more complex scenarios.
The comprehensive feature set also means that Better Queue is larger, weighing in at 78 kilobytes, and it has three dependencies. That's not exactly node_modules bloat, but it's larger than FastQ.
The API uses the old-school callback approach rather than the newer async/await approach. To see what that means, let's go over an equivalent to the previous example:
import Queue from 'better-queue';

function worker(item, cb) {
    setTimeout(() => { cb(undefined, item * 2); }, 1000);
}

const queue = new Queue(worker, { concurrent: 5 });

const finished = new Promise((resolve, reject) => {
    queue.on('drain', function() { resolve(); });
    queue.on('task_failed', function(taskId, err, stats) {
        reject(err);
    });
});

const results = [];
queue.on('task_finish', function(taskId, result, stats) {
    results.push(result);
});

for (let item of [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]) {
    queue.push(item);
}
await finished;
console.log(results);
The worker function is required to accept a callback function in the cb argument. The signature for cb has an error indicator as the first argument, and the result as the second.
When creating a Queue, the first argument is the worker function, and the second is an options object. There is a long list of options available which is part of why the Better Queue team can correctly say they support complex operations.
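For example, here is a sketch using the maxRetries and retryDelay options listed in the Better Queue documentation, reusing the worker function from the example above; the values are arbitrary:
// A queue that retries each failed task up to 3 times, waiting
// 1 second between attempts.
const retryingQueue = new Queue(worker, {
    concurrent: 5,
    maxRetries: 3,
    retryDelay: 1000
});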
Returning to the example: the queue object supports event listeners, and you see here we use three, drain, task_failed, and task_finish. The drain event fires when the queue is fully processed. The task_failed event fires when an error occurs in one of the tasks. We wire both into the finished Promise, which resolves when the drain event triggers and rejects if any task fails, so that errors are surfaced in our application. The task_finish event fires when, as the name suggests, each task finishes. We use that to push results into the results array.
The API doesn't offer a way to collect results other than this event handler.
Next, we push the work into the queue, using a simple for loop calling queue.push. Because queue.push returns a task object that emits events, we could reframe this with a Promise wrapper. That wrapper would capture the failed or finish events, recording the results (or failure) in a different way.
The statement await finished waits on the Promise that resolves when the drain event fires. In other words, the script waits until all tasks have finished, by which point every task_finish event has triggered. At that time the results array should have been filled with data.
To run the script:
$ node bq.mjs
[
  2, 4, 6, 8, 10,
  12, 14, 16, 18, 20
]
And, indeed, we get the expected output.
Here's an alternate implementation inspired by the discussion earlier:
import Queue from 'better-queue';

function worker(item, cb) {
    setTimeout(() => { cb(undefined, item * 2); }, 1000);
}

const queue = new Queue(worker, { concurrent: 5 });

const tasks = [];
for (let item of [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]) {
    let task = queue.push(item);
    tasks.push(new Promise((resolve, reject) => {
        task.on('failed', function(err) { reject(err); });
        task.on('finish', function(result) { resolve(result); });
    }));
}

const results = [];
for (let result of tasks) {
    results.push(await result);
}
console.log(results);
This implementation is probably easier to understand and maintain.
The tasks array now contains a Promise object corresponding to each task in the queue. For tasks that succeed, the result is stored in the Promise; otherwise it stores the error. The second loop, as in the FastQ example, both waits for all tasks to finish and retrieves the result from each Promise.
The result is the same:
$ node bq.mjs
[
  2, 4, 6, 8, 10,
  12, 14, 16, 18, 20
]
This demonstrates that Better Queue offers many options for organizing how you use the queue. Do you need to filter items added to the queue, or to reject processing some items? Better Queue lets you configure either of those behaviors, and many more.
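For instance, here is a sketch of the filter option described in the Better Queue documentation, again reusing the worker function from above; the rejection reason string is arbitrary:
// Items are vetted by the filter before entering the queue: even
// numbers are accepted, odd numbers are rejected outright.
const filteringQueue = new Queue(worker, {
    concurrent: 5,
    filter: function(item, cb) {
        if (item % 2 === 0) cb(null, item); // accept this item
        else cb('odd_items_rejected');      // reject this item
    }
});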
Summary
In this tutorial we've learned about two packages that can help applications process large datasets. While it's possible to process a dataset using a simple loop, it's clearly possible to decrease the total time required by running the tasks in parallel.
For example, in AkashaCMS with a concurrency of 1 (no parallel execution), processing a test website takes 32 seconds, while with a concurrency of 10 it takes 9 seconds.
Let's try the same difference with the FastQ example.
$ time node fq.mjs
[
  2, 4, 6, 8, 10,
  12, 14, 16, 18, 20
]

real    0m10.573s
user    0m0.120s
sys     0m0.029s
$ time node fq.mjs
[
  2, 4, 6, 8, 10,
  12, 14, 16, 18, 20
]

real    0m1.255s
user    0m0.110s
sys     0m0.028s
The first timing is with concurrency set to 1, which means no parallel execution. Remember that the worker function includes a 1 second delay. Therefore the time shown here, approximately 10 seconds, is consistent with executing the 10 tasks one at a time. The second timing is with concurrency set to 10, which allows up to ten tasks to execute in parallel. The time shown here, a little over 1 second, is consistent with executing all 10 tasks in parallel.
In a real-world setting, where each task performs significant work, increasing the parallelization creates a tension. On the one hand your program can execute several tasks simultaneously, but on the other hand this increases the memory and CPU load of the Node.js process. It's best to measure your application at different concurrency levels, and to adjust the concurrency to your needs.
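One simple way to run those measurements (a hypothetical tweak, not part of the scripts above) is to read the concurrency level from an environment variable, so each timing run can vary it without editing the script:
// Read the concurrency level from the environment, defaulting to 5.
let concurrency = parseInt(process.env.CONCURRENCY ?? '5', 10);
Each timing run then becomes, for example, time CONCURRENCY=10 node fq.mjs.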