2024 Mar 25 8:14 AM - edited 2024 Mar 29 10:37 AM
Last week we learnt more about Event-Driven Architecture and we successfully published a CloudEvent to SAP Integration Suite, advanced event mesh (AEM). This week we will build upon what we learnt last week and we will extend our program to subscribe to a topic. By subscribing to a topic we will be able to receive messages from AEM. Before we get to the challenge, we might need to expand a bit on some concepts. Let's get started.
Week 4 - Data flow
Links to March's developer challenge:
- Week 1: https://community.sap.com/t5/application-development-discussions/march-developer-challenge-cloudeven...
- Week 2: https://community.sap.com/t5/application-development-discussions/march-developer-challenge-cloudeven...
- Week 3: https://community.sap.com/t5/application-development-discussions/march-developer-challenge-cloudeven...
- Week 4: https://community.sap.com/t5/application-development-discussions/march-developer-challenge-cloudeven...
Last week we mentioned that a topic is a means by which a publisher classifies a message. The topic tells us what type of message we will receive if we subscribe to a specific topic. In essence, it is a string that is composed of one or more levels. Each level is separated by a forward slash (/) and the levels can be anything. This is commonly known as topic-level granularity. The granularity allows for more targeted and efficient information exchange. Instead of having a single topic for all updates on a business object in a complex system (/BusinessPartner), the system can have distinct topics for different types of updates on a business object (/BusinessPartner/Created, /BusinessPartner/Updated, /BusinessPartner/Deleted). There is no specific schema/specification on how you need to structure your topic string but you do find that patterns are established within a system. Let's get familiar with the structure of a topic by "dissecting" a real-world topic. Below we can see a topic on which an SAP S/4HANA Cloud system will publish a Business Partner message.
Example: sap/S4HANAOD/E4L/ce/sap/s4/beh/businesspartner/v1/BusinessPartner/Created/v1:
In our case, we've defined levels on our topic string based on the week, SAP Community ID and action, e.g. dev-challenge/week-4/ajmaradiaga/notification.
Now, by knowing the topic on which a message type will be published, we can create a consumer program/service that subscribes to the topic directly (aka topic endpoint) and processes the messages sent to it. Generally, you can subscribe to a topic by specifying the entire topic string when establishing the connection, e.g. sap/S4HANAOD/E4L/ce/sap/s4/beh/businesspartner/v1/BusinessPartner/Created/v1. But what if we want to subscribe to all actions (Created, Updated, Deleted) that occur on a BusinessPartner object? Luckily, in the case of SAP Integration Suite, advanced event mesh we can subscribe to the topic by using wildcards (*). For example, by subscribing to the topic sap/S4HANAOD/E4L/ce/sap/s4/beh/businesspartner/v1/BusinessPartner/*/v1 I will be able to get all messages for different actions (Created, Updated, Deleted) whose version is v1. In AEM, the > character can be used at the last level of a subscription to indicate a "one or more" wildcard match for any topics, e.g. by subscribing to the topic sap/S4HANAOD/E4L/ce/sap/s4/beh/> will bring all objects that are published under that prefix, independent of type, action, and version.
In the example above we can see how the topic level granularity can allow a consumer program/service to subscribe only to the information it needs. To learn more about wildcard characters in topic subscriptions 👉: https://help.pubsub.em.services.cloud.sap/Messaging/Wildcard-Charaters-Topic-Subs.htm
If our consumer program/service subscribes to a topic, we create a topic endpoint, and we will receive all messages for that topic subscription. That said, topic endpoints last only as long as the consumer is connected. The problem here is that our consumer needs to be online in order to receive a message. If the consumer becomes unavailable then we will end up losing messages In some scenarios, this is unacceptable and we need to ensure that we receive and process all messages published. Fortunately, there is a mechanism to retain messages without the need for a consumer service to be online 100%. Then, the consumer can process the messages asynchronously or whenever it is available. Enter Queues.
Queues allow us to subscribe to one or more topics and receive messages for all topics matching their subscriptions. The messages are received by the messaging system, saved in the queue and delivered to consuming clients if they are online and connected or held in the queue until the consumer becomes available. Queues can provide exclusive access to one consumer or access to multiple consumers where messages are distributed among the consumers. The message will be in the queue until a consumer acknowledges that a message has been processed. Only then the message will be removed from the queue.
Queue
In the case of AEM, Queues can be durable or non-durable:
As mentioned before, we can subscribe to a topic directly. A topic endpoint is created after establishing a connection to AEM (the messaging service - messaging_service.connect()) and subscribing to the topic (dev-challenge/week-4/[sapcommunityid]/processed - direct_receive_service.with_subscriptions(topics).build()). This is not a polling mechanism, but a running connection is required, through which AEM will send a message to your service. If your service is not online the message will be missed. See the code sample in the section below.
To learn more about Topic endpoints and Queues 👉: https://help.pubsub.em.services.cloud.sap/Get-Started/topic-endpoints-queues.htm
👉 Your task for this week is: Extend the program you created for last week's challenge and subscribe to the dev-challenge/week-4/[sapcommunityid]/processed topic (this will create a topic endpoint). Your program should send a message to the topic dev-challenge/week-4/[sapcommunityid]/notification and expect a response on the following topic dev-challenge/week-4/[sapcommunityid]/processed. The response will contain a hash. Please share the hash value as a comment in the discussion.
Remember that we are creating a topic endpoint, which means that before sending the notification message (dev-challenge/week-4/[sapcommunityid]/notification), you need to ensure that the subscriber on your end is working and subscribed to dev-challenge/week-4/[sapcommunityid]/processed, as the processor service will handle the notification almost instantaneously. If your subscriber is not online, the message will be missed/"lost" as no one will be listening on the topic.
Week 4 - Data flow
The diagram above explains the message processing. For example, in my case, I will publish the message to the topic dev-challenge/week-4/ajmaradiaga/notification and I will receive a response on the following topic dev-challenge/week-4/ajmaradiaga/processed. Below is an example of the response payload sent by the processor service to the dev-challenge/week-4/ajmaradiaga/processed topic:
{ "id": "8dda9501-6379-4edc-91a7-6e78bec68746", "time": "2024-03-20T14:42:03.863Z", "type": "com.sap.dev-challenge.wk-4.processed.v1", "source": "https://ce-dev-challenge-wk4.cfapps.eu10.hana.ondemand.com/", "specversion": "1.0", "data": { "messageId": "a2e9ad2a-4955-4fb5-bd6c-91785548854b", "sapCommunityId": "ajmaradiaga", "hash": "285ed899bc5de8540a5ade05a673d60cbd68dc7bb2e21afd8507741a777e8393" } }
To subscribe to the dev-challenge/week-4/[sapcommunityid]/processed topic you will need to create a topic endpoint. This can be achieved by using the Solace SDK available for your particular language. Similar to week 3, there is no need to reinvent the wheel here... check out the code available in the Solace Samples org, e.g. for Node.js, Python and the tutorials available for the different programming languages. There are plenty of examples there that show you how to use the different protocols.
In my case, I ended up using Python for which there is a library available (solace-pubsubpluis) and there is also a detailed guide for how to receive messages using the Python library. I connect to AEM using Solace Messaging. See some sample code below.
from solace.messaging.messaging_service import MessagingService, RetryStrategy
from solace.messaging.receiver.inbound_message import InboundMessage
from solace.messaging.receiver.message_receiver import MessageHandler
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.resources.topic_subscription import TopicSubscription
SOLACE_TRANSPORT_PROTOCOL = os.getenv("SOLACE_SMF_TRANSPORT_PROTOCOL")
SOLACE_HOST = os.getenv("SOLACE_SMF_HOST")
SOLACE_PORT = os.getenv("SOLACE_SMF_PORT")
SOLACE_USERNAME = os.getenv("SOLACE_SMF_USERNAME")
SOLACE_PASSWORD = os.getenv("SOLACE_SMF_PASSWORD")
...
def direct_message_consume(msg_service: MessagingService, consumer_subscription: str):
"""This method will create an receiver instance to receive str or byte array type message"""
try:
topics = [TopicSubscription.of(consumer_subscription)]
direct_receive_service = msg_service.create_direct_message_receiver_builder()
direct_receive_service = direct_receive_service.with_subscriptions(topics).build()
direct_receive_service.start()
direct_receive_service.receive_async(MessageHandlerImpl())
print(f"Subscribed to: {consumer_subscription}")
# Enter never ending loop to keep the receiver running
while True:
time.sleep(MAX_SLEEP)
finally:
direct_receive_service.terminate()
msg_service.disconnect()
...
transport_security = TLS.create().without_certificate_validation()
messaging_service = MessagingService.builder().from_properties(broker_props) \
.with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20, 3)) \
.with_transport_security_strategy(transport_security) \
.build()
messaging_service.connect()
##########################
# Consuming from a topic #
##########################
CONSUMER_SUBSCRIPTION = "dev-challenge/week-4/ajmaradiaga/processed"
print("Execute Direct Consume - String")
direct_message_consume(messaging_service, CONSUMER_SUBSCRIPTION)
To communicate with AEM, you will need to authenticate when posting a message. In the section below you can find the credentials required to connect.
For the adventurous out there.... I'm also sharing the connection details for different protocols that we can use to communicate with AEM, e.g. AMQP, Solace Messaging, Solace Web Messaging. In case you want to play around and get familiar with different protocols.
🔐Expand to view credentials 🔓
As part of the validation process for this week's challenge, there is a "processor service" that will process the messages sent. The processor program follows the same validation process as in week 3 and it will generate a response, which will be sent to the dev-challenge/week-4/[sapcommunityid]/processed topic.
You can monitor the messages processed by the processor service on this website: https://ce-dev-challenge-wk4.cfapps.eu10.hana.ondemand.com/messages-hash/webapp/index.html. Similar to last week, you will also be able to see if there are any errors in the message sent, the monitoring app will tell you what the error is, e.g. not a valid CloudEvent, sapcommunityid extension context attribute missing, or sapcommunityid doesn't match community Id specified in topic. The processor service is subscribed to the topics using a wildcard - dev-challenge/week-4/*/notification and will only generate hashes for valid payloads.
Below you can see a gif of a message processed successfully by the processor service.
2024 Mar 25 3:13 PM - edited 2024 Mar 27 6:48 AM
Interested in learning more about SAP Integration Suite, advanced event mesh? How about you join us in this in-person event that will take place on the 6th of May @ the SAP office in Madrid, Spain - https://community.sap.com/t5/sap-codejam/event-driven-integrations-with-sap-integration-suite-advanc...
2024 Mar 25 5:58 PM - edited 2024 Mar 25 6:00 PM
Here is my submission 👍
JS with solclientjs:
const solace = require("solclientjs");
const solaceConfig = {
host: "tcps://mr-connection-plh11u5eu6a.messaging.solace.cloud:55443",
vpnName: "eu-fr-devbroker",
userName: "solace-cloud-client",
passWord: "mcrtp5mps5q12lfqed5kfndbi2",
topic: "dev-challenge/week-4/r00k13d3v/processed",
};
function initSolaceSession() {
const factoryProps = new solace.SolclientFactoryProperties();
factoryProps.profile = solace.SolclientFactoryProfiles.version10;
solace.SolclientFactory.init(factoryProps);
const session = solace.SolclientFactory.createSession({
url: solaceConfig.host,
vpnName: solaceConfig.vpnName,
userName: solaceConfig.userName,
password: solaceConfig.passWord,
});
session.on(solace.SessionEventCode.UP_NOTICE, function (sessionEvent) {
console.log("Conectado a Solace.");
subscribeToTopic();
});
session.on(solace.SessionEventCode.CONNECT_FAILED_ERROR, function (sessionEvent) {
console.log("Conexión fallida a Solace.");
});
session.on(solace.SessionEventCode.DISCONNECTED, function (sessionEvent) {
console.log("Desconectado de Solace.");
});
session.on(solace.SessionEventCode.SUBSCRIPTION_ERROR, function (sessionEvent) {
console.log("Error de suscripción.");
});
session.on(solace.SessionEventCode.MESSAGE, function (message) {
console.log("Mensaje recibido:", message.getBinaryAttachment());
});
session.connect();
function subscribeToTopic() {
session.subscribe(solace.SolclientFactory.createTopicDestination(solaceConfig.topic), true, solaceConfig.topic, 10000);
}
}
initSolaceSession();
Results:
2024 Mar 25 6:00 PM - edited 2024 Mar 25 7:01 PM
Hash value: db38088e6ac9ad74447faac9b0c36667df7c4edd0af2dbe4b2be939a3ad9876a
Publish:
Consume:
2024 Mar 25 6:38 PM
Here is my submission:
const solace = require('solclientjs').debug; // Required for debugging purposes
const dotenv = require('dotenv');
// Load the environment variables from the .env file
dotenv.config();
const SOLACE_TRANSPORT_PROTOCOL = process.env.SOLACE_SMF_TRANSPORT_PROTOCOL;
const SOLACE_HOST = process.env.SOLACE_SMF_HOST;
const SOLACE_PORT = process.env.SOLACE_SMF_PORT;
const SOLACE_USERNAME = process.env.SOLACE_SMF_USERNAME;
const SOLACE_PASSWORD = process.env.SOLACE_SMF_PASSWORD;
const SOLACE_VPNNAME = process.env.SOLACE_SMF_VPNNAME;
const MAX_SLEEP = 1000; // Set your desired sleep duration
function directMessageConsume(session, consumerSubscription) {
try {
const topic = solace.SolclientFactory.createTopic(consumerSubscription);
session.subscribe(topic, true, consumerSubscription, 1000);
console.log(`Subscribed to: ${consumerSubscription}`);
} catch (error) {
console.error(`Error subscribing to topic: ${error}`);
}
}
async function connectAndConsume() {
// Initialize factory with the most recent API defaults
const factoryProps = new solace.SolclientFactoryProperties();
factoryProps.profile = solace.SolclientFactoryProfiles.version10;
solace.SolclientFactory.init(factoryProps);
const sessionProperties = new solace.SessionProperties();
sessionProperties.url = `${SOLACE_TRANSPORT_PROTOCOL}://${SOLACE_HOST}:${SOLACE_PORT}`;
sessionProperties.vpnName = SOLACE_VPNNAME;
sessionProperties.userName = SOLACE_USERNAME;
sessionProperties.password = SOLACE_PASSWORD;
const session = await solace.SolclientFactory.createSession(sessionProperties);
session.connect();
session.on(solace.SessionEventCode.UP_NOTICE, (sessionEvent) => {
console.log('Session is up');
directMessageConsume(session, 'dev-challenge/week-4/spirit2681/processed');
});
session.on(solace.SessionEventCode.CONNECT_FAILED_ERROR, (sessionEvent) => {
console.log('Connection failed to the message router: ' + sessionEvent.infoStr);
});
session.on(solace.SessionEventCode.DISCONNECTED, (sessionEvent) => {
console.log('Disconnected');
});
session.on(solace.SessionEventCode.SUBSCRIPTION_ERROR, (sessionEvent) => {
console.log('Cannot add subscription: ' + sessionEvent.correlationKey);
});
session.on(solace.SessionEventCode.SUBSCRIPTION_OK, (sessionEvent) => {
console.log('Subscription added: ' + sessionEvent.correlationKey);
});
session.on(solace.SessionEventCode.MESSAGE, (sessionEvent) => {
console.log('Message Received: ' + sessionEvent.getBinaryAttachment());
});
}
connectAndConsume();
Published message:
Received message in the consumer:
Status shown in the application:
2024 Mar 25 9:14 PM - edited 2024 Mar 25 9:15 PM
I have a problem receiving messages. The code I am using is a sample from GitHub.
Subscribing to the topic is successful, but I cannot receive messages when I publish events to `
In the monitor app I can see my messages have been processed. I have also tested with other participants' code, but the results were the same.
2024 Mar 25 10:58 PM
Hi @MioYasutake !
I've tried with the repository you mentioned from GitHub > Codespaces, and it worked for both your topic and mine. Here is the code I modified and the command I executed:
var subscriber = new TopicSubscriber(solace, 'dev-challenge/week-4/MioYasuatke/processed');
node src/basic-samples/TopicSubscriber.js tcps://mr-connection-plh11u5eu6a.messaging.solace.cloud:55443 solace-cloud-client@eu-fr-devbroker mcrtp5mps5q12lfqed5kfndbi2
I hope this has solved your problem.
Best regards!
2024 Mar 25 11:57 PM
Thank you for your prompt reply!
I discovered that I was publishing to the wrong topic - "MioYasuatke" (instead of "MioYasutake"). Correcting the topic resolved my issue.
2024 Mar 26 2:56 AM
2024 Mar 26 12:01 AM
Hash value: 9fd029aeb58df50ec143d3a6f703c20e1a9f1426958cd877ebea9b6a9f99bb6f
Response:
The code I'm using is a sample from GitHub:
How to run this code is explained by @r00k13d3v in the reply below.
2024 Mar 26 11:42 AM - edited 2024 Mar 26 11:51 AM
Yep guys. I've tried it and doesn't works 😫 done in node.js. I submited also the .env file to git if anyone wants to check it. I dunno. Appears at subscribed and listening, then i trigger the message to be consumed but nothing happens.
git repo - https://github.com/xsansebastian/codejams
environment entries here 🔐 ⬇️
I first executes week4, then week3. the message of subscription is now active corresponds to
Thanks in advance for your help 🙂
2024 Mar 26 12:04 PM
Hi @xavisanse !
I think you are calling the topic for week 3, you should call the one for week 4 (dev-challenge/week-4/${SAP_COMMUNITY_ID}/notification).
I'll leave you the link to the monitor so you can see it when you make the request.
See if that works for you.
Greetings!
2024 Mar 26 12:08 PM
absolutely true 🙂 it happens when you are lazy coding in easter holidays 🙂 and reads in diagonal. now works 🙂 kudos for you
2024 Mar 27 7:14 AM
2024 Mar 26 12:02 PM
Borrowed heavily from:
Processed through Event service:
Processed response:
[11:54:49] Received message: "▼☺{
"id": "1e1e10dd-0302-4f9b-b785-6fa84a89e43e",
"time": "2024-03-26T11:54:49.603Z",
"type": "com.sap.dev-challenge.wk-4.processed.v1",
"source": "https://ce-dev-challenge-wk4.cfapps.eu10.hana.ondemand.com/",
"specversion": "1.0",
"data": {
"messageId": "6caee628-04a1-4565-ae1e-8eb6d4370c96",
"sapCommunityId": "geek",
"hash": "e92ac42a4f66aaf0665d9254d2494efe0f6a1c9d73e25029ec1c2b680d06b465"
}
}"
2024 Mar 27 6:44 AM
2024 Mar 26 12:13 PM
awesomeCode:
const solaceConsumer = require('solclientjs');
require('dotenv').config();
const solaceHost = process.env.SOLACE_HOST;
const solaceUsername = process.env.SOLACE_USERNAME;
const solacePassword = process.env.SOLACE_PASSWORD;
const solaceVpn = process.env.SOLACE_VPN;
const solaceTopic ="dev-challenge/week-4/xavisanse/processed"
async function consumeAwesomeEvents() {
const factoryProps = new solaceConsumer.SolclientFactoryProperties();
factoryProps.profile = solaceConsumer.SolclientFactoryProfiles.version10;
solaceConsumer.SolclientFactory.init(factoryProps);
const session = solaceConsumer.SolclientFactory.createSession({
url: solaceHost,
vpnName: solaceVpn,
userName: solaceUsername,
password: solacePassword,
});
session.on(solaceConsumer.SessionEventCode.UP_NOTICE, function (sessionEvent) {
console.log('=== Successfully connected and ready to consume messages. ===');
subscribeToTopic(session, solaceTopic);
});
session.on(solaceConsumer.SessionEventCode.CONNECT_FAILED_ERROR, function (sessionEvent) {
console.log('=== Connection failed: ' + sessionEvent.infoStr + ' ===');
});
session.on(solaceConsumer.SessionEventCode.DISCONNECTED, function (sessionEvent) {
console.log('=== Disconnected. ===');
});
session.on(solaceConsumer.SessionEventCode.SUBSCRIPTION_OK, function (sessionEvent) {
console.log('=== Subscription is now active. ===');
});
session.on(solaceConsumer.SessionEventCode.SUBSCRIPTION_ERROR, function (sessionEvent) {
console.log('=== Subscription failed: ' + sessionEvent.infoStr + ' ===');
});
session.on(solaceConsumer.SessionEventCode.MESSAGE, function (message) {
console.log('Received message: ' + message.getBinaryAttachment());
});
session.connect();
function subscribeToTopic(session, topic) {
try {
const topicObject = solaceConsumer.SolclientFactory.createTopicDestination(topic);
console.log(`Subscribing to topic: ${topic}`);
session.subscribe(topicObject, true, topic, 10000);
} catch (error) {
console.error("Error subscribing to topic: " + error.toString());
}
}
}
consumeAwesomeEvents();
hashValue = 8a48e62638474227a18d5e4cf0640b22486a513f0fd0e17c95f9d191bcea3ad1
Console Output:
2024 Mar 26 2:56 PM
Hello @ajmaradiaga !
Thanks a lot for this great opportunity to handle those CE!
Here is my submission for this last week:
The hash code is:
2024 Mar 26 7:08 PM
"hash": "26105dec098dd4ee5410f3ba27e7f755f23551851888fc76f36364557d461b58"
Publish:
Consume:
Hola @ajmaradiaga, thank you for this challenge! I'm not a pro-developer and it's been difficult for me, but by going through all the documentation, I've been able to do it 😃!!! Gracias por informar del evento en Madrid!
2024 Mar 27 6:47 AM - edited 2024 Mar 27 6:47 AM
@emiliocampo, gracias por participar en el challenge! En el evento vamos a familiarizarnos con AEM y tambien habra algo de codigo :-). Nos vemos el 6 de Mayo?
2024 Mar 27 7:00 AM - edited 2024 Apr 01 4:24 AM
All valid submissions for week 4 so far...
2024 Mar 27 3:17 PM
2024 Mar 27 6:48 PM
Hash - 87a0351c2db1787a35fd16ea16149611f282caba1283e28f3774b0dd1754ac8f
2024 Mar 28 3:32 AM
This one was little complex over the previous weeks. After playing around with https://tutorials.solace.dev/nodejs/publish-subscribe/ , i finally looked at the topicpublisher.js and topicsubscriber.js on github. I was a breeze from then. I must admit, I am learning node.js and writing good code has been a challenge. Time to learn from other's code samples.
hashcode: 758935ea-3f3b-4cd6-8fc7-8636580a10c6
Source code is too long.
2024 Mar 28 6:05 AM - edited 2024 Mar 28 6:06 AM
2024 Mar 29 7:25 PM
2024 Apr 05 10:11 AM
Hi @ajmaradiaga , Can I still publish, consume messages in this challenge? Monitoring app is not working and my consumer app is not working either. I don't know if I have an error in my solution or if it is already too late.
2024 Apr 05 11:12 AM
Hi @tobiasz_h , thanks for your interest in the challenge. Unfortunately I stopped the monitoring services because the challenge finished. That said, I've started the monitoring service again and will keep it running for a couple of days so that you can complete the exercise. Fortunately, the messages where in a queue and they have been processed (see screenshot below). The beauty of asynchronous processing.
Regards,
.A
2024 Apr 05 12:10 PM
Thank you @ajmaradiaga for quick answer and starting services and thank you for great challenge.
Everything worked fine.
2024 Apr 05 12:18 PM
2024 Apr 05 12:15 PM
Hello,
I'm a little late but better late than never.
Hash value: E2A777F0BB61BEB847FC9874109FE97A6DC13232CCFEC3C8DD78F1D80BD332DB