NodeJS to IBM Event Streams - 101
“IBM Event Streams is a high-throughput message bus built with Apache Kafka. It is optimized for event ingestion into IBM Cloud and event stream distribution between your services and applications” Taken from https://cloud.ibm.com/catalog/services/event-streams
This guide shows how to write your first NodeJS application to be a producer or consumer for IBM Event Streams. This article is based on the official sample provided here https://github.com/ibm-messaging/event-streams-samples/ i.e. all the sample code is from there.
To create an app that connects to IBM Event Streams as a Producer or Consumer, you must first register an application in order to get an API key and a certificate.
Register an Application
First of all, we need to register a client with IBM Event Streams to get the API key and SSL certificate.
- Log into IBM Event Streams
- Click on Connect to this cluster.
- Make a note of the Bootstrap Server
- Download the PEM Certificate
- Enter a name for the Application
- Select which permissions the NodeJS application requires
- Select which topics the NodeJS application is allowed to connect to.
- Click Generate API Key
- Make a note of the API Key. If you do not do this here, you will need to reset it.
Starting the NodeJS Application
This article is built around the node-rdkafka npm library, a third-party library for connecting securely to Kafka clusters.
# Create the project folder and work inside it, so package.json and
# node_modules end up in the new project rather than the parent directory.
mkdir ibm-es-nodejs
cd ibm-es-nodejs
npm init
npm i --save node-rdkafka
Common Code
We start with the common code needed for both producing and consuming.
var Kafka = require('node-rdkafka');
// Connection settings shared by the producer and the consumer.
// Replace every '<...>' placeholder with the values noted while
// registering the application.
var driver_options = {
    // Uncomment to have the client emit 'event.log' debug output.
    //'debug': 'all',
    // Bootstrap server address shown under "Connect to this cluster".
    'metadata.broker.list': '<Bootstrap Server address>',
    // Authenticate with SASL over a TLS-encrypted connection.
    'security.protocol': 'sasl_ssl',
    // Path to the PEM certificate downloaded during registration.
    'ssl.ca.location': '<Path to the PEM certificate downloaded>',
    'sasl.mechanisms': 'PLAIN',
    // The user name is the literal string 'token'; the API key is the password.
    'sasl.username': 'token',
    'sasl.password': '<API Key>',
    'broker.version.fallback': '0.10.0',
    // Do not log broker connection closes.
    'log.connection.close': false,
    'client.id': 'Es-NodeJS-101'
};

// Topic that the producer writes to and the consumer reads from.
var topicName = "<Topic Name>";
Producer
Connecting as a Producer
// Topic-level settings applied to the topics this producer writes to.
var producerTopicOpts = {
    // -1: wait for all in-sync replicas to acknowledge each message.
    'request.required.acks': -1,
    // Report the offset of each produced message back to the application.
    'produce.offset.report': true
};

// Producer settings: the shared connection options plus delivery reports.
// 'dr_msg_cb' must be enabled, otherwise the 'delivery-report' event below
// is never emitted; it also includes the message payload (dr.value).
var producerOpts = Object.assign({}, driver_options, {
    'dr_msg_cb': true
});

var producer = new Kafka.Producer(producerOpts, producerTopicOpts);

// Poll every 100 ms so that queued delivery reports are dispatched.
producer.setPollInterval(100);

// Disconnect the producer, then terminate the process with the given exit code.
function shutdownProducer(exitCode) {
    producer.disconnect(function() {
        process.exit(exitCode);
    });
}

// Register listener for debug information; only invoked if debug option set in driver_options
producer.on('event.log', function(log) {
    console.log(log);
});

// Register error listener
producer.on('event.error', function(err) {
    console.error('Error from producer:' + JSON.stringify(err));
});

// Register delivery report listener
producer.on('delivery-report', function(err, dr) {
    if (err) {
        console.error('Delivery report: Failed sending message ' + dr.value);
        console.error(err);
        // We could retry sending the message
    } else {
        console.log('Message produced, partition: ' + dr.partition + ' offset: ' + dr.offset);
    }
});

// Register callback invoked when producer has connected
producer.on('ready', function() {
    console.log('The producer has connected.');

    // Request metadata for all topics and verify that topicName exists.
    producer.getMetadata({
        timeout: 10000
    },
    function(err, metadata) {
        if (err) {
            console.error('Error getting metadata: ' + JSON.stringify(err));
            shutdownProducer(-1);
        } else {
            console.log('Producer obtained metadata: ' + JSON.stringify(metadata));
            var topicsByName = metadata.topics.filter(function(t) {
                return t.name === topicName;
            });
            if (topicsByName.length === 0) {
                console.error('ERROR - Topic ' + topicName + ' does not exist. Exiting');
                shutdownProducer(-1);
            }
        }
    });
});

producer.connect();
Send a message
// Send a single message. Run this only after the producer's 'ready' event
// has fired; produce() throws if the message cannot be queued.
// Buffer.from() replaces the deprecated `new Buffer()` constructor.
var message = Buffer.from('Message I want to send');
var key = 'Key';
var partition = 0;

try {
    producer.produce(topicName, partition, message, key);
} catch (err) {
    console.error('Failed sending message ' + message);
    console.error(err);
}
Consumer
// Topic-level settings: with no committed offset, start from the latest message.
var topicOpts = {
    'auto.offset.reset': 'latest'
};

// Consumer settings: the shared connection options plus a consumer group id,
// which is needed in order to subscribe to topics.
var consumerOpts = Object.assign({}, driver_options, {
    'group.id': 'Es-NodeJS-101-group'
});

var consumer = new Kafka.KafkaConsumer(consumerOpts, topicOpts);

// Handle of the polling interval started once the consumer is ready.
var consumerLoop;

// Messages received by the 'data' listener, waiting to be printed.
var consumedMessages = [];

// Stop polling, disconnect the consumer, then terminate the process
// with the given exit code.
function shutdownConsumer(exitCode) {
    clearInterval(consumerLoop);
    consumer.disconnect(function() {
        process.exit(exitCode);
    });
}

// Register listener for debug information; only invoked if debug option set in driver_options
consumer.on('event.log', function(log) {
    console.log(log);
});

// Register error listener
consumer.on('event.error', function(err) {
    console.error('Error from consumer:' + JSON.stringify(err));
});

// Register callback to be invoked when consumer has connected
consumer.on('ready', function() {
    console.log('The consumer has connected.');

    // Request metadata for one topic and verify that it has partitions.
    consumer.getMetadata({
        topic: topicName,
        timeout: 10000
    },
    function(err, metadata) {
        if (err) {
            console.error('Error getting metadata: ' + JSON.stringify(err));
            shutdownConsumer(-1);
        } else {
            console.log('Consumer obtained metadata: ' + JSON.stringify(metadata));
            if (metadata.topics[0].partitions.length === 0) {
                console.error('ERROR - Topic ' + topicName + ' does not exist. Exiting');
                shutdownConsumer(-1);
            }
        }
    });

    consumer.subscribe([topicName]);

    // Every 2 seconds: request up to 10 more messages, then print and clear
    // whatever the 'data' listener has collected since the last tick.
    consumerLoop = setInterval(function () {
        if (consumer.isConnected()) {
            // The consume(num, cb) method can take a callback to process messages.
            // In this sample code we use the ".on('data')" event listener instead,
            // for illustrative purposes.
            consumer.consume(10);
        }

        if (consumedMessages.length === 0) {
            console.log('No messages consumed');
        } else {
            for (var i = 0; i < consumedMessages.length; i++) {
                var m = consumedMessages[i];
                console.log('Message consumed: topic=' + m.topic + ', partition=' + m.partition + ', offset=' + m.offset + ', key=' + m.key + ', value=' + m.value.toString());
            }
            consumedMessages = [];
        }
    }, 2000);
});

// Register a listener to process received messages
consumer.on('data', function(m) {
    consumedMessages.push(m);
});

consumer.connect();
All Code taken from https://github.com/ibm-messaging/event-streams-samples/