
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic SACTopic
echo "Hello World from Kafka" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic SACTopic > /dev/null
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic SACTopic --from-beginning
npm install kafkajs http express fs url body-parser
"use strict";
const port = process.env.PORT || 3000;
const server = require("http").createServer();
const express = require("express");
const fs = require('fs');
const url = require('url');
const bodyParser = require("body-parser");
const {
Kafka
} = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({
groupId: 'test-group'
})
const topic = 'SACTopic';
var sockett;
var app = express();
app.use(bodyParser.json());
//Start the Server
server.on("request", app);
// use socket.io
var io = require('socket.io').listen(server);
// define interactions with client
io.on('connection', function(socket) {
sockett = socket;
socket.on('fr_sac', function(data) {
console.log('--from SAC: ' + data);
var sendMessage = async () => {
await producer.connect()
await producer.send({
topic: topic,
messages: [
{ key: 'key1', value: data }
],
})
await producer.disconnect()
}
sendMessage().catch(console.error);
});
});
//Start the Server
//server.on("request", app);
server.listen(port, function() {
console.info(`HTTP Server: ${server.address().port}`);
});
const listenMessage = async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({
topic: topic,
fromBeginning: false
})
await consumer.run({
eachMessage: async ({
topic,
partition,
message
}) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
key: message.key.toString()
})
if(message.key.toString() === "key2") {
sockett.emit("client_data", message.value.toString());
}
},
})
}
listenMessage().catch(console.error)
node nodeserver.js
function loadthis(that, changedProperties) {
var that_ = that;
var socketid;
//Socket Connection
//******************************************
socket = io("http://localhost:3000");
socket.on('disconnect', function() {
console.log("socket disconnected: " + socketid);
UI5(changedProperties, that, "");
});
socket.on('connect', function() {
socketid = socket.id;
console.log("socket connected: " + socketid);
UI5(changedProperties, that);
});
socket.on('client_data', function(data) {
console.log('Message from server: ' + data);
UI5(changedProperties, that, data);
});
}
onButtonPress: async function(oEvent) {
var oInput = this.byId("textArea_" + widgetName);
console.log(oInput.getValue());
socket.emit("fr_sac", oInput.getValue());
}
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })
let topic = 'SACTopic'
const listenMessage = async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({
topic: topic,
fromBeginning: false
})
await consumer.run({
eachMessage: async ({
topic,
partition,
message
}) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
}
})
}
listenMessage().catch(console.error)
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
10 | |
7 | |
7 | |
7 | |
6 | |
5 | |
5 | |
4 | |
4 | |
4 |