Open Source Blogs
Immerse yourself in SAP open source! Discover collaborative projects, insights into the latest technologies, and best practices in open source development.
cancel
Showing results for 
Search instead for 
Did you mean: 
Heureso1
Explorer
0 Kudos

Send and Receive Kafka Messages - Quarkus

Build a java application with a REST interface. The incoming rest call is forwarded as Kafka message. A Kafka processor receives the incoming messages and answers with an outgoing message.

The use case is to meterreadings to a utilities company for billing electricity, a part of the Meter to Cash process known form SAP IS-U and Cloud for Utilities.

Please see the blog for installing the prerequisites: Setup KYMA, Kafka, Postgres, Redis

Github

Code is available from Github Repository: Github

Run in Dev or Prod

Components of quarkus_kafka application

quarkus_kafka.png

 

Postgres, DB entities

REST Interface and outgoing Kafka Message

Receveing the post and preparing outgoing Kafka Message (METERREADING_OUT).

com.heureso.boundaries.MeterReadingResource.java

@POST - POST a MeterReading MeterReading is a Hibernate Object, which is persisted. Message is mapped to MeterReadingEvent and forwarded as Kafka Message.

@GET - GET all readings in Database Returning all meterreading objects in the database.

Kafka Event

MeterReadingEvent for Kafka

PoJo: com.heureso.events.MeterReadingEvent.java

  • Deserializer JSON -> Java ( com.heureso.event.MeterReadingEventDeserializer.java )

Configure Incoming and Outgoing Message in application.properties

#outgoing message
mp.messaging.outgoing.METERREADING_OUT.connector=smallrye-kafka
mp.messaging.outgoing.METERREADING_OUT.topic=METERREADING.CREATION.CONFIRMED
mp.messaging.outgoing.METERREADING_OUT.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

#incoming message
mp.messaging.incoming.METERREADING_IN.connector=smallrye-kafka
mp.messaging.incoming.METERREADING_IN.propagate-headers=Authorization,Proxy-Authorization
mp.messaging.incoming.METERREADING_IN.topic=METERREADING.CREATION.REQUESTED
mp.messaging.incoming.METERREADING_IN.value.deserializer=com.heureso.events.MeterReadingEventDeserializer
 

Handling the Kafka Message

Handling the Kafka Message (METERREADING_IN), storing in Redis Cache.

com.heureso.boundaries.MeterReadingResource.java

 @ActivateRequestContext
    @Incoming("METERREADING_IN")
    @Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)

    public Uni<Void> process(Message<MeterReadingEvent> meterreadingmessage)
    {
        MeterReadingEvent meterreadingevent = meterreadingmessage.getPayload();

        Optional<IncomingKafkaRecordMetadata> incomingKafkaRecordMetadata = meterreadingmessage.getMetadata(IncomingKafkaRecordMetadata.class);
 

Testing REST and checking KAFKA messages

Test the REST using the http curls

After successfully transmitting a POST call, the KAFKA messages should be visible in the AKHQ interface, as described in the first blog.

Labels in this area