Node.js Kafka Producers
7 min read
In this article I examine two Node.js Kafka client libraries: Kafka-node and Node-rdkafka. Because Kafka-node is currently the more widely used of the two, I first introduce both libraries with basic producer implementations, and then address the pros and cons of adopting Node-rdkafka, the newer library.
Kafka-node implementation:
(function() {
  // NOTE(review): `config` is loaded but not referenced below (the ZooKeeper
  // address is hard-coded); kept in case loading it has side effects.
  var config = require('../config/index');
  var kafka = require('kafka-node');
  var Promise = require('bluebird');
  var retry = require('retry');

  // Upper bound (ms) that SIGTERM handling waits for client.close() to call
  // back before exiting anyway.
  var SHUTDOWN_GRACE_MS = 5000;

  var client;
  var producer;
  // Resolves with the producer once it emits 'ready'; rejects if it emits
  // 'error' first.
  var producerReady;

  /**
   * Closes the current kafka-node client.
   * @param {Function} [callback] - passed through to client.close().
   */
  var closeClient = function(callback) {
    client.close(callback);
  };

  /**
   * Attaches producer lifecycle listeners, the SIGTERM handler, and builds
   * the producerReady promise.
   */
  var bindListeners = function() {
    producerReady = new Promise(function(resolve, reject) {
      producer.on('ready', function() {
        resolve(producer);
      });
      // Without this, an error before 'ready' would leave every pending
      // sendMessage() waiting on producerReady forever.
      producer.on('error', reject);
    });
    // The error itself is logged by the listener below; this no-op handler
    // only stops Bluebird from reporting an unhandled rejection when no
    // message happens to be waiting on producerReady.
    producerReady.catch(function() {});

    producer.on('error', function(err) {
      console.log(err);
      closeClient();
    });

    // SIGTERM is delivered to the process; the producer never emits it, so
    // the listener must live on `process`. Listening for SIGTERM replaces
    // Node's default exit-on-SIGTERM, so exit explicitly once the client has
    // closed, with a fallback in case close() never calls back.
    process.once('SIGTERM', function() {
      closeClient(function() {
        process.exit(0);
      });
      setTimeout(function() {
        process.exit(0);
      }, SHUTDOWN_GRACE_MS).unref();
    });
  };

  /**
   * Creates the ZooKeeper-backed client and a HighLevelProducer on top of it.
   */
  var initializeClient = function() {
    client = new kafka.Client('localhost:2181', 'kafka-broker');
    producer = new kafka.HighLevelProducer(client, {});
    bindListeners();
  };

  /**
   * Logs the outcome of a send: the error if there is one, otherwise the
   * data returned by kafka-node.
   * @param {*} err - send error, or a falsy value on success.
   * @param {*} [data] - result passed to the producer.send() callback.
   */
  var messageHandler = function(err, data) {
    if (err) {
      console.log(err);
      return;
    }
    console.log(data);
  };

  /**
   * Kafka producer service backed by kafka-node.
   * @constructor
   */
  var KafkaService = function() {
    initializeClient();
  };

  /**
   * Sends `payload` via producer.send() once the producer is ready, retrying
   * failed sends with node-retry. Outcomes are logged, not returned.
   * @param {Array} payload - passed unchanged to kafka-node's producer.send().
   */
  KafkaService.prototype.sendMessage = function(payload) {
    // See https://github.com/tim-kos/node-retry for config options. With an
    // empty options object node-retry's defaults apply (10 retries with
    // exponential backoff).
    var operation = retry.operation({});
    operation.attempt(function() {
      producerReady
        .then(function(readyProducer) {
          readyProducer.send(payload, function(err, data) {
            // retry() schedules another attempt and returns true while
            // retries remain; it returns false for a falsy err.
            if (operation.retry(err)) {
              return;
            }
            messageHandler(err ? operation.mainError() : null, data);
          });
        })
        .catch(function(err) {
          // The producer failed before becoming ready, or send() threw
          // synchronously; retrying would not help, so just report it.
          messageHandler(err);
        });
    });
  };

  module.exports = KafkaService;
})();
Node-rdkafka implementation:
(function() {
  var kafka = require('node-rdkafka');
  var Promise = require('bluebird');

  // How often (ms) the producer is polled so librdkafka serves delivery
  // reports; with dr_cb enabled and no polling, reports are never emitted
  // and the local queue eventually fills up.
  var POLL_INTERVAL_MS = 100;
  // Upper bound (ms) that SIGTERM handling waits for disconnect() to call
  // back before exiting anyway.
  var SHUTDOWN_GRACE_MS = 5000;

  var producer;
  // Resolves with the producer once it emits 'ready'. Connection failures
  // terminate the process in initializeProducer(), so this never rejects.
  var producerReady;

  /**
   * Disconnects the current node-rdkafka producer.
   * @param {Function} [callback] - passed through to producer.disconnect().
   */
  var closeProducer = function(callback) {
    producer.disconnect(callback);
  };

  /**
   * Attaches producer lifecycle listeners, the SIGTERM handler, and builds
   * the producerReady promise.
   */
  var bindListeners = function() {
    // NOTE(review): newer node-rdkafka releases report errors via
    // 'event.error'; confirm which event the installed version emits.
    producer.on('error', function(err) {
      console.log(err);
      closeProducer();
    });

    // SIGTERM is delivered to the process; the producer never emits it, so
    // the listener must live on `process`. Listening for SIGTERM replaces
    // Node's default exit-on-SIGTERM, so exit explicitly once disconnected,
    // with a fallback in case disconnect() never calls back.
    process.once('SIGTERM', function() {
      closeProducer(function() {
        process.exit(0);
      });
      setTimeout(function() {
        process.exit(0);
      }, SHUTDOWN_GRACE_MS).unref();
    });

    // produce()'s callback only reports errors; successful writes surface
    // here, so this is the hook for success/transaction logging.
    producer.on('delivery-report', function(report) {
      // console.log(report);
    });

    producerReady = new Promise(function(resolve) {
      producer.on('ready', function() {
        // Older node-rdkafka releases may not have setPollInterval; in that
        // case behavior is unchanged.
        if (typeof producer.setPollInterval === 'function') {
          producer.setPollInterval(POLL_INTERVAL_MS);
        }
        resolve(producer);
      });
    });
  };

  /**
   * Creates the producer, binds its listeners, then connects to the broker.
   * Exits the process with status 1 if the connection fails.
   */
  var initializeProducer = function() {
    producer = new kafka.Producer({
      'client.id': 'kafka',
      'metadata.broker.list': 'localhost:9092',
      'compression.codec': 'gzip',
      // Built-in retry: librdkafka resends failed messages itself.
      'retry.backoff.ms': 200,
      'message.send.max.retries': 10,
      'socket.keepalive.enable': true,
      'queue.buffering.max.messages': 100000,
      'queue.buffering.max.ms': 1000,
      // NOTE(review): larger than queue.buffering.max.messages, so a batch can
      // never actually reach this size — confirm the intended value.
      'batch.num.messages': 1000000,
      // Enables 'delivery-report' events (served by polling, see above).
      'dr_cb': true
    });
    // Bind before connecting so no lifecycle event can be missed.
    bindListeners();
    producer.connect({}, function(err) {
      if (err) {
        console.log(err);
        process.exit(1);
      }
    });
  };

  /**
   * Kafka producer service backed by node-rdkafka.
   * @constructor
   */
  var KafkaService = function() {
    initializeProducer();
  };

  /**
   * Produces `payload` once the producer is ready. Errors are logged; success
   * is reported only through the 'delivery-report' event.
   * NOTE(review): the (payload, callback) call form matches early
   * node-rdkafka; current releases use produce(topic, partition, value, ...)
   * — confirm against the installed version.
   * @param {Object} payload - passed unchanged to producer.produce().
   */
  KafkaService.prototype.sendMessage = function(payload) {
    producerReady
      .then(function(readyProducer) {
        readyProducer.produce(payload, function(err) {
          if (err) {
            console.log(err);
          }
        });
      })
      .catch(function(err) {
        // produce() may throw synchronously (e.g. when the local queue is
        // full); log it instead of leaving an unhandled rejection.
        console.log(err);
      });
  };

  module.exports = KafkaService;
})();
Pros:
- Code clarity: Because Node-rdkafka has built-in retry logic (configured above via retry.backoff.ms and message.send.max.retries), we do not have to wrap our send methods in convoluted retry logic.
- Quality maintenance: Node-rdkafka is maintained by Blizzard Entertainment. Kafka-node, although maintained by a large number of contributors, still has many open issues.
- Better performance: Node-rdkafka can handle more requests per second than Kafka-node. A Node.js Kafka producer that uses Node-rdkafka as its client library benefits from a 5-10% decrease in CPU utilization. This matters because once CPU utilization reaches 100%, the Node.js server begins to drop TCP connections. (See the performance test results below.) Four sets of tests were performed, at concurrency levels of 300, 400, 500, and 1000. As the concurrency level increases, Kafka-node produces increasingly more errors than Node-rdkafka.
Neutral:
- Each client requires a different logging strategy to trace events. In Node-rdkafka, the callback of the produce method does not confirm a successful write; it only reports errors. To obtain a success confirmation, you need to listen for the 'delivery-report' event on the Kafka producer object, which makes logging transactional information more involved.
- Node-rdkafka connects to the Kafka brokers directly (localhost:9092 above), whereas Kafka-node, as used here via kafka.Client, connects through ZooKeeper (localhost:2181 above) as an intermediary.
Cons:
- Immaturity: The project was released only a couple of months before this article was published, so it is safe to assume that many issues have yet to surface.
Performance results:
Summary (10,000 POST requests per run; latencies in ms):

Concurrency | Library      | Req/s | Errors | p50 | p99  | Longest
300         | Kafka-node   | 834   | 0      | 333 | 745  | 949
300         | Node-rdkafka | 993   | 1      | 259 | 734  | 934
400         | Kafka-node   | 806   | 3      | 446 | 1076 | 2331
400         | Node-rdkafka | 1085  | 2      | 320 | 1055 | 1676
500         | Kafka-node   | 907   | 16     | 472 | 1612 | 3936
500         | Node-rdkafka | 1053  | 6      | 404 | 1369 | 2745
1000        | Kafka-node   | 851   | 520    | 880 | 6223 | 9658
1000        | Node-rdkafka | 999   | 51     | 710 | 5760 | 5861

Raw loadtest output:

KAFKA-NODE
loadtest -n 10000 -c 300 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 3672 (37%), requests per second: 734, mean latency: 400 ms
Requests: 8457 (85%), requests per second: 958, mean latency: 310 ms
Target URL: http://localhost:7777/produce
Max requests: 10000
Concurrency level: 300
Agent: keepalive
Completed requests: 10000
Total errors: 0
Total time: 11.987781299 s
Requests per second: 834
Percentage of the requests served within a certain time
  50% 333 ms
  90% 405 ms
  95% 434 ms
  99% 745 ms
  100% 949 ms (longest request)

NODE_RDKAFKA
loadtest -n 10000 -c 300 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4441 (44%), requests per second: 887, mean latency: 330 ms
Errors: 1, accumulated errors: 1, 0% of total requests
Target URL: http://localhost:7777/produce
Max requests: 10000
Concurrency level: 300
Agent: keepalive
Completed requests: 10000
Total errors: 1
Total time: 10.069121181 s
Requests per second: 993
Percentage of the requests served within a certain time
  50% 259 ms
  90% 376 ms
  95% 528 ms
  99% 734 ms
  100% 934 ms (longest request)
  -1: 1 errors

KAFKA-NODE
loadtest -n 10000 -c 400 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 3643 (36%), requests per second: 728, mean latency: 530 ms
Errors: 3, accumulated errors: 3, 0.1% of total requests
Requests: 8024 (80%), requests per second: 877, mean latency: 460 ms
Errors: 0, accumulated errors: 3, 0% of total requests
Target URL: http://localhost:7777/produce
Max requests: 10000
Concurrency level: 400
Agent: keepalive
Completed requests: 10000
Total errors: 3
Total time: 12.408900674 s
Requests per second: 806
Percentage of the requests served within a certain time
  50% 446 ms
  90% 627 ms
  95% 756 ms
  99% 1076 ms
  100% 2331 ms (longest request)
  -1: 3 errors

NODE_RDKAFKA
loadtest -n 10000 -c 400 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4932 (49%), requests per second: 986, mean latency: 400 ms
Errors: 2, accumulated errors: 2, 0% of total requests
Target URL: http://localhost:7777/produce
Max requests: 10000
Concurrency level: 400
Agent: keepalive
Completed requests: 10000
Total errors: 2
Total time: 9.212834024 s
Requests per second: 1085
Percentage of the requests served within a certain time
  50% 320 ms
  90% 389 ms
  95% 713 ms
  99% 1055 ms
  100% 1676 ms (longest request)
  -1: 2 errors

KAFKA-NODE
loadtest -n 10000 -c 500 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4009 (40%), requests per second: 802, mean latency: 600 ms
Errors: 16, accumulated errors: 16, 0.4% of total requests
Requests: 9057 (91%), requests per second: 1011, mean latency: 500 ms
Errors: 0, accumulated errors: 16, 0.2% of total requests
Target URL: http://localhost:7777/produce
Max requests: 10000
Concurrency level: 500
Agent: keepalive
Completed requests: 10000
Total errors: 16
Total time: 11.023021238 s
Requests per second: 907
Percentage of the requests served within a certain time
  50% 472 ms
  90% 611 ms
  95% 921 ms
  99% 1612 ms
  100% 3936 ms (longest request)
  -1: 16 errors

NODE_RDKAFKA
loadtest -n 10000 -c 500 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4568 (46%), requests per second: 913, mean latency: 530 ms
Errors: 6, accumulated errors: 6, 0.1% of total requests
Target URL: http://localhost:7777/produce
Max requests: 10000
Concurrency level: 500
Agent: keepalive
Completed requests: 10000
Total errors: 6
Total time: 9.493732744999999 s
Requests per second: 1053
Percentage of the requests served within a certain time
  50% 404 ms
  90% 524 ms
  95% 895 ms
  99% 1369 ms
  100% 2745 ms (longest request)
  -1: 6 errors

KAFKA-NODE
loadtest -n 10000 -c 1000 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 3376 (34%), requests per second: 675, mean latency: 1130 ms
Errors: 520, accumulated errors: 520, 15.4% of total requests
Requests: 8464 (85%), requests per second: 1018, mean latency: 1100 ms
Errors: 0, accumulated errors: 520, 6.1% of total requests
Target URL: http://localhost:7777/produce
Max requests: 10000
Concurrency level: 1000
Agent: keepalive
Completed requests: 10000
Total errors: 520
Total time: 11.756190981 s
Requests per second: 851
Percentage of the requests served within a certain time
  50% 880 ms
  90% 1875 ms
  95% 2634 ms
  99% 6223 ms
  100% 9658 ms (longest request)
  -1: 520 errors

NODE_RDKAFKA
loadtest -n 10000 -c 1000 -m POST -k http://localhost:7777/produce
Requests: 0 (0%), requests per second: 0, mean latency: 0 ms
Requests: 4070 (41%), requests per second: 814, mean latency: 920 ms
Errors: 51, accumulated errors: 51, 1.3% of total requests
Target URL: http://localhost:7777/produce
Max requests: 10000
Concurrency level: 1000
Agent: keepalive
Completed requests: 10000
Total errors: 51
Total time: 10.008185327 s
Requests per second: 999
Percentage of the requests served within a certain time
  50% 710 ms
  90% 1414 ms
  95% 2035 ms
  99% 5760 ms
  100% 5861 ms (longest request)
  -1: 51 errors