NodeJS - Clustering

http://www.sitepoint.com/how-to-create-a-node-js-cluster-for-speeding-up-your-apps/ - done reading
https://nodejs.org/api/cluster.html

What is Node Clustering?

Each Node.js process runs in a single thread and, by default, has a memory limit of 512MB on 32-bit systems and 1GB on 64-bit systems. Although the limit can be raised to ~1GB on 32-bit systems and ~1.7GB on 64-bit systems (for example with the --max-old-space-size flag), both memory and processing power can still become bottlenecks. These figures describe older Node.js releases; newer versions may size the heap differently.
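To see which limit actually applies on a given machine, one option is to ask V8 directly. The sketch below uses the built-in v8 module; the variable name is illustrative, and the printed value depends on the Node.js version, the platform, and any heap-size flags passed to node.

```javascript
var v8 = require('v8');

// heap_size_limit is reported in bytes; convert it to megabytes for readability.
var heapLimitMb = Math.round(v8.getHeapStatistics().heap_size_limit / (1024 * 1024));

console.log('V8 heap limit for this process: ~' + heapLimitMb + ' MB');
```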

The elegant solution Node.js provides for scaling up applications is to split a single process into multiple processes, or workers in Node.js terminology. This is done with the cluster module, which lets you create child processes (workers) that share all the server ports with the main Node process (master).

A cluster is a pool of similar workers running under a parent Node process. Workers are spawned using the fork() method of the child_process module. This means workers can share server handles and use IPC (inter-process communication) to communicate with the parent Node process.

The master process is in charge of starting workers and controlling them. You can create an arbitrary number of workers in your master process. By default, incoming connections are distributed among workers in a round-robin fashion (except on Windows). There is another approach to distributing incoming connections, which I won’t discuss here, that hands the assignment over to the OS (the default on Windows). The Node.js documentation suggests using the default round-robin style as the scheduling policy.
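If you want round-robin regardless of platform, the policy can be set explicitly. This is a minimal sketch using the cluster.schedulingPolicy property and the cluster.SCHED_RR / cluster.SCHED_NONE constants; the policyName variable is only there for illustration. The assignment has to happen before the first worker is forked.

```javascript
var cluster = require('cluster');

// SCHED_RR   = round-robin, connections are handed out by the master
// SCHED_NONE = the operating system decides which worker gets a connection
// Set this before the first cluster.fork() call, otherwise it has no effect.
cluster.schedulingPolicy = cluster.SCHED_RR;

var policyName = cluster.schedulingPolicy === cluster.SCHED_RR ? 'round-robin' : 'OS-assigned';
console.log('Scheduling policy: ' + policyName);
```

The same choice can also be made without touching the code by setting the NODE_CLUSTER_SCHED_POLICY environment variable to rr or none.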

How can we use clustering?

First, we have to include it:

var cluster = require('cluster');

The cluster module executes the same Node.js program multiple times. Therefore, the first thing you need to do is identify which portion of the code is for the master process and which is for the workers. The cluster module lets you identify the master process as follows:

if(cluster.isMaster) { ... }

The master process is the process you initiate, which in turn initializes the workers. (Since Node.js 16, cluster.isPrimary is the preferred name and isMaster is a deprecated alias.) To start a worker process inside a master process, we’ll use the fork() method:

cluster.fork();

This method returns a worker object that contains some methods and properties about the forked worker.

var cluster = require('cluster');
var http = require('http');
var numCPUs = 4;

if (cluster.isMaster) {
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
} else {
    http.createServer(function(req, res) {
        res.writeHead(200);
        res.end('process ' + process.pid + ' says hello!');
    }).listen(8000);
}

You can test this server on your machine by starting it (run the command node simple.js) and accessing the URL http://127.0.0.1:8000/. When requests are received, they are distributed one at a time to each worker. If a worker is available, it immediately starts processing the request; otherwise it’ll be added to a queue.

The example above has a few weaknesses. For instance, imagine that a worker dies for some reason. You lose one of your workers, and if the same keeps happening, you end up with a master process with no workers to handle incoming requests. Another issue is the number of workers. The systems you deploy your application to have different numbers of cores/threads. With the example above, to use all of a system’s resources you have to manually check the specifications of each deployment server, find out how many threads are available, and update the number in your code. The following version addresses both issues:

var cluster = require('cluster');

if(cluster.isMaster) {
    var numWorkers = require('os').cpus().length;

    console.log('Master cluster setting up ' + numWorkers + ' workers...');

    for(var i = 0; i < numWorkers; i++) {
        cluster.fork();
    }

    cluster.on('online', function(worker) {
        console.log('Worker ' + worker.process.pid + ' is online');
    });

    cluster.on('exit', function(worker, code, signal) {
        console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
        console.log('Starting a new worker');
        cluster.fork();
    });
} else {
    var app = require('express')();
    app.all('/*', function(req, res) {
        // res.send() already ends the response, so no extra end() call is needed
        res.send('process ' + process.pid + ' says hello!');
    });

    var server = app.listen(8000, function() {
        console.log('Process ' + process.pid + ' is listening to all incoming requests');
    });
}
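Deriving the worker count from os.cpus().length removes the hard-coded number, but sometimes you still want to run fewer workers on a particular server. The sketch below is one possible way to allow that; the WORKERS environment variable and the resolveWorkerCount helper are hypothetical names, not part of the cluster module, and capping at the CPU count is a design choice rather than a requirement.

```javascript
var os = require('os');

// Pick the number of workers: an optional override, capped at the CPU count.
// Falls back to the CPU count when the override is missing or not a positive integer.
function resolveWorkerCount(envValue, cpuCount) {
    var requested = parseInt(envValue, 10);
    if (isNaN(requested) || requested < 1) {
        return cpuCount;
    }
    return Math.min(requested, cpuCount);
}

// WORKERS is an assumed variable name, e.g. "WORKERS=2 node server.js".
var numWorkers = resolveWorkerCount(process.env.WORKERS, os.cpus().length);
console.log('Would start ' + numWorkers + ' workers');
```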

What events are emitted by the cluster module?

The cluster module emits several events. Two common ones, marking the start and the termination of workers, are online and exit. online is emitted when a forked worker is running and sends its online message; exit is emitted when a worker process dies.
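Besides online and exit, the master can also observe fork, listening, and disconnect, each of which receives the worker as its first argument. As a rough sketch, the loop below attaches one logging handler per event; the lifecycleEvents array is just an illustrative name, and nothing is printed until workers are actually forked.

```javascript
var cluster = require('cluster');

// Worker lifecycle events emitted on the cluster object in the master process.
var lifecycleEvents = ['fork', 'online', 'listening', 'disconnect', 'exit'];

lifecycleEvents.forEach(function(eventName) {
    cluster.on(eventName, function(worker) {
        // Every one of these events passes the affected worker as the first argument.
        console.log('[' + eventName + '] worker ' + worker.id + ' (pid ' + worker.process.pid + ')');
    });
});
```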

How can we use the events emitted by the cluster module to control the lifetime of the workers?

See the example above.
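One caveat of forking a replacement on every exit event is that a worker which crashes right at startup gets re-forked in a tight loop. The sketch below shows one possible safeguard, a sliding-window limit on restarts. createRestartGuard and the limits used (5 restarts per 60 seconds) are assumptions made for illustration, not something the cluster module provides.

```javascript
var cluster = require('cluster');

// Returns a function that answers "may I restart now?" and allows
// at most maxRestarts restarts within any window of windowMs milliseconds.
function createRestartGuard(maxRestarts, windowMs) {
    var timestamps = [];
    return function allowRestart(now) {
        // Forget restarts that have fallen out of the sliding window.
        timestamps = timestamps.filter(function(t) {
            return now - t < windowMs;
        });
        if (timestamps.length >= maxRestarts) {
            return false;
        }
        timestamps.push(now);
        return true;
    };
}

// Hypothetical limits: at most 5 restarts in any 60-second window.
var allowRestart = createRestartGuard(5, 60000);

if (cluster.isMaster) {
    cluster.on('exit', function(worker, code, signal) {
        if (allowRestart(Date.now())) {
            cluster.fork();
        } else {
            console.error('Too many worker exits; not restarting worker ' + worker.process.pid);
        }
    });
}
```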

How can we handle communication between the master and the workers?

Occasionally you may need to send messages from the master to a worker to assign a task or perform other operations. In return, workers may need to inform the master that the task is completed. To listen for messages, set up an event listener for the message event in both the master and the workers. In the master, listen on the worker object:

worker.on('message', function(message) {
    console.log(message);
});

The worker object is the reference returned by the fork() method. To listen for messages from the master in a worker:

process.on('message', function(message) {
    console.log(message);
});

Messages can be strings or JSON objects. To send a message from the master to a specific worker, you can write code like the following:

worker.send('hello from the master');

An important point to note here is that message event callbacks are handled asynchronously. There isn’t a defined order of execution.

var cluster = require('cluster'),
    factorial = function factorial(num) {
        if(num === 1 || num === 0) return 1;
        else return num * factorial(num-1);
    };

if(cluster.isMaster) {
    var numWorkers = require('os').cpus().length;

    console.log('Master cluster setting up ' + numWorkers + ' workers...');

    for(var i = 0; i < numWorkers; i++) {
        var worker = cluster.fork();
        worker.on('message', function(message) {
            console.log(message.from + ': ' + message.type + ' ' + message.data.number + ' = ' + message.data.result);
        });
    }

    cluster.on('online', function(worker) {
        console.log('Worker ' + worker.process.pid + ' is online');
    });

    for(var wid in cluster.workers) {
        cluster.workers[wid].send({
            type: 'factorial',
            from: 'master',
            data: {
                number: Math.floor(Math.random() * 50)
            }
        });
    }

    cluster.on('exit', function(deadWorker, code, signal) {
        console.log('Worker ' + deadWorker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
        console.log('Starting a new worker');
        // use a distinct name so the replacement does not shadow the exited worker
        var replacement = cluster.fork();
        replacement.on('message', function(message) {
            console.log(message.from + ': ' + message.type + ' ' + message.data.number + ' = ' + message.data.result);
        });
    });
} else {
    process.on('message', function(message) {
        if(message.type === 'factorial') {
            process.send({
                type:'factorial',
                from: 'Worker ' + process.pid,
                data: {
                    number: message.data.number,
                    result: factorial(message.data.number)
                }
            });
        }
    });
}
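Because replies can arrive in any order, it can help to tag each request with an id and look the request up again when the reply comes in. The following is a minimal sketch of that idea, runnable on its own without forking; createRequestTracker and the id field are assumptions added for illustration and are not part of the message format used above.

```javascript
// Keeps track of requests that have been sent but not yet answered.
function createRequestTracker() {
    var nextId = 1;
    var pending = {};
    return {
        // Remember a request and return the id to attach to the outgoing message.
        track: function(request) {
            var id = nextId++;
            pending[id] = request;
            return id;
        },
        // Look up (and forget) the request that a reply with this id belongs to.
        settle: function(id) {
            var request = pending[id];
            delete pending[id];
            return request;
        },
        pendingCount: function() {
            return Object.keys(pending).length;
        }
    };
}

var tracker = createRequestTracker();
var firstId = tracker.track({ number: 5 });
var secondId = tracker.track({ number: 7 });

// Replies may arrive in any order; the id still finds the right request.
var settledSecond = tracker.settle(secondId);
var settledFirst = tracker.settle(firstId);
console.log(settledSecond.number, settledFirst.number); // prints: 7 5
```

In the master, the id returned by track() would be added to the object passed to worker.send(), and the worker would copy it into its reply so the message handler can call settle().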

How can we achieve zero downtime with clustering?

One important result that can be achieved using workers is (almost) zero-downtime servers. Within the master process, you can terminate and restart the workers one at a time after you make changes to your application. This lets the older version keep running while the new one loads.

To be able to restart your application while running, you have to keep two points in mind. Firstly, the master process runs the whole time, and only workers are terminated and restarted. Therefore, it’s important to keep your master process short and only in charge of managing workers.

Secondly, you need to notify the master process somehow that it needs to restart workers. There are several ways to do this, including user input or watching files for changes. The latter is more efficient, but you need to identify the files to watch in the master process.
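When watching files, keep in mind that fs.watch can report several change events for a single save, so restarting on every event may cycle the workers more often than intended. One common remedy is to debounce the trigger. The sketch below is an assumption-laden illustration: debounce and scheduleRestart are hypothetical helpers, the 100 ms delay is arbitrary, and the counter stands in for the real restart logic.

```javascript
// Returns a wrapper that runs fn only once the calls have stopped for waitMs.
function debounce(fn, waitMs) {
    var timer = null;
    return function() {
        // Each new call cancels the previously scheduled run.
        clearTimeout(timer);
        timer = setTimeout(function() {
            timer = null;
            fn();
        }, waitMs);
    };
}

var restartCount = 0;
var scheduleRestart = debounce(function() {
    restartCount += 1; // stand-in for restarting the workers
    console.log('restart triggered, total: ' + restartCount);
}, 100);

// Three change notifications in quick succession lead to a single restart.
scheduleRestart();
scheduleRestart();
scheduleRestart();
```

In the master, scheduleRestart would be passed as the listener to fs.watch in place of a direct call to the restart function.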

One suggestion for restarting your workers is to try to shut them down safely first and then, if they do not terminate, kill them forcibly. You can do the former by sending a shutdown message to the worker as follows:

cluster.workers[wid].send({type: 'shutdown', from: 'master'});

And start the safe shutdown in the worker message event handler:

process.on('message', function(message) {
    if(message.type === 'shutdown') {
        process.exit(0);
    }
});
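Calling process.exit(0) right away drops any request the worker is still serving. If the worker runs an HTTP server, a gentler variant is to stop accepting connections first and exit once the open ones have finished. The sketch below assumes a worker that listens on port 8000; gracefulShutdown and the 4000 ms safety timeout are illustrative choices, picked so the worker gives up shortly before a master-side kill timer would fire.

```javascript
var cluster = require('cluster');
var http = require('http');

// Close the server, then call exit(0); call exit(1) if closing takes too long.
function gracefulShutdown(server, exit, timeoutMs) {
    // Safety net for connections that never finish.
    var timer = setTimeout(function() {
        exit(1);
    }, timeoutMs);
    timer.unref(); // do not let the safety timer itself keep the process alive

    // Stop accepting new connections; the callback runs once open ones are done.
    server.close(function() {
        clearTimeout(timer);
        exit(0);
    });
}

if (cluster.isWorker) {
    var server = http.createServer(function(req, res) {
        res.end('process ' + process.pid + ' says hello!');
    }).listen(8000);

    process.on('message', function(message) {
        if (message.type === 'shutdown') {
            gracefulShutdown(server, function(code) { process.exit(code); }, 4000);
        }
    });
}
```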

To do this for all the workers, you can use the workers property of the cluster module that keeps a reference to all the running workers. We can also wrap all the tasks in a function in the master process, which can be called whenever we want to restart all the workers.

We can get the ID of all the running workers from the workers object in the cluster module. This object keeps a reference to all the running workers and is dynamically updated when workers are terminated and restarted. First we store the ID of all the running workers in a workerIds array. This way, we avoid restarting newly forked workers.

Then, we request a safe shutdown from each worker. If the worker is still running after 5 seconds and still exists in the workers object, we call the kill function on the worker to force it to shut down.

var cluster = require('cluster'),
    restartWorkers = function restartWorkers() {
        var wid, workerIds = [];

        // create a copy of current running worker ids
        for(wid in cluster.workers) {
            workerIds.push(wid);
        }

        workerIds.forEach(function(wid) {
            cluster.workers[wid].send({
                type: 'shutdown',
                from: 'master'
            });

            setTimeout(function() {
                if(cluster.workers[wid]) {
                    cluster.workers[wid].kill('SIGKILL');
                }
            }, 5000);
        });
    };

if(cluster.isMaster) {
    var numWorkers = require('os').cpus().length,
        fs = require('fs'),
        i, worker;

    console.log('Master cluster setting up ' + numWorkers + ' workers...');

    for(i = 0; i < numWorkers; i++) {
        worker = cluster.fork();
        worker.on('message', function() {
            console.log('arguments', arguments);
        });
    }

    // set up listener of file changes for restarting workers
    fs.readdir('.', function(err, files) {
        if(err) {
            console.error('Could not list files to watch: ' + err.message);
            return;
        }
        files.forEach(function(file) {
            fs.watch(file, function() {
                restartWorkers();
            });
        });
    });

    cluster.on('exit', function(_worker, code, signal) {
        console.log('Worker ' + _worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
        console.log('Starting a new worker');
        worker = cluster.fork();
        worker.on('message', function() {
            console.log('arguments', arguments);
        });
    });
} else {
    process.on('message', function(message) {
        if(message.type === 'shutdown') {
            process.exit(0);
        }
    });

    console.log('Worker ' + process.pid + ' is alive!');
}
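Note that the listing above asks all workers to shut down at the same moment, so there can be a short window in which no worker is serving requests. To restart them strictly one at a time, the master can wait for each replacement before moving on to the next worker. The sketch below shows only that control flow; restartSequentially and the restartOne callback are hypothetical. In a real master, restartOne would send the shutdown message, wait for the exit event, fork a replacement, and invoke its callback once the new worker emits listening.

```javascript
// Restart workers one after another; restartOne(wid, callback) must call
// callback once the worker with that id has been replaced.
function restartSequentially(workerIds, restartOne, done) {
    var index = 0;

    function next() {
        if (index >= workerIds.length) {
            done();
            return;
        }
        var wid = workerIds[index];
        index += 1;
        // Move on only after restartOne reports that this worker was replaced.
        restartOne(wid, next);
    }

    next();
}

// Stand-in for the real restart work, so the control flow can be run on its own.
var restartOrder = [];
var allDone = false;

restartSequentially(['1', '2', '3'], function(wid, callback) {
    restartOrder.push(wid);
    console.log('restarted worker ' + wid);
    callback();
}, function() {
    allDone = true;
    console.log('all workers restarted');
});
```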