Technology Blogs by SAP
Learn how to extend and personalize SAP applications. Follow the SAP technology blog for insights into SAP BTP, ABAP, SAP Analytics Cloud, SAP HANA, and more.
cancel
Showing results for 
Search instead for 
Did you mean: 
schardosin
Product and Topic Expert
Product and Topic Expert
3,597
Cloud Application Programming (CAP) is an excellent development framework if you are working with the SAP Business Technology Platform (SAP BTP), it provides you an easy way to create your data domain models and expose them as OData Services, which will use Hana Database as the persistence layer. All of this is done in an easy way, which requires minimal configuration.

But CAP is more than that, it has the goal to provide a framework that will allow you to consume other SAP BTP services in a seamless way, and providing you the best practices when using these services.

One of the additional capabilities of SAP CAP is to easily integrate with SAP Event Mesh, there are lots of blogs that show how to configure to be able to emit to and receive messages from SAP Event Mesh, it requires simple configuration and a bind between the SAP Event Mesh service and your application.

But this post has the goal to present when you cannot use the standard implementation from CAP to consume messages from SAP Event Mesh.

CAP by convention creates a single queue, which is in the format of CAP/0000 or {Application_name}/{4 digits of application_id}, to consume the topic you inform in the handler, handlers don't handle queues directly, you need to specify a topic on it.

messaging.on('my/em/namespace/topic', async (msg) => { ... }

If you start specifying multiple handlers to consume multiple topics, it will consume the queues as expected by subscribing all these topics to the queue CAP/0000, CAP can distinguish when it receives a message in a queue, what is the topic associated with the message, and then it delivers to the right handler.

The "problem" is when you already have a queue to be consumed, for any reason you had to create it manually, for example, to configure a dead queue or any other configuration not supported by CAP for a queue.

In the configuration file (package.json) you can set which queue must be used by each messaging client, in this format:
{
cds: {
requires: {
messaging1: {
kind: 'enterprise-messaging-shared',
queue: {
name: 'my/em/namespace/my/queue/one'
}
},
messaging2: {
kind: 'enterprise-messaging-shared',
queue: {
name: 'my/em/namespace/my/queue/two'
}
}
}
}
}


with this configuration, when you create your client, it knows what queue name it must use.

const messaging = await cds.connect.to('messaging1')

The implementation above knows it must use the already created queue my/em/namespace/my/queue/one.

Easy solution, right? Well, be cautious with that!

CAP always expects a message in a queue to have a topic associated with it, in case you start consuming queues that contain messages not associated with topics, the message will be consumed by CAP, but it will not be delivered to any handler, once handlers ALWAYS expect a topic.

A good example of these scenarios are the dead queues, once a message reaches the max retry and is sent to a dead queue, this message will not be associated with a topic in the dead queue, and by consequence, CAP will not be able to consume it. So for dead queues, expects to use the @Sisn/xb-msg-amqp-v100 manually to consume it (in case of node.js).

Hope this helps!

15 Comments
david_kunz2
Product and Topic Expert
Product and Topic Expert
0 Kudos

Hi schardosin

Thanks for this nice write up! I hope we can implement the feature to consume all messages from the queue, maybe with messaging.on('*', msg => {...}), but I'll have to think about that a bit more.

Regarding the queue name: It's {application_name}/{4 digits of application_id} which defaults to CAP/0000 if the variables are not provided. This makes sure that every application gets its own queue (shared amongst multiple instances).

Best regards,
David

schardosin
Product and Topic Expert
Product and Topic Expert
Hi david.kunz2,

thank you for your continued support. I fixed the {4 digits of application_id} in the content, thank you for calling it out.

One good thing would be to have a way to configure CAP to create the queues with the configuration for max retrial, dead queue, and max time to live, then the manual creation of the queues wouldn't be needed.

Just sharing some thoughts, thanks again for your support! 🙂
david_kunz2
Product and Topic Expert
Product and Topic Expert
Hi schardosin,

Thanks for fixing it!

You can already pass the configuration options to the queue object in the configuration, e.g.
{
cds: {
requires: {
messaging: {
kind: "enterprise-messaging-shared",
queue: {
name: "my/awesome/super/queue",
maxRedeliveryCount: 100,
deadMsgQueue: "my/awesome/super/dead/queue",
maxTtl: 100,
respectTtl: true
}
}
}
}

All options specified here are allowed.

Best regards,
David
schardosin
Product and Topic Expert
Product and Topic Expert
Nice David!

So basically we can define the queues and dead queues in the configuration file, establish the connection between them, and only the consumption from the dead queue would need manual consumption (until CAP introduces the feature to consume from queues or another way of consuming message with no topic associated to it)

I'll update the post accordingly, this is really useful information.

Thank you,

Rafael
david_kunz2
Product and Topic Expert
Product and Topic Expert
0 Kudos
Yes, exactly!
Former Member
0 Kudos
Hi schardosin ,

Thanks for a great blog.

I have a question.
A good example of these scenarios are the dead queues, once a message reaches the max retry...

How do you set max retry?

Regards,

Minh
lochner_louw
Product and Topic Expert
Product and Topic Expert
0 Kudos
Hey schardosin,

Nice article!

One thing you can add is is that you can set the format to CloudEvents spec in the config too:
{
cds: {
requires: {
messaging: {
kind: 'enterprise-messaging-shared',
format: 'cloudevents'
}
}
}

It will produce something like by filling in all the header fields for you:
{
"type": "sap.cap.salesorder.v1.SalesOrder.Created.v1",
"specversion": "1.0",
"source": "/default/sap.cap/CAPCLNT001",
"id": "0894ef45-7741-1eea-b7be-ce30f48e9a1d",
"time": "2021-08-14T06:21:52Z",
"datacontenttype": "application/json",
"data": {
"SalesOrder":"123456789"
}
}

 

Cheers

Lochner
carlonnheim
Participant
0 Kudos
Hi,

We are trying this and have a few issues. First, the queuing service is only connecting if we keep the exact name "messaging" in the configuration.
... this works
"cds": {
"requires": {
"messaging": {
"kind": "enterprise-messaging-shared",
"queue": {
"name": "default/my/queue",
"maxDeliveredUnackedMsgsPerFlow": 5,
"maxRedeliveryCount": 10,
"deadMsgQueue": "default/my/dmq",
"maxTtl": 0,
"respectTtl": true
}
}
... but just changing "messaging" to "messaging1" makes the queue not connecte
"messaging1": {
"kind": "enterprise-messaging-shared",
"queue": {
"name": "default/my/queue",
"maxDeliveredUnackedMsgsPerFlow": 5,
"maxRedeliveryCount": 10,
"deadMsgQueue": "default/my/dmq",
"maxTtl": 0,
"respectTtl": true
}
}

The service implementation is like this:
module.exports = srv => {
cds.connect.to("messaging").then(messaging => {
messaging.on("default/my/queue", async (msg) => {
const messagePayload = JSON.stringify(msg)
console.log('===> Received message : ' + messagePayload)
});
});
}

We also notice that the dead message queue does not get created. A cosmetic issue (if we create it first it gets properly connected). But with that sorted we are still not getting messages to pass over to the dead message queue. We are throwing a simple error from the handler function and the result of that is that the message keeps getting repeated (but never goes into the dmq).

Do you have any input on what we are doing wrong?

Thanks in advance!

//Carl
david_kunz2
Product and Topic Expert
Product and Topic Expert
0 Kudos

Hi onnheimc ,

The name of your service in cds.requires must be the same as the one you connect to in your custom handler.

That means:

"cds": {
"requires": {
"messaging1": {...}
}
}

in combination with

module.exports = srv => {
cds.connect.to("messaging1").then(messaging => {
messaging.on("default/my/queue", async (msg) => {
const messagePayload = JSON.stringify(msg)
console.log('===> Received message : ' + messagePayload)
});
});
}

should work.

Note: The dead msg queue is solely a feature of Event Mesh, this must be created manually before, CAP doesn't do that automatically, only for own queues.

Maybe tobias.griebe knows why the message doesn't get pushed into the dead queue after many failed attempts? (CAP delegates the queue configuration to Even Mesh).

Thanks and best regards,
David

carlonnheim
Participant
0 Kudos
Hi,

Thanks for the quick reply. This is what we have been trying, sorry I was not clear on that aspect in the first comment. With "messaging" in "cds.requires" and in "conect.to" we can see in the output that it connects, like this:
[cds] - server listening on { url: 'http://localhost:4004' }
[cds] - launched in: 6.741s
[ terminate with ^C ]

[messaging] - Create messaging artifacts
[messaging] - Create queue { queue: 'default/my/queue' }
[messaging] - Get subscriptions { queue: 'default/my/queuee' }
[messaging] - Unchanged subscriptions [ 'default/my/queue' ]

If we have "messaging1" in both places, the log stops after "terminate with ^C" and the messaging does not work.

I tried to isolate it a bit further (on version 5.4.3). I think it is related to this.
// in @sap/cds/server.js, line 47, there is a dedicated connect to 'messaging' if specified

// connect to essential framework services if required
const _init = o.in_memory && (db => cds.deploy(csn).to(db,o))
if (cds.requires.db) cds.db = await cds.connect.to ('db') .then (_init)
if (cds.requires.messaging) await cds.connect.to ('messaging') //<<<<<<<<<<<<<<<<<<<<<<
if (cds.requires.multitenancy) await cds.mtx.in (app)

// serve all services declared in models
await cds.serve (o.service,o).in (app)
await cds.emit ('served', cds.services) //> hook for listeners

// in @sap/cds/libx/_runtime/messaging/AMQPWebhookMessaging.js, line 17 there is a "startListening" hook which does the connection

cds.once('listening', () => {
this.startListening()
})

The "connect.to("messaging")" in "server.js" causes the "subscribedTopics" of the AMQPWebhookMessaging to be populated before the "listening" event occurs. If we use a different name they occur in the wrong order (subscribedTopics is not populated when the startListening method is called and nothing is connected).

Adding this to a custom server.js makes the other-named messaging instance connect early and then it works properly.
const cds = require('@sap/cds')
module.exports = async (o) => {
// Workaround to connect messaging services with a different name than "messaging"
await cds.connect.to("messaging1");
// Delegate to default server
return cds.server(o);
}

Hope this helps finding the root cause.

Thanks!

//Carl
david_kunz2
Product and Topic Expert
Product and Topic Expert
0 Kudos
Hi Carl,

Thanks for this nice writeup, yes, that explains the problem.

You're connecting to 'messaging1' too late.

I just saw that you didn't await the promise when registering the messaging handlers, this will lead to a race condition. So you need to write the following:
module.exports = srv => {
return cds.connect.to("messaging1").then(messaging => { // <--- return here
messaging.on("default/my/queue", async (msg) => {
const messagePayload = JSON.stringify(msg)
console.log('===> Received message : ' + messagePayload)
});
});
}

Otherwise, when we cannot await the srv => {...} function.

Best regards,
David
carlonnheim
Participant
0 Kudos
Perfect, thanks! That did the trick!
Tobias_Griebe
Product and Topic Expert
Product and Topic Expert
Hi david.kunz2 & onnheimc,

unfortunately the maxRedeliveryCount -> DMQ configuration is not supported when you are using WebHooks. To my knowledge the DMQ for WebHooks only evaluates TTL.

maxRedeliveryCount -> DMQ works with AMQP, but the behaviour is unpredictable with webhooks.

Best regards,
Tobias
david_kunz2
Product and Topic Expert
Product and Topic Expert
0 Kudos
Hi tobias.griebe ,

Thanks for clarification, this is good to know!

Best regards,
David
maxi1555
Contributor
0 Kudos
Hello, is there any way to consume text and not json messages from the queue?, it seems that is always expecting json messages the cap amqp client.