cf create-service kafka reference <instance name> -c
'{"advertisement":"<advertisement id from the API response>"}'
public class MainVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) throws Exception {
String consumerGroup = "DemoGroup";
String topicName = "<subaccountid>.com.sap.dsc.ac.equipment";
//refer latest documentation for the updated topic names format or refer the response of the marketplace API
Properties props = new Properties();
props.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");
props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");
// use consumer for interacting with Apache Kafka
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx,
KafkaRulesConsumerFactory.kafkaConsumerCreate(consumerGroup, props));
consumer.handler(record -> {
String rec = record.value().toString();
byte[] decoded = Base64.getMimeDecoder().decode(rec.getBytes());
System.out.println("Processing key=" + record.key() + ",value=" + rec +
",partition=" + record.partition() + ",offset=" + record.offset());
try {
String result = KafkaUtils.readAvroMessageWithSchema(decoded);
System.out.println(result);
}
catch(Exception e)
{
System.out.println(e);
}
});
// or just subscribe to a single topic
consumer
.subscribe(topicName)
.onSuccess(v ->
System.out.println("subscribed")
).onFailure(cause ->
System.out.println("Could not subscribe " + cause.getMessage())
);
}
}
public static String readAvroMessageWithSchema(byte[] message) throws IOException {
StringBuilder queueMessage = new StringBuilder();
SeekableByteArrayInput input = new SeekableByteArrayInput(message);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DataFileReader dataFileReader = new DataFileReader(input, reader);
GenericRecord data = (GenericRecord) dataFileReader.next();
return data.toString();
}
"io.vertx:vertx-kafka-client:4.0.3"
"io.projectreactor.kafka:reactor-kafka:1.3.3"
"org.apache.avro:avro:1.10.2"
async function kConnect() {
let tokenVal;
tokenVal = await config().token();
const kafka = new Kafka({
clientId: 'node-kconsumer',
brokers: credentials.sslBroker.split(','),
ssl: {
rejectUnauthorized: false,
ca: [fs.readFileSync('./kConsumer/ssl/kafka.crt', 'utf-8')],
},
sasl: {
mechanism: 'plain', // scram-sha-256 or scram-sha-512
username: credentials.username,
password: tokenVal
},
});
const consumer = kafka.consumer({groupId: 'my-group1'})
await consumer.connect()
await consumer.subscribe({topic: '<subaccountid>.com.sap.dsc.ac.equipment',fromBeginning: true})
//Refer documentation or the advertisement API response for the topic name format
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value,
headers: message.headers,
})
console.log("JSON "+message.value.toJSON());
console.log("STRR "+message.value.toString());
avro().avroConvert(message.value.toString());
},
})
}
curl https://kafka-service-broker.cf.<landscape>.
hana.ondemand.com/certs/rootCA/current > kafka.crt
const avro = require('avro-js');const {Kafka} = require("kafkajs");
def main():
env = AppEnv()
kafka_env = env.get_service(label='kafka')
token = getToken(kafka_env)
consumer = KafkaConsumer(<subaccountid>.com.sap.dsc.ac.equipment',
bootstrap_servers=kafka_env.credentials[u'cluster'][u'brokers'].split(','),
auto_offset_reset='earliest',
enable_auto_commit=True,
client_id='py-kconsumer',
group_id='Pyconn',
security_protocol='SASL_SSL',
ssl_cafile='./ssl/kafka.crt',
sasl_mechanism='PLAIN',
sasl_plain_username=kafka_env.credentials[u'username'],
sasl_plain_password=token,
api_version=(2, 5, 1)
)
print("subscribed") for message in consumer:
print(message)
reader = DataFileReader(open("./message.avro", "rb"), DatumReader())
for data in reader:
print(data)
reader.close()
from kafka import KafkaConsumer
import avro.schema
from avro.datafile import DataFileReader,
from avro.io import DatumReader
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
8 | |
8 | |
3 | |
3 | |
3 | |
2 | |
2 | |
2 | |
2 | |
2 |