Sap cloud integration has kafka adapter , but currently the kafka adapter does not support calling kafka on premise through SAP Cloud connector . We can refer to note
316484 for this . There are 2 options for resolving this currently :
1, Expost the kafka on premise to public web, then CPI kafka adapter can connect to it .
2, Build rest api proxy for kafka , then CPI can produce or consume message on kafka op by using CPI http adapter with the help of sap cloud connector .
Today I want try to investigate option 2 . There are existing open source kafka rest proxies, but maybe customer has some concern to use them . In this blog I will try to develope kafka rest api with nodejs to make it callable from CPI through sap cloud connector . In this part I , I will prepare kafka envirement .
To build the scenario, first let me use docker to create kafka service on my computer . Of course I have installed docker on my laptop . The following is the steps .
Step 1 : pull docker image for zookeeper and kafka
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
Step 2 : Create and start zookeeper container
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
Step 3 : Create and start kafka container and connect kafka to zookeeper container in step 2
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
Step 4 : Enter kafka container
docker ps
docker exec -it 664dcfafd35c /bin/bash
Step 5 : create and check topic in kafka container
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic dblab01
kafka-topics.sh --list --zookeeper zookeeper:2181
Step 6 : start producer and produce data in kafka container
kafka-console-producer.sh --broker-list localhost:9092 --topic dblab01
Step 7 : start consumer and consume data in kafka container
Step 8 : start server.js to to test kafka locally .
The following is the code .
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
requestTimeout: 25000,
connectionTimeout: 30000,
authenticationTimeout:30000,
retry: {
initialRetryTime: 3000,
retries: 0
},
brokers: ['localhost:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })
const run = async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'dblab01', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
// await new Promise(r=>setTimeout(r,3000))
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
},
})
}
run().catch((e)=>{
debugger
})
setInterval(async ()=>{
await producer.connect()
await producer.send({
topic: 'dblab01',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
},1000)
npm install kafkajs
node server.js
to be continue in
Part II .
Best regards!
Jacky Liu