Node.js Kafka Producers

In this article I will examine two Node.js Kafka client libraries: Kafka-node and Node-rdkafka. As Kafka-node is currently the more widely used Node.js client library, I will introduce the two libraries by first providing basic implementations, and then I will address the pros and cons of adopting Node-rdkafka, the newer of the two libraries.

Kafka-node implementation:

(function() {
	var config = require('../config/index');
	var kafka = require('kafka-node');
	var Promise = require('bluebird');
	var retry = require('retry');
	// kafka-node client; assigned in initializeClient() and closed by closeClient().
	var client;
	// HighLevelProducer built on `client`; assigned in initializeClient().
	var producer;
	// Promise that resolves with `producer` once it emits 'ready'; assigned in bindListeners().
	var producerReady;

	/**
	 * Closes the shared kafka-node client held in the module-level `client` variable.
	 */
	function closeClient() {
		client.close();
	}

	/**
	 * Wires up the producer's lifecycle handlers and (re)creates `producerReady`.
	 *
	 * - Producer 'error' events are logged and the client is closed.
	 * - SIGTERM closes the client. SIGTERM is a signal delivered to the Node.js
	 *   process, not an event emitted by the producer, so the handler must be
	 *   registered on `process`; bound to the producer it would never fire.
	 * - `producerReady` resolves with the producer once it emits 'ready'.
	 */
	var bindListeners = function() {
		producer.on('error', function(err) {
			console.log(err);
			closeClient();
		});
		// `once` so a repeated SIGTERM does not try to close an already-closed
		// client. Note that installing a SIGTERM listener replaces Node's default
		// "exit on SIGTERM" behavior while the listener is registered.
		process.once('SIGTERM', function() {
			closeClient();
		});
		producerReady = new Promise(function(resolve) {
			producer.on('ready', function() {
				resolve(producer);
			});
		});
	};

	/**
	 * Creates the kafka-node client and its high-level producer, stores both in
	 * the module-level `client` / `producer` variables, then attaches listeners.
	 */
	var initializeClient = function() {
		var connectionString = 'localhost:2181';
		var clientId = 'kafka-broker';
		var producerOptions = {};

		client = new kafka.Client(connectionString, clientId);
		producer = new kafka.HighLevelProducer(client, producerOptions);

		// Listeners are bound last because they need the freshly created producer.
		bindListeners();
	};

	/**
	 * Logs the outcome of a send: the error when one is present (truthy),
	 * otherwise the response data.
	 *
	 * @param {*} err - Error passed to the send callback, if any.
	 * @param {*} data - Response data passed to the send callback.
	 */
	var messageHandler = function(err, data) {
		var outcome = err ? err : data;
		console.log(outcome);
	};

	/**
	 * Service constructor. Constructing an instance initializes the
	 * module-level kafka-node client and producer.
	 *
	 * @constructor
	 */
	function KafkaService() {
		initializeClient();
	}

	/**
	 * Sends a payload through the producer once it is ready, retrying failed
	 * sends via node-retry.
	 *
	 * The final outcome is reported to `messageHandler` exactly once: with
	 * `(null, data)` on success, or with the retry operation's main error once
	 * no further attempt is scheduled.
	 *
	 * @param {*} payload - Passed unchanged to `producer.send`.
	 */
	KafkaService.prototype.sendMessage = function(payload) {
		var operation = retry.operation({
			// See https://github.com/tim-kos/node-retry for config options
		});
		operation.attempt(function(currentAttempt) {
			producerReady.then(function(producer) {
				producer.send(payload, function(err, data) {
					// operation.retry() returns true when it has scheduled another
					// attempt; it returns false when there is no error or no
					// attempts are left.
					if (operation.retry(err)) {
						return;
					}
					return messageHandler(err ? operation.mainError() : null, data);
				});
			}).catch(function(err) {
				// Surface synchronous failures (e.g. send() throwing) instead of
				// leaving an unhandled rejection.
				messageHandler(err);
			});
		});
	};
	module.exports = KafkaService;
})();

Node-rdkafka implementation:

(function() {
	var kafka = require('node-rdkafka');
	var Promise = require('bluebird');
	// node-rdkafka Producer; assigned in initializeProducer() and disconnected by closeProducer().
	var producer;
	// Promise that resolves with `producer` once it emits 'ready'; assigned in bindListeners().
	var producerReady;

	/**
	 * Disconnects the shared producer held in the module-level `producer` variable.
	 */
	function closeProducer() {
		producer.disconnect();
	}

	/**
	 * Wires up the producer's lifecycle handlers and (re)creates `producerReady`.
	 *
	 * - Producer 'error' events are logged and the producer is disconnected.
	 * - SIGTERM disconnects the producer. SIGTERM is a signal delivered to the
	 *   Node.js process, not an event emitted by the producer, so the handler
	 *   must be registered on `process`; bound to the producer it would never fire.
	 * - 'delivery-report' events are subscribed to but currently ignored.
	 * - `producerReady` resolves with the producer once it emits 'ready'.
	 */
	var bindListeners = function() {
		producer.on('error', function(err) {
			console.log(err);
			closeProducer();
		});
		// `once` so a repeated SIGTERM does not disconnect twice. Note that
		// installing a SIGTERM listener replaces Node's default "exit on SIGTERM"
		// behavior while the listener is registered.
		process.once('SIGTERM', function() {
			closeProducer();
		});
		producer.on('delivery-report', function(report) {
			// Intentionally a no-op; log `report` here to trace successful writes.
		});
		producerReady = new Promise(function(resolve) {
			producer.on('ready', function() {
				resolve(producer);
			});
		});
	};

	/**
	 * Builds the node-rdkafka producer, starts connecting it, and attaches the
	 * event listeners. If the connect callback receives an error, the error is
	 * logged and the process exits with status 1.
	 */
	var initializeProducer = function() {
		var producerSettings = {
			'client.id': 'kafka',
			'metadata.broker.list': 'localhost:9092',
			'compression.codec': 'gzip',
			'retry.backoff.ms': 200,
			'message.send.max.retries': 10,
			'socket.keepalive.enable': true,
			'queue.buffering.max.messages': 100000,
			'queue.buffering.max.ms': 1000,
			'batch.num.messages': 1000000,
			'dr_cb': true
		};

		var onConnect = function(connectError) {
			if (!connectError) {
				return;
			}
			console.log(connectError);
			return process.exit(1);
		};

		producer = new kafka.Producer(producerSettings);
		producer.connect({}, onConnect);

		// Bound after connect() is called, matching the original ordering.
		bindListeners();
	};

	/**
	 * Service constructor. Constructing an instance initializes the
	 * module-level node-rdkafka producer.
	 *
	 * @constructor
	 */
	function KafkaService() {
		initializeProducer();
	}

	/**
	 * Produces a payload once the producer is ready. Only failures are
	 * reported: an error passed to the produce callback is logged, and nothing
	 * is logged on success.
	 *
	 * @param {*} payload - Passed unchanged to `producer.produce`.
	 */
	KafkaService.prototype.sendMessage = function(payload) {
		var logProduceError = function(produceError) {
			if (!produceError) {
				return;
			}
			console.log(produceError);
		};

		producerReady.then(function(readyProducer) {
			readyProducer.produce(payload, logProduceError);
		});
	};

	module.exports = KafkaService;
})();

Pros:

  • Code clarity: Because Node-rdkafka has built-in retry logic, I do not have to wrap the send method in convoluted retry logic.
  • Quality maintenance: Node-rdkafka is maintained by Blizzard Entertainment. While Kafka-node is maintained by a large number of contributors, it still has many unresolved issues.
  • Better performance: Node-rdkafka can handle a greater number of requests per second than Kafka-node. A Node.js Kafka producer with Node-rdkafka as the client library will benefit from a 5-10% decrease in CPU utilization. Once CPU utilization exceeds 100%, the Node.js server will begin to drop TCP connections. *See below for performance test results.* Four sets of tests were performed. You will notice that as the concurrency level increases, the number of errors produced by Kafka-node grows faster than the number produced by Node-rdkafka.

Neutral:

  • Each client would require a different logging strategy to trace events. In Node-rdkafka, write confirmation is not provided by the callback of the produce method; only error messages are available. To obtain a success confirmation, you would need to listen for the delivery report on the Kafka producer object. This would require a more intricate solution to log transactional information.
  • Node-rdkafka connects to the Kafka brokers directly, whereas Kafka-node uses ZooKeeper as an intermediary.

Cons:

  • Immaturity: The project was released a couple of months before the publication of this article. I think it is safe to assume that many issues have yet to surface.

Performance results:

KAFKA-NODE
loadtest -n 10000 -c 300 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 3672 (37%), requests per second: 734, mean latency: 400 ms
Requests: 8457 (85%), requests per second: 958, mean latency: 310 ms
Target URL:          http://localhost:7777/produce
Max requests:        10000
Concurrency level:   300
Agent:               keepalive

Completed requests:  10000
Total errors:        0
Total time:          11.987781299 s
Requests per second: 834
Total time:          11.987781299 s
Percentage of the requests served within a certain time
50%      333 ms
90%      405 ms
95%      434 ms
99%      745 ms
100%      949 ms (longest request)


NODE_RDKAFKA
loadtest -n 10000 -c 300 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4441 (44%), requests per second: 887, mean latency: 330 ms
Errors: 1, accumulated errors: 1, 0% of total requests

Target URL:          http://localhost:7777/produce
Max requests:        10000
Concurrency level:   300
Agent:               keepalive

Completed requests:  10000
Total errors:        1
Total time:          10.069121181 s
Requests per second: 993
Total time:          10.069121181 s

Percentage of the requests served within a certain time
  50%      259 ms
  90%      376 ms
  95%      528 ms
  99%      734 ms
 100%      934 ms (longest request)

 100%      934 ms (longest request)

 -1:   1 errors

KAFKA-NODE
loadtest -n 10000 -c 400 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 3643 (36%), requests per second: 728, mean latency: 530 ms
Errors: 3, accumulated errors: 3, 0.1% of total requests
Requests: 8024 (80%), requests per second: 877, mean latency: 460 ms
Errors: 0, accumulated errors: 3, 0% of total requests

Target URL:          http://localhost:7777/produce
Max requests:        10000
Concurrency level:   400
Agent:               keepalive

Completed requests:  10000
Total errors:        3
Total time:          12.408900674 s
Requests per second: 806
Total time:          12.408900674 s

Percentage of the requests served within a certain time
  50%      446 ms
  90%      627 ms
  95%      756 ms
  99%      1076 ms
 100%      2331 ms (longest request)

 100%      2331 ms (longest request)

   -1:   3 errors

NODE_RDKAFKA
loadtest -n 10000 -c 400 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4932 (49%), requests per second: 986, mean latency: 400 ms
Errors: 2, accumulated errors: 2, 0% of total requests

Target URL:          http://localhost:7777/produce
Max requests:        10000
Concurrency level:   400
Agent:               keepalive

Completed requests:  10000
Total errors:        2
Total time:          9.212834024 s
Requests per second: 1085
Total time:          9.212834024 s

Percentage of the requests served within a certain time
  50%      320 ms
  90%      389 ms
  95%      713 ms
  99%      1055 ms
 100%      1676 ms (longest request)

 100%      1676 ms (longest request)

   -1:   2 errors



KAFKA-NODE
loadtest -n 10000 -c 500 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4009 (40%), requests per second: 802, mean latency: 600 ms
Errors: 16, accumulated errors: 16, 0.4% of total requests
Requests: 9057 (91%), requests per second: 1011, mean latency: 500 ms
Errors: 0, accumulated errors: 16, 0.2% of total requests

Target URL:          http://localhost:7777/produce
Max requests:        10000
Concurrency level:   500
Agent:               keepalive

Completed requests:  10000
Total errors:        16
Total time:          11.023021238 s
Requests per second: 907
Total time:          11.023021238 s

Percentage of the requests served within a certain time
  50%      472 ms
  90%      611 ms
  95%      921 ms
  99%      1612 ms
 100%      3936 ms (longest request)

 100%      3936 ms (longest request)

   -1:   16 errors

NODE_RDKAFKA
loadtest -n 10000 -c 500 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4568 (46%), requests per second: 913, mean latency: 530 ms
Errors: 6, accumulated errors: 6, 0.1% of total requests

Target URL:          http://localhost:7777/produce
Max requests:        10000
Concurrency level:   500
Agent:               keepalive

Completed requests:  10000
Total errors:        6
Total time:          9.493732744999999 s
Requests per second: 1053
Total time:          9.493732744999999 s

Percentage of the requests served within a certain time
  50%      404 ms
  90%      524 ms
  95%      895 ms
  99%      1369 ms
 100%      2745 ms (longest request)

 100%      2745 ms (longest request)

   -1:   6 errors



KAFKA-NODE
loadtest -n 10000 -c 1000 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 3376 (34%), requests per second: 675, mean latency: 1130 ms
Errors: 520, accumulated errors: 520, 15.4% of total requests
Requests: 8464 (85%), requests per second: 1018, mean latency: 1100 ms
Errors: 0, accumulated errors: 520, 6.1% of total requests

Target URL:          http://localhost:7777/produce
Max requests:        10000
Concurrency level:   1000
Agent:               keepalive

Completed requests:  10000
Total errors:        520
Total time:          11.756190981 s
Requests per second: 851
Total time:          11.756190981 s

Percentage of the requests served within a certain time
  50%      880 ms
  90%      1875 ms
  95%      2634 ms
  99%      6223 ms
 100%      9658 ms (longest request)

 100%      9658 ms (longest request)

   -1:   520 errors



NODE_RDKAFKA
loadtest -n 10000 -c 1000 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4070 (41%), requests per second: 814, mean latency: 920 ms
Errors: 51, accumulated errors: 51, 1.3% of total requests

Target URL:          http://localhost:7777/produce
Max requests:        10000
Concurrency level:   1000
Agent:               keepalive

Completed requests:  10000
Total errors:        51
Total time:          10.008185327 s
Requests per second: 999
Total time:          10.008185327 s

Percentage of the requests served within a certain time
  50%      710 ms
  90%      1414 ms
  95%      2035 ms
  99%      5760 ms
 100%      5861 ms (longest request)

 100%      5861 ms (longest request)

   -1:   51 errors