This blog describes options for configuring asynchronous message processing using the new AMQP adapter, which lets customers connect to messaging systems via the Advanced Message Queuing Protocol (AMQP). It describes the configuration, prerequisites and limits. The AMQP adapter is available for SAP Cloud Integration customers with the 08-December-2019 release.
Connecting to Messaging Systems Using the AMQP Adapter
In many integration scenarios, messages or events have to be exchanged between applications or systems via messaging systems. With the new AMQP adapter, SAP Cloud Integration can be used as provider or consumer of such messages or events. Cloud Integration can connect to messaging systems using AMQP (Advanced Message Queuing Protocol) version 1.0 and consume messages or events using the AMQP sender adapter, or store messages or events in the message broker using the AMQP receiver adapter. Follow the steps described below to write to or consume from queues and/or topics in the message broker.
Prerequisite: Messaging System Setup
To be able to connect to queues or topics in the message broker, you first have to create them. This needs to be done on the messaging system with the configuration tools provided by the messaging system.
In some messaging systems you need to configure a Lock Duration to make sure the message is not consumed multiple times. This timeout must be longer than the expected processing time of the message; otherwise, the message is delivered again and duplicates occur.
Note that queues, topics and the messages in them can only be monitored with the tools provided by the messaging system provider. These monitors are not integrated into Cloud Integration. In SAP Cloud Platform Integration you can monitor the integration flows using the AMQP adapter and the messages sent to or consumed from the messaging system. You can find more details about those options in this blog in section 'Monitoring'.
AMQP Receiver Adapter
The AMQP receiver adapter can be used to send messages from Cloud Integration to queues or topics in an external messaging system. A small sample scenario with an HTTP sender adapter and the AMQP receiver adapter is shown in the picture below. A message is received by the HTTP sender adapter, some headers are defined and conversions are done in a Groovy script, and the message is then sent to the messaging system via the AMQP adapter:
In the AMQP receiver channel the following configurations have to be made to connect to the messaging system. Note that these settings are very specific to the connected messaging system. Some sample settings are listed below in the messaging system specific chapters.
- Protocol:
  - Transport Protocol: Select the protocol the messaging system supports.
- Connection:
  - Host: Specify the hostname of the messaging system.
  - Port: Specify the port of the messaging system.
  - Proxy Type: Select whether you want to connect via Cloud Connector (On-Premise) or directly via the Internet. This configuration option is available with version 1.1 of the adapter (available with the January 2020 update). See the blog "How to connect to an on-premise AMQP server via Cloud Connector" for more details.
  - Path: For WebSocket, specify the access path of the messaging system.
  - Connect with TLS: Select if TLS has to be used for the connection.
  - Authentication: Select the authentication method the messaging system supports.
    - SASL (Simple Authentication and Security Layer)
    - OAuth2 Client Credentials
    - None
  - Credential Name: Select the alias of the deployed user credentials. You need to deploy the respective key name and key as user credentials in the security material section.
- Processing:
  - Destination Type: Specify if messages are to be sent to queues or topics in the messaging system.
  - Destination Name: Specify the name of the queue or topic. This value can be dynamically defined using a header, ${header.queueabc}, or a property, ${property.queueabc}.
  - Expiration Period: Specify the TTL (Time to Live) for the message. If nothing is specified, the setting for the queue or topic subscription in the messaging system applies. How the defined TTL is interpreted depends on the messaging system.
  - Delivery: Specify if the messaging system has to make sure that the message is not lost, even in case of unexpected terminations.
    - Persistent
    - Non-Persistent
AMQP Sender Adapter
The AMQP sender adapter can be used to consume messages in Cloud Integration from queues or topic subscriptions in an external messaging system. A small sample scenario with the AMQP sender adapter and a SOAP receiver adapter is shown in the picture below. A message is consumed by the AMQP sender adapter from the messaging system, some conversions are executed in a Groovy script, and then the message is sent to the receiver via the SOAP adapter:
In the AMQP sender channel the following configurations need to be made. Sample settings are listed below in the messaging system specific chapters:
- Protocol:
  - Transport Protocol: Select the protocol the messaging system supports.
- Connection:
  - Host: Specify the hostname of the messaging system.
  - Port: Specify the port of the messaging system.
  - Proxy Type: Select whether you want to connect via Cloud Connector (On-Premise) or directly via the Internet. This configuration option is available with version 1.1 of the adapter (available with the January 2020 update). See the blog "How to connect to an on-premise AMQP server via Cloud Connector" for more details.
  - Path: For WebSocket, specify the access path of the messaging system.
  - Connect with TLS: Select if TLS has to be used for the connection.
  - Authentication: Select the authentication method the messaging system supports.
    - SASL (Simple Authentication and Security Layer)
    - OAuth2 Client Credentials
    - None
  - Credential Name: Select the alias of the deployed user credentials. You need to deploy the respective key name and key as user credentials in the security material section.
- Processing:
  - Queue Name: Specify the name of the queue or topic subscription to be consumed from. Note that topics are not supported in the sender adapter, only queues and topic subscriptions.
  - Number of Concurrent Processes: Specify the number of processes used for parallel message processing. Note that the processes are started from each worker node.
  - Max. Number of Prefetched Messages: Specify the maximum number of messages to be prefetched by a worker. The default value is 5, which means that if there are more than 5 messages in the queue, the first worker fetches 5 messages at once and processes them one after the other; the second worker fetches the next 5, and so on. The allowed value range is 1 to 100. Prefetching may increase performance because it reduces the time spent on additional communication. Consider the following:
    - If the processing time of the integration flow is longer than a few seconds, consider a lower value for prefetched messages to allow an optimal distribution between the worker nodes. For very long running integration flows (longer than one minute) you may even consider 1 as the value for prefetched messages.
    - If the processing time of the integration flow is only a few milliseconds, you may increase the number of prefetched messages to 10 or even higher. But keep the lock timeout in mind (next point).
    - Important: You need to keep the lock timeout of the messaging system in mind because the timeout starts with the processing of the first message in the prefetch package. If the package of prefetched messages takes longer to process than the lock timeout you have specified, this could lead to duplicate messages!
    - For older versions of the AMQP channel this configuration option is not available. In this case, the default value of 5 messages is used.
  - The Retry Details section will be available with the 10th-May 2020 update.
    - Max. Number of Retries: Specify the maximum number of retries that shall be executed in case there is an error during processing. Afterwards a different delivery status will be sent to the message broker (see next field).
    - Delivery Status After Max. Retries: Select the delivery status that shall be sent to the message broker after the specified retries were executed and the message processing still fails. The following two values are available: REJECTED and MODIFIED_FAILED_UNDELIVERABLE. The message broker can then handle such messages differently.
Retry Configuration
If an error occurs during the processing of the consumed message in Cloud Integration, the message is not removed from the messaging system but is retried immediately until the configured maximum number of retries is reached. If the messaging system does not handle the message differently when a different delivery status is sent, the retry may even go on forever.
There are different options available to handle errors during processing and configure the retry for messages consumed via AMQP:
- Use Retry configuration in AMQP sender adapter as described above (available with 10th May 2020 update)
- Retry Configuration based on JMSRedelivered/JMSXDeliveryCount headers in Exception Sub-Process
- Catch Error and configure alternate processing in Exception Sub-Process
If the messaging system supports dead-lettering and delayed processing based on the delivery status as described for the AMQP sender adapter, this is the recommended configuration option. If the messaging system does not support dead-lettering, or you want more specific handling, like sending a notification to an administrator after a certain number of retries, you can use the configuration based on the two headers JMSRedelivered and JMSXDeliveryCount. The third option is the only option if the messaging system supports neither dead-lettering nor the headers.
Retry Configuration in AMQP Sender Adapter
If supported by the messaging system, use the retry configuration in the AMQP sender adapter as briefly mentioned above.
The Retry Details section in the AMQP sender adapter will be available with the 10th-May 2020 update.
- Max. Number of Retries: Specify the maximum number of retries that shall be executed in case there is an error during processing. Afterwards a different delivery status will be sent to the message broker (see next field).
- Delivery Status After Max. Retries: Select the delivery status that shall be sent to the message broker after the specified retries were executed and the message processing still fails. The following two values are available: REJECTED and MODIFIED_FAILED_UNDELIVERABLE. The message broker can then handle such messages differently.
Most message brokers put messages with delivery status REJECTED into a dead letter queue, but several message brokers also require a specific configuration in the broker to achieve this. Some message brokers do not react to the delivery status at all. Check the configuration options of the message broker you use. Some specifics are mentioned below in the section about the different tested message brokers.
Executing Retries or Not?
Answering this question is not as simple as you might expect. Considering the details of the retry mechanism, both options have advantages and disadvantages.
The AMQP sender adapter will run up to Max. Number of Retries before sending the configured Delivery Status After Max. Retries to the broker. Then it depends on the broker and its configuration how it reacts to this delivery status. Usually, the broker either dead-letters the message or it resends the message to the client.
- If the AMQP sender adapter is configured not to execute any retries (Max. Number of Retries = 0), a to-be-redelivered message will be filtered out before the integration flow even starts to be executed. No MPL will be visible for the filtering mechanism, as not a single modeled step is executed in the first place. This is fine if the broker dead-letters the message as a consequence. But it becomes a problem if the broker simply resends the message anyway, although the AMQP sender adapter signaled to the broker that it cannot (or does not wish to) re-execute this message.
- If the AMQP sender adapter is configured to execute n retries, it will attempt up to n retries and finally send back the Delivery Status After Max. Retries in case all attempts were unsuccessful. If the broker chooses to send the same message again to the client, the client will once again try to process the message with up to n subsequent retries before sending back the Delivery Status After Max. Retries again.
Let us consider the two worst-case scenarios:
Scenario 1:
The integration flow is unable to process the message successfully without user interaction (or change). That means when re-executing the same message, processing will continue to fail. The AMQP sender adapter is configured not to run any retries. However, the broker resends the message over and over, independent of the configured Delivery Status After Max. Retries. The retry attempts will not be visible (i.e. via MPL). But since the broker continues to send this message, the client has to filter it out, and this consumes resources, mainly CPU. Even worse, it blocks the processing of subsequent messages from that queue in case of exclusive queues or message groups.
Exposing these infinite attempts in MPLs would make this transparent while putting the health of your tenant's database at risk. Fast, infinite retry attempts can flood the database with MPLs, potentially long before you even take notice of this. Hence, Cloud Integration decided to sacrifice transparency for the sake of the healthiness of the database and your tenant.
Scenario 2:
The integration flow is unable to process the message successfully without user interaction (or change). That means when re-executing the same message, processing will continue to fail. The AMQP sender adapter is configured to run n retries (Max. Number of Retries > 0). The adapter will attempt up to n retries and finally send back the Delivery Status After Max. Retries in case all attempts were unsuccessful. If the broker chooses to send the same message again to the client, the client will once again try to process the message with up to n subsequent retries before sending back the Delivery Status After Max. Retries again. Since the broker resends the same message indefinitely, this results in infinite retry attempts. And since the integration flow allows retry attempts, the message is not immediately filtered out. Instead, it is actually executed and you will see an MPL for this (in RETRY state).
If this goes on and on without anybody taking notice and acting on this situation, the database can be flooded and the tenant becomes unstable.
Ways Out of the Misery
- Choose a broker that supports dead-lettering and make sure it is configured to dead-letter based on the Delivery Status After Max. Retries.
- When allowing retries, it is recommended to catch and handle exceptions in an exception subprocess. If you re-raise an exception (i.e. Error end event), the process will fail and the message will be retried. It is advisable to have an automated way of handling the exception successfully (i.e. End event) to avoid infinite retry attempts during a time frame the tenant is not actively observed.
Retry Configuration based on JMSRedelivered/JMSXDeliveryCount
There is no option to configure a delay in retry processing in the AMQP adapter because this is not supported by the AMQP protocol. One option to delay the retry in error cases is to configure this retry handling in the integration flow using dead letter queues in the messaging system (if this is supported by the messaging system). This can be configured using two headers set by most of the messaging systems:
- JMSRedelivered: This header specifies whether the message was redelivered. Value 'false' means that this message is delivered for the first time; value 'true' means that delivery of this message was already attempted before.
- JMSXDeliveryCount: This header specifies the number of processing attempts for this message. Value 1 means that it is the first attempt to process this message; value 2 means that it is the second attempt, that is, the first retry.
Note that these headers are not set or not reliably set by some messaging systems. See below in the messaging system specific section for more details.
Configure Delayed Retry Processing
The first step to configure a delayed retry would be to add a Router step and another Receiver to the integration flow consuming the message:
In the Router, the route to the second receiver has to be configured in such a way that messages which are consumed for the second time are sent to another queue in the messaging system. The Condition in the Router can be defined by using one of the above-mentioned parameters. In the described sample I use the 'JMSXDeliveryCount' header and define the Non-XML expression as ${header.JMSXDeliveryCount} > '1'. If the used message broker does not support this header, you can use the 'JMSRedelivered' header and define the expression as ${header.JMSRedelivered} = 'true'.
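The decision the Router takes can also be written down as plain Groovy, which may help when you want to test the logic outside the integration flow. This is only a sketch under assumptions: isRedelivery is a hypothetical helper name, the headers are passed as a simple map, and the header values are normalised because they may arrive as numbers, booleans or strings depending on the broker.

```groovy
// Hypothetical helper mirroring the two router conditions described above.
boolean isRedelivery(Map headers) {
    def count = headers['JMSXDeliveryCount']
    if (count != null) {
        // Value 1 is the first delivery; anything above 1 is a retry.
        return Integer.parseInt(count.toString().trim()) > 1
    }
    // Fallback for brokers that only set JMSRedelivered.
    return 'true'.equalsIgnoreCase(headers['JMSRedelivered']?.toString()?.trim())
}

println isRedelivery([JMSXDeliveryCount: 1])     // first delivery
println isRedelivery([JMSXDeliveryCount: '2'])   // first retry
println isRedelivery([JMSRedelivered: 'true'])   // broker without delivery count
```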
In the AMQP receiver channel a different queue is configured. This queue has to be created in the messaging system. No integration flow consumes messages from this queue; it is only used as storage until the next retry. In my sample I use the queue 'retryqueue'.
In the parameters for this queue, it needs to be specified that messages are moved to a dead letter queue after a specific time interval (time to live). This configuration has to be made in the messaging system by using the tools provided by the messaging system.
Note: If the message broker supports configuring any queue as dead letter queue, you can move the message directly back to the original processing queue and do not need a separate process for this.
From this dead letter queue we can then consume the message and put it back to the original processing queue. This is done in a separate integration process:
The AMQP sender adapter needs to be configured to consume messages from the dead letter queue of the retry queue or the queue defined as dead letter queue.
In case Microsoft Azure Service Bus is used as messaging system, the dead letter queue can be accessed by using the following syntax: '<queue>/$deadletterqueue'. This may be different for other messaging systems.
Make sure you select the flag 'Consume Expired Messages'. This is required because the messages in the dead letter queue are expired: the expiration time is used as the delay time.
The process writes the message back to the original processing queue via an AMQP receiver adapter so that it can be consumed from there again.
In addition to these configurations, you need to make sure the header JMSXDeliveryCount or JMSRedelivered is retrieved by the integration flow. For this we configure the header as Allowed Header in the Runtime Configuration of the integration flow:
Now we can deploy the integration flow and test the processing:
As soon as a message is available in the processing queue in the messaging system, the message is consumed by Cloud Integration. If there is an error during processing, the processing fails and an immediate retry is executed. This retry stores the message in the separate retry queue, where the message is parked for the amount of time configured in the messaging system. As the message is not consumed from this retry queue it is moved to the dead letter queue after the configured time to live.
As soon as it is available in the dead letter queue the second process consumes the message and stores it to the original processing queue. From there it is consumed and tried to be processed again. The same retry procedure would be triggered if again an error in processing occurs.
With this configuration we are able to define a kind of retry interval: the message is only retried after a defined time interval. What is not possible with this configuration is to stop a message after a certain number of retries. How this can be done is described in the next section.
Stop Retry Processing after x Retries
First, we need to introduce a counter that counts the number of executed retries. For this we cannot use the JMSXDeliveryCount header set by the messaging system because we move the messages between different queues, and so the JMSXDeliveryCount is always reset. We define our own counter in the process that writes the message back to the processing queue. We do this in a small Groovy script:
In the script we set a header RetryCount.
Note that headers with the prefix JMS are not allowed; such headers are not forwarded by the messaging systems.
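import com.sap.gateway.ip.core.customdev.util.Message

// Increments the custom RetryCount header each time the message is re-queued.
def Message processData(Message message) {
    // Read the current counter from the message headers
    def headers = message.getHeaders()
    def retry = headers.get("RetryCount")
    if (retry == null) {
        // First retry: the header has not been set yet
        retry = 1
    } else {
        // The value may arrive as a string, so convert it before adding
        retry = Integer.parseInt(retry.toString()) + 1
    }
    message.setHeader("RetryCount", retry)
    return message
}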
Furthermore, we extend the configuration in the main process that consumes messages from the processing queue by adding an additional Router step:
In this Router step we evaluate the RetryCount header, and if it is greater than or equal to 5 we do some alternative handling and stop the retries. In my sample I simply write the message to a data store, but any other configuration could be done as well.
As a last step we need to add the RetryCount header to the Allowed Headers in the Runtime Configuration of the integration flow:
Now we can deploy the updated integration flow and test the processing:
As soon as a message is available in the processing queue in the messaging system, the message is consumed by Cloud Integration. If there is an error during processing, the processing fails and an immediate retry is executed. This retry stores the message in the separate retry queue, where the message is parked for the amount of time configured in the messaging system. As the message is not consumed from this retry queue it is moved to the dead letter queue after the configured time to live.
As soon as it is available in the dead letter queue, the second process consumes the message, sets the RetryCount header and stores it in the original processing queue. From there it is consumed again, and the number of executed retries is checked. The message is only retried 4 times; during the 5th retry, the message is written to the data store and the processing stops.
Note that if the message broker supports defining the original queue as dead letter queue directly and you skip this additional process, you can instead increase the RetryCount in a script in the main process before writing the message to the retry queue.
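The interplay between the counter script and the Router threshold can be checked with a small stand-alone simulation. This is a sketch under assumptions: plain maps stand in for message headers, the helper names are hypothetical, and the message is assumed to fail on every processing attempt.

```groovy
// Mirrors the counter script: set RetryCount to 1 or increase it by one.
Map incrementRetryCount(Map headers) {
    def current = headers['RetryCount']
    int next = (current == null) ? 1 : Integer.parseInt(current.toString()) + 1
    return headers + [RetryCount: next]
}

// Mirrors the Router: stop retrying once RetryCount has reached the threshold.
boolean shouldStop(Map headers, int threshold) {
    def current = headers['RetryCount']
    return current != null && Integer.parseInt(current.toString()) >= threshold
}

// Counts how often a permanently failing message is processed
// before the Router diverts it to the alternative handling.
int simulateAttempts(int threshold) {
    Map headers = [:]
    int attempts = 0
    while (!shouldStop(headers, threshold)) {
        attempts++                               // processing attempt that fails
        headers = incrementRetryCount(headers)   // message is re-queued via the retry queue
    }
    return attempts
}

println simulateAttempts(5)   // 5 attempts: the first delivery plus 4 retries
```

With a threshold of 5, the simulated message is processed five times (the initial attempt and four retries) before the next consumption is diverted, which matches the behaviour described above.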
Configure Exception Handling for Messaging Systems not Supporting the Headers
As mentioned already, not all messaging systems support the two headers used to configure explicit retry handling. In this case the only option to avoid endless retries, which lead to a huge number of message processing logs, is to configure exception handling for the error case:
Configure an Exception Subprocess in your integration flow to catch the error. Define an alternate processing for the message in the exception subprocess. In the above case the message is stored in a data store, and afterwards an email is sent to the administrator to analyze the error and process the message differently. If the message is stored in a data store, like in the above sample, a different integration flow could consume and process these messages. You can of course also define a different error handling, like storing the message in a different queue (a kind of dead letter queue) on the messaging system.
It is important that you end the Exception Subprocess with a Message End event and not with an Error End event to make sure the message is marked as Completed and is removed from the original queue on the message broker to prevent endless retries.
Monitoring
There are different options to monitor the integration flows, the connection to the messaging system and the processed messages.
AMQP Connection Test
The Connectivity Test is available in the Operations View in Web UI, in section Manage Security Material. Selecting the Connectivity Test tile from the Overview Page opens the test tool offering tests for different protocols. To test the communication to the messaging system, select the AMQP option.
Integration Content Monitor
After the integration flow is deployed, it can be found in the Manage Integration Content monitor. There you can check the status of the integration flow.
Poll Status
In the Status Details area you may find the status and the details about the current consumption:
If there is an error when consuming messages via the AMQP sender adapter, the error is shown here for the respective integration flow. In the Polling Information the status of the consumption is shown as Failed:
Message Processing Monitor
Processed messages can be monitored in the Message Processing monitor:
If an error occurs when sending the message to the messaging system using the AMQP receiver adapter, the error appears in this monitor:
Usually, during message processing a dedicated message processing log (MPL) is written for each message (JMS Message ID) consumed from the AMQP server.
Note that if the same integration flow consumes from multiple queues subscribed to the same topic, those messages may be contained in the same MPL because the original JMS message ID is used for the creation of the MPL. Because of this, the recommendation when consuming from different queues is to use different integration flows, each for a dedicated queue.
Messaging System Monitor
Queues, topics and messages in the messaging system can be monitored using the monitoring tools of the messaging system.
Mapping of AMQP Headers, Properties and Annotations to Headers in Cloud Integration
When an AMQP message is transformed to a message in Cloud Integration, or vice versa, the following AMQP headers, properties and annotations are mapped to headers in Cloud Integration:
| AMQP Type | AMQP Name | Header Name in Cloud Integration |
|---|---|---|
| header | durable | JMSDeliveryMode |
| header | priority | JMSPriority |
| header | delivery-count | JMSXDeliveryCount |
| annotation | x-opt-delivery-time | JMSDeliveryTime |
| property | message-id | JMSMessageID |
| property | user-id | JMSXUserID |
| property | to | JMSDestination |
| property | subject | JMSType |
| property | reply-to | JMSReplyTo |
| property | correlation-id | JMSCorrelationID |
| property | absolute-expiry-time | JMSExpiration |
| property | creation-time | JMSTimestamp |
| property | group-id | JMSXGroupID |
| property | group-sequence | JMSXGroupSeq |
AMQP application properties are transferred to Camel headers with the same name in the Cloud Integration runtime, and the other way round. Note that the name of an application property must not start with JMS; otherwise the property is not forwarded into a Camel header.
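The naming rule for application properties can be illustrated with a short Groovy sketch. It is not the adapter's implementation: the helper names are hypothetical, the map contains only an excerpt of the mapping listed above, and the prefix check is assumed to be case-sensitive.

```groovy
// Excerpt of the mapping listed above (AMQP name -> Cloud Integration header).
Map<String, String> amqpToHeaderNames() {
    return [
        'durable'       : 'JMSDeliveryMode',
        'priority'      : 'JMSPriority',
        'delivery-count': 'JMSXDeliveryCount',
        'message-id'    : 'JMSMessageID',
        'group-id'      : 'JMSXGroupID',
        'group-sequence': 'JMSXGroupSeq'
    ]
}

// Hypothetical helper: keeps only the application properties that would be
// forwarded as Camel headers, i.e. those whose name does not start with "JMS".
Map forwardableApplicationProperties(Map applicationProperties) {
    return applicationProperties.findAll { name, value -> !name.toString().startsWith('JMS') }
}

def props = [RetryCount: 2, JMSCustom: 'dropped', orderId: '4711']
println forwardableApplicationProperties(props)
println amqpToHeaderNames()['group-id']
```

This is also the reason why the custom retry counter used earlier is named RetryCount rather than something starting with JMS.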
Exclusive Processing
In general there are two ways to provide exclusive (in-order) processing using an AMQP broker:
- Exclusive queues provide a capability that will let only one consumer consume from a given queue at a time. If that particular consumer is stopped or crashes one of the other consumers takes over consuming the messages. This means this also works if the Cloud Integration tenant has multiple worker nodes.
- Message groups are an optional feature of the AMQP 1.0 protocol. Groups are realized by using the AMQP property group-id. If an AMQP producer sets the group-id property on a message and the broker supports message grouping for that queue, all messages from the same message group will be processed one after the other. In our case, if the producer is a Cloud Integration system, you can set the AMQP group-id property by setting the Camel header JMSXGroupID (see the table above).
Note that not all messaging systems support exclusive queues and/or message groups, and these features may have to be activated explicitly for a queue in the messaging system configuration. See below in the messaging system specific section for more details.
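To make the idea of message groups more concrete, the following Groovy sketch derives the JMSXGroupID header from a business key and shows which messages a broker with message grouping would keep in sequence. Everything here is illustrative: the helper names and the sample keys are assumptions, and plain maps stand in for messages.

```groovy
// Hypothetical helper: adds the JMSXGroupID header, which is mapped to the
// AMQP group-id property, based on a business key chosen by the producer.
Map withGroupId(Map headers, String businessKey) {
    return headers + [JMSXGroupID: businessKey]
}

// Lists the message bodies per group in their original order. Messages that
// share a group id are the ones that would be processed one after the other.
Map<String, List> sequencesByGroup(List<Map> messages) {
    return messages.groupBy { it.headers.JMSXGroupID }.collectEntries { groupId, msgs ->
        [(groupId): msgs.collect { it.body }]
    }
}

def messages = [
    [headers: withGroupId([:], 'order-A'), body: 'create'],
    [headers: withGroupId([:], 'order-B'), body: 'create'],
    [headers: withGroupId([:], 'order-A'), body: 'update']
]
println sequencesByGroup(messages)
```

Choosing the group id per business object (here, per order) keeps related messages in order while still allowing unrelated messages to be processed in parallel.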
Connections required for Processing
For the AMQP adapter you permanently require at least one connection per worker node for consuming messages from a queue because each worker node needs a connection to the message broker. For storing messages in the messaging system, the number of worker nodes is the maximum number of required connections.
In total this means the number of connections per queue is:
number of worker nodes + parallel requests storing messages to the message broker (maximum: number of worker nodes)
This means you would need a maximum of 4 connections for one queue for a cluster with two worker nodes.
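The formula above can be written as a small Groovy function, for example to size connection limits for a given cluster. This is a minimal sketch; the function and parameter names are made up for illustration.

```groovy
// Connections per queue = one consuming connection per worker node
// + connections for parallel store requests, capped at the number of worker nodes.
int maxConnectionsPerQueue(int workerNodes, int parallelStoreRequests) {
    int consuming = workerNodes
    int storing = Math.min(parallelStoreRequests, workerNodes)
    return consuming + storing
}

println maxConnectionsPerQueue(2, 10)  // 4, the maximum for two worker nodes
println maxConnectionsPerQueue(2, 0)   // 2, consuming only
```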
Supported External Messaging Systems
There are multiple providers of messaging system implementations available. Most of them support AMQP 1.0 and can be connected using the AMQP adapters. The most used messaging systems were tested by SAP Cloud Integration. Note the required settings, configuration options and limitations below.
SAP Enterprise Messaging/Event Mesh
The following configuration options are applicable for SAP Enterprise Messaging:
- Connectivity Options: WebSocket over TLS (Port 443 with Path: "/protocols/amqp10ws")
- Authentication Option: OAuth2 Client Credentials
- Dead Letter Queue Handling based on Delivery Status: Yes, Delivery status REJECTED puts the message to a DLQ after the configured redeliveries. Delivery status MODIFIED_FAILED_UNDELIVERABLE must not be used as it is not supported and can lead to errors during processing. Note, that the dead letter queue and the maximum redeliveries have to be configured in the message broker as well.
- JMSRedelivered/JMSXDeliveryCount supported: JMSRedelivered: Yes; JMSXDeliveryCount: No
- Exclusive Queues supported: Yes
- Message Groups supported: No
- Queues supported: Yes, queues need to be accessed using the following syntax in AMQP adapter channel: "queue:<queue name>"
- Topics supported: Yes, topics need to be accessed using the following syntax in AMQP adapter channel: "topic:<topic name>"
- Topic subscriptions supported: Yes, use queues to subscribe to topics and use the queue in AMQP sender.
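The address syntax for queues and topics listed above is easy to get wrong when names are built dynamically, so a pair of tiny helpers can be useful. This is only a sketch; the function names and the sample names are hypothetical.

```groovy
// Builds the address expected in the AMQP adapter channel for a queue.
String queueAddress(String queueName) {
    return 'queue:' + queueName
}

// Builds the address expected in the AMQP adapter channel for a topic.
String topicAddress(String topicName) {
    return 'topic:' + topicName
}

println queueAddress('orders')          // queue:orders
println topicAddress('sales/created')   // topic:sales/created
```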
Size Limitations from SAP Enterprise Messaging
When using Enterprise Messaging consider the following important restrictions:
- Maximum Number of Connections supported per Service Instance: 50
- Maximum Numbers of Connections combined for all Service Instances and subscription (per subaccount): 1000
Taking the above connection requirements into consideration, this means that you should use one service instance per queue when using Enterprise Messaging with the AMQP adapter.
For more details check out the documentation.
SAP Integration Suite, Advanced Event Mesh
The following configuration options are applicable for the Advanced Event Mesh message broker:
- Connectivity Options:
- TCP (Port as defined in the event mesh broker)
- TLS over TCP (Port as defined in the event mesh broker)
- Authentication Option: SASL
- Dead Letter Queue Handling based on Delivery Status: Supported, but the dead letter queue and the maximum redeliveries have to be configured in the message broker as well. Delivery status REJECTED puts the message to the dead letter queue. Delivery status MODIFIED_FAILED_UNDELIVERABLE must not be used as it is not supported and can lead to errors during processing.
- JMSRedelivered/JMSXDeliveryCount supported: JMSRedelivered: Yes; JMSXDeliveryCount: No
- Exclusive Queues supported: Yes
- Message Groups supported: No
- Queues supported: Yes
- Topics supported: Yes
- Topic subscriptions supported: Yes, use queues to subscribe to topics and use the queue in AMQP sender.
Microsoft Azure Service Bus
The following configuration options are applicable for the Microsoft Azure Service Bus:
- Connectivity Options: TLS over TCP (Port 5671)
- Authentication Option: SASL
- Dead Letter Queue Handling based on Delivery Status: Supported. Delivery status REJECTED moves the message to a dead letter queue. Delivery status MODIFIED_FAILED_UNDELIVERABLE gives the message a deferred status in the messaging system (Message Deferral). Deferred messages stay in the queue but are no longer consumed.
- JMSRedelivered/JMSXDeliveryCount supported: Yes
- Exclusive Queues supported: No
- Message Groups supported: No
- Queues supported: Yes
- Topics supported: Yes
- Topic subscriptions supported: Yes, topic subscriptions can be accessed as separate, queue-like entities using the AMQP sender adapter: "[topic name]/subscriptions/[subscription name]". Multiple subscriptions for a single topic can exist. It is not possible to send to topic subscriptions using the AMQP receiver adapter.
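The subscription address format described above can be sketched as a small helper. The function name and the example names `orders` and `billing` are hypothetical. Only the `[topic name]/subscriptions/[subscription name]` pattern comes from the text.

```python
def service_bus_subscription_address(topic: str, subscription: str) -> str:
    """Build the address used by the AMQP sender adapter to consume from an
    Azure Service Bus topic subscription."""
    if not topic or not subscription:
        raise ValueError("topic and subscription must not be empty")
    return f"{topic}/subscriptions/{subscription}"


# Hypothetical names, for illustration only.
print(service_bus_subscription_address("orders", "billing"))
```

Such an address applies to the sender adapter only. As noted above, the receiver adapter cannot send to a topic subscription and would address the topic itself.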
Solace PubSub+
The following configuration options are applicable for the Solace message broker:
- Connectivity Options:
- TCP (Port as defined in the Solace broker)
- TLS over TCP (Port as defined in the Solace broker)
- Authentication Option: SASL
- Dead Letter Queue Handling based on Delivery Status: Supported, but the dead letter queue and the maximum redeliveries have to be configured in the message broker as well. Delivery status REJECTED puts the message to the dead letter queue. Delivery status MODIFIED_FAILED_UNDELIVERABLE must not be used as it is not supported and can lead to errors during processing.
- JMSRedelivered/JMSXDeliveryCount supported: JMSRedelivered: Yes; JMSXDeliveryCount: No
- Exclusive Queues supported: Yes
- Message Groups supported: No
- Queues supported: Yes
- Topics supported: Yes
- Topic subscriptions supported: Yes, use queues to subscribe to topics and use the queue in AMQP sender.
Apache Qpid Broker-J
The following configuration options are applicable for the Apache Qpid Broker-J:
- Connectivity Options:
- TCP (Port as defined in the Qpid broker)
- TLS over TCP (Port as defined in the Qpid broker)
- WebSocket (Port as defined in the Qpid broker)
- WebSocket over TLS (Port as defined in the Qpid broker)
- Authentication Option: SASL
- Dead Letter Queue Handling based on Delivery Status: Yes, but the dead letter queue and the maximum redeliveries have to be configured in the message broker as well. Delivery status REJECTED moves the message to the dead letter queue after the number of redelivery attempts configured in the messaging system. Delivery status MODIFIED_FAILED_UNDELIVERABLE removes the message from processing for as long as the connection to the messaging system is established. After a restart of the integration flow or the worker node, the message is retried. After the number of redelivery attempts configured in the messaging system, the message is moved to the dead letter queue.
- JMSRedelivered/JMSXDeliveryCount supported: Yes
- Exclusive Queues supported: Yes
- Message Groups supported: Yes
- Queues supported: Yes
- Topics supported: Yes
- Topic subscriptions supported: Yes, use queues to subscribe to topics and use the queue in AMQP sender.
Apache ActiveMQ 5 and Apache ActiveMQ Artemis
The following configuration options are applicable for Apache ActiveMQ:
- Connectivity Options:
- TCP (Port as defined in the ActiveMQ broker)
- TLS over TCP (Port as defined in the ActiveMQ broker)
- Authentication Option: SASL
- Dead Letter Queue Handling based on Delivery Status: Yes, the dead letter queue, the maximum redeliveries and a redelivery delay can be configured in the message broker as well. Delivery status REJECTED and MODIFIED_FAILED_UNDELIVERABLE put the message to the dead letter queue.
- JMSRedelivered/JMSXDeliveryCount supported: Yes
- Exclusive Queues supported: No
- Message Groups supported: Yes
- Queues supported: Yes
- Topics supported: Yes
- Topic subscriptions supported: Yes, use queues to subscribe to topics and use the queue in AMQP sender.
Note that Amazon MQ Service is based on Apache ActiveMQ, so the same configuration options apply. For details, see the blog
How to connect to an Amazon MQ Service using the AMQP Adapter.
IBM MQ
In older versions, the IBM MQ broker has only limited support for the AMQP protocol: it does not support addressing queues via AMQP, see
IBM MQ documentation. Because of this limitation, with IBM MQ versions < 9.2, the AMQP receiver adapter can only be used to write to topics on the IBM MQ broker, and the AMQP sender adapter cannot consume from IBM MQ, because Cloud Integration supports only queues for consumption. As of IBM MQ version 9.2, this limitation no longer exists, see
Point-to-point support on AMQP channels.
The following configuration options are applicable for IBM MQ:
- Connectivity Options:
- TCP (Port as defined in the IBM MQ broker)
- TLS over TCP (Port as defined in the IBM MQ broker)
- Authentication Option: SASL
- Dead Letter Queue Handling based on Delivery Status: Not supported with AMQP protocol.
- JMSRedelivered/JMSXDeliveryCount supported: Yes (with version >= 9.2).
- Exclusive Queues supported: Yes (with version >= 9.2). Max Number of Prefetched Messages in AMQP sender channel needs to be set to 0.
- Note that the Poll status in the deployed artifacts monitor will show an error if multiple workers are running in your tenant. This is because only one consumer is allowed for the exclusive queue, so the second worker gets an error when trying to connect.
- Message Groups supported: No.
- Queues supported: Yes (with version >= 9.2)
- Topics supported: Yes
- Topic subscriptions supported: Yes.
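The version dependency in the list above can be summarized in a small lookup. This is a sketch only: the function names and dictionary keys are hypothetical, and the version string is assumed to be a dotted number such as "9.1.5" or "9.2".

```python
from typing import Dict, Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a dotted version string such as "9.2.0" into a comparable tuple."""
    return tuple(int(part) for part in version.split("."))


def ibm_mq_amqp_capabilities(version: str) -> Dict[str, bool]:
    """Summarize which of the options listed above are available for a
    given IBM MQ version when accessed via AMQP."""
    # Queue addressing via AMQP (point-to-point) is available from 9.2.
    point_to_point = parse_version(version) >= (9, 2)
    return {
        "queues": point_to_point,
        "topics": True,
        "sender_adapter_can_consume": point_to_point,
        "exclusive_queues": point_to_point,
        "jms_redelivered": point_to_point,
        "dead_letter_queue_via_amqp": False,
        "message_groups": False,
    }


print(ibm_mq_amqp_capabilities("9.1.5")["queues"])
print(ibm_mq_amqp_capabilities("9.2")["queues"])
```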
RabbitMQ
In testing, we found that RabbitMQ has problems with AMQP 1.0. The test was done with plugin version 3.8.2, which was the newest version available at that time. This may change with newer plugin versions.