Technology Blogs by SAP
Learn how to extend and personalize SAP applications. Follow the SAP technology blog for insights into SAP BTP, ABAP, SAP Analytics Cloud, SAP HANA, and more.
cancel
Showing results for 
Search instead for 
Did you mean: 
Jacky_Liu
Product and Topic Expert
Product and Topic Expert
In previous blog Part I, we have prepared the on premise  kafka envirement . Today we will try build nodejs Kafka rest api proxy and use cpi to produce and consume kafka message by deployed rest api .

We can follow the following steps :

Step 1 :  deploy  CPI iflow to consume  kafka topic's msessage







Step 2 : deploy  nodejs rest api  to produce message to Kafka  and  consume kafka message and forward the message to cpi  .


code snippy :


Server.js


const express = require('express');
const bodyParser = require('body-parser');
const { Kafka, logLevel } = require('kafkajs');
const axios = require('axios');
const oauth = require('axios-oauth-client');
const cpiurl = 'https://1s4hcextension.it-cpi001-rt.cfapps.eu10.hana.ondemand.com/http/kafka/sender';

const getClientCredentials = oauth.client(axios.create(), {
url: 'https://1s4hcextension.authentication.eu10.hana.ondemand.com/oauth/token',
grant_type: 'client_credentials',
client_id: client_id_from_cpi_runtime_service_key,
client_secret: client_secret_from_cpi_runtime_service_key,
scope: ''
});

const app = express();

const kafka = new Kafka({
clientId: 'my-app',
requestTimeout: 25000,
connectionTimeout: 30000,
authenticationTimeout:30000,
retry: {
initialRetryTime: 3000,
retries: 0
},
logLevel: logLevel.INFO,
brokers: ['localhost:9092']
});

app.use(express.text())
app.use(bodyParser.json());

app.listen(4004, () => { console.log('===> Server started') })


// depoly api to produce message to kafka
app.post('/kafka/:topic', (req, res) => {
const topicv = req.params.topic;
console.log(topicv);
const kafka = new Kafka({
clientId: 'my-app',
requestTimeout: 25000,
connectionTimeout: 30000,
authenticationTimeout:30000,
retry: {
initialRetryTime: 3000,
retries: 0
},
brokers: ['localhost:9092']
})
const producer = kafka.producer();
const run = async () => {
await producer.connect();
await producer.send({
topic: topicv,
messages: [
{ value: req.body.msg.toString() },
],
});
await producer.disconnect();

res.status(200).send('message send');
}
run().catch((e)=>{
console.log(e);
res.status(500).send('error');
})
}) ;


// consume kafka message and forward the message to CPI
const consumer = kafka.consumer({ groupId: 'test-group' });

const run = async () => {
// Consuming

await consumer.connect()
await consumer.subscribe({ topic: 'dblab01', fromBeginning: true })

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {

console.log({
partition,
offset: message.offset,
value: message.value.toString(),
});
getClientCredentials().then((token1)=>{
// console.log(token1.access_token);
const config = {
headers:{
"Authorization": `Bearer ${token1.access_token}`,
"Content-Type": "application/json"
}
};
const data ={
msg: message.value.toString()
};
axios.post(cpiurl,data,config).then(res=>{
console.log(res);
}).catch(e=>{
console.log(e);
});

}).catch(e=>{
console.log(e);
})

},
})
}

run().catch((e)=>{
console.log(e);
})






package.json


{
"name": "capkafka",
"version": "1.0.0",
"description": "capkafka",
"main": "server.js",
"dependencies": {
"axios": "^0.27.2",
"axios-oauth-client": "^1.4.4",
"body-parser": "latest",
"express": "^4.17.1",
"kafkajs": "latest",
"passport": "^0.4.0"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "jacky liu",
"license": "ISC"
}

Start the nodejs application :

 


 

Step 3 : test the deployed rest api with postman locally .



 

Step 4 : check the message in CPI  .


 



 


 

Step 4 :  Deploy  a iflow to call the nodejs rest api to producet messge in Kafka . .I will skip this . Cpi  http or https adapter support  calling op rest api with the help of  sap cloud connector .



The end


Best regards!

Jacky Liu

 


 
2 Comments