A couple of days ago I read the following question in the SAP Answers section of SAP Community: "CPI Message Processing Stuck". Alice van Ommen asked whether it is possible to stop an SAP CPI message that is stuck in the "Processing" state. The answer to this question pointed to SAP Note 2399949, which says that this isn't possible and that one has to wait up to 30 days until a clean-up job runs. I think the SAP note may be a bit misleading, because it doesn't apply to every case in which a message is stuck in the "Processing" state. But let me show you why...
Note: If you're not interested in the "how", but only want a quick solution, scroll down to the "ready-to-use solution" paragraph at the end of this article.
Table of contents
- Can messages (exchanges) on CPI be stopped?
- How to stop messages - step by step
- How to stop messages - a ready-to-use solution
- Feedback and discussion
Can messages (exchanges) on CPI be stopped?
The answer is yes. Messages (=CPI slang; exchanges=Apache Camel slang) can be stopped, at least while they are running. This may sound like a triviality, but let me explain why I point out the "while they are running". There can be two reasons why a message is shown in the "Processing" state in SAP CPI's monitoring.
- The message is still running
- The message has already stopped, but the monitor didn't get the information
Case 1 may occur if you managed to write an infinite loop in one of your scripts, if the network is slow, when dealing with huge payloads, or... Case 2 might occur when the message processing was "hard-interrupted" by a crash of the runtime node or by undeploying the IFlow during message processing.
If your message is shown as "Processing" because of "case 2", you have no other way to get the monitoring entry corrected than waiting (up to) 30 days for the monitoring clean-up job. (But hey, after all it's only a display error and the message doesn't consume any resources from your tenant.) But if you get into a "case 1" situation, there is a good chance that you can cancel the message. Let's have a look at the necessary code in the next section.
How to stop messages - step by step
The short version: Get a handle to the Exchange object which should be stopped and stop it.
The long version: First, define the id of the IFlow and the id of the message you want to stop. Then we use the FrameworkUtil-class from OSGi to get the bundle that contains the class "com.sap.gateway.ip.core.customdev.util.Message" (because it exists on every runtime node) and obtain the overall BundleContext via the bundle's getBundleContext-function.
The BundleContext itself gives us access to the service references. (Service references represent, among other things, the instances of our deployed/running IFlows.) Via the getServiceReferences-function and by passing our IFlow's id, we can retrieve the ServiceReference object which links to the IFlow that runs the message we want to stop. In the next step, we pass the ServiceReference to the getService-function, cast the result to a CamelContext and save it in a variable called "camelContext".
import org.apache.camel.CamelContext
import org.osgi.framework.FrameworkUtil
import org.osgi.framework.ServiceReference

def iFlowID = "THE_ID_OF_YOUR_IFLOW_ARTIFACT"
def messageID = "17gbd329bd3j90eqh3hq093eh3q" //The id of the SAP CPI message you want to stop
//Get the general bundle context
def bundleCtx = FrameworkUtil.getBundle(Class.forName("com.sap.gateway.ip.core.customdev.util.Message")).getBundleContext()
//Get service references for IFlow (null if no matching IFlow is deployed)
ServiceReference[] srs = bundleCtx.getServiceReferences(CamelContext.class.getName(), "(camel.context.name=${iFlowID})")
//Get CamelContext from service reference
CamelContext camelContext = (CamelContext)bundleCtx.getService(srs[0])
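The second argument of getServiceReferences is an OSGi (LDAP-style) filter string, and in the ready-to-use script further down the IFlow id arrives as a URL parameter. A value such as "*" would then act as a wildcard instead of a literal name. The following is a minimal sketch of how the value could be escaped before it is put into the filter. Both helper names (escapeFilterValue, buildContextFilter) are made up for this illustration and are not part of the original script.

```groovy
// Hypothetical helper (not part of the original script): escapes the
// characters that are special inside an OSGi/LDAP filter value.
String escapeFilterValue(String value) {
    // Backslash, asterisk and both parentheses get a backslash in front
    value.replaceAll(/([\\*()])/, '\\\\$1')
}

// Hypothetical helper: builds the filter used to look up the CamelContext
String buildContextFilter(String iFlowId) {
    "(camel.context.name=" + escapeFilterValue(iFlowId) + ")"
}

// A normal IFlow id passes through unchanged
assert buildContextFilter("My_IFlow") == "(camel.context.name=My_IFlow)"
// A wildcard is turned into a literal asterisk
assert buildContextFilter("*") == '(camel.context.name=\\*)'
```

With such a helper, the filter argument in the snippet above could be replaced by buildContextFilter(iFlowID).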
Imagine the CamelContext as an instance of our target IFlow. Via the CamelContext's getInflightRepository-function and its browse-function we can get a list of all Exchanges (=messages) which are currently being processed by the IFlow. (The InflightRepository tracks the exchanges during the processing steps.)
First we check whether the InflightRepository contains at least one message (size() >= 1). If not, we stop at this point, because the message we want to stop is no longer in processing. If there are messages in the InflightRepository, we call the browse-function in combination with the find-function, passing our message id, to get the repository entry which reflects the message we are looking for. We save the repository entry in a variable called "repoEntry".
//Check whether any messages are currently in processing
if (camelContext.getInflightRepository().browse().size() < 1){
    throw new Exception("No messages in InflightRepository, thus nothing to stop.")
} else {
    //Get repository entry for given message id
    def repoEntry = camelContext.getInflightRepository().browse().find{ it.getExchange().getProperty("SAP_MessageProcessingLogID") == messageID}
    if (repoEntry != null){
        //Get exchange, set Exception and stop it
        def exchange = repoEntry.getExchange()
        exchange.setException(new InterruptedException("Exchange stopped manually"))
        exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE)
    } else {
        throw new Exception("No message found for given metadata.")
    }
}
If "repoEntry" isn't null, which means we got a valid reference, we call the getExchange-function on it to get an instance of type Exchange. That's the holy grail we were looking for. With two more lines of code we are now able to stop the message. By calling the setProperty-function with the parameters "Exchange.ROUTE_STOP" and "Boolean.TRUE" we mark the message to be stopped as soon as possible.
Note (1): The setException-call is optional. If you call this function, the message will show up as "FAILED" in SAP CPI's monitoring. The exception text passed to the setException-function will also be shown in CPI's monitoring. If you don't call the setException-function, but only set the "Exchange.ROUTE_STOP" property, then the message will be stopped, but shown as "COMPLETED" in CPI's monitoring.
Note (2): By setting the "Exchange.ROUTE_STOP" property, you don't kill an Exchange immediately, but let the Camel framework know, that it should stop the Exchange at the next possible time (=usually the next processing step in the IFlow).
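To make Note (2) a bit more tangible, here is a small plain-Groovy simulation of such a cooperative stop. It does not use Camel at all: the map key "stopRequested", the runSteps-function and the step names are invented for this sketch and only stand in for the stop property and the IFlow's processing steps. If this reading of the mechanism is right, it also suggests that a step which never returns (for example an endless loop inside a single script) would never reach the next check.

```groovy
// Plain-Groovy simulation; 'stopRequested' and the step names are made up.
def runSteps(Map exchange, List<Closure> steps) {
    def executed = []
    for (step in steps) {
        // The flag is only looked at between steps, never in the middle of one
        if (exchange.stopRequested) {
            break
        }
        executed << step.call(exchange)
    }
    executed
}

def exchange = [stopRequested: false]
def steps = [
    { ex -> "step1" },
    { ex -> ex.stopRequested = true; "step2" },  // stop is requested while step 2 runs
    { ex -> "step3" }                             // never reached
]

// Step 2 still finishes, step 3 is skipped
assert runSteps(exchange, steps) == ["step1", "step2"]
```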
How to stop messages - a ready-to-use solution
Now that we have looked into the technical details, let's take a look at the practical implementation. You can use the method above by building a simple IFlow which looks as follows.
Create a new IFlow with one sender and no receiver. Connect the Sender element via an HTTPS sender channel. In the sender channel, set an endpoint address according to your taste. After that, add a script element of type Groovy Script and paste the following script. (The script is based on the theoretical part above, but expanded by some lines of error and input handling.)
//internal
import com.sap.gateway.ip.core.customdev.util.Message
import groovy.json.JsonOutput
import org.apache.camel.*
import org.osgi.framework.*

Message processData(Message message) {
    //Get url parameters
    def params = [:]
    (message.getHeaders().CamelHttpQuery =~ /(\w+)=?([^&]+)?/)[0..-1].each{
        params[it[1]] = it[2]
    }
    //Stop message
    def result = stopExchange(params.messageId, params.iFlowId)
    //Return result
    def body = JsonOutput.toJson(result)
    message.setBody(body)
    message.setHeader('Content-Type', 'application/json')
    return message
}

private stopExchange(messageId, iFlow, exceptionText = "Exchange stopped manually"){
    def result = [successful:true, text:""]
    try {
        //Get general bundle context
        def bundleCtx = FrameworkUtil.getBundle(Class.forName("com.sap.gateway.ip.core.customdev.util.Message")).getBundleContext()
        //Get service references for IFlow
        ServiceReference[] srs = bundleCtx.getServiceReferences(CamelContext.class.getName(), "(camel.context.name=${iFlow})")
        if (!srs){
            //No CamelContext registered under this name, e.g. wrong IFlow id
            result.successful = false
            result.text = "No deployed IFlow found for given iFlowId."
            return result
        }
        //Get CamelContext from service reference
        CamelContext camelContext = (CamelContext)bundleCtx.getService(srs[0])
        //Check whether any messages are currently in processing
        if (camelContext.getInflightRepository().browse().size() < 1){
            result.successful = false
            result.text = "No messages in InflightRepository, thus nothing to stop."
        } else {
            //Get repository entry for given message id
            def repoEntry = camelContext.getInflightRepository().browse().find{ it.getExchange().getProperty("SAP_MessageProcessingLogID") == messageId}
            if (repoEntry != null){
                //Get exchange, set Exception and stop it
                def exchange = repoEntry.getExchange()
                exchange.setException(new InterruptedException(exceptionText))
                exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE)
            } else {
                result.successful = false
                result.text = "No message found for given metadata."
            }
        }
    }
    catch(Exception ex){
        //Report any unexpected error to the caller instead of failing the IFlow
        result.successful = false
        result.text = ex.getMessage()
    }
    return result
}
That's it. Save and deploy the IFlow. After deployment, you can call your IFlow from any web browser. By adding the URL parameters "messageId" and "iFlowId" you can now stop a message with ease.
Example call:
https://x0815-iflmap.hcisbp.eu1.hana.ondemand.com/http/CancelMessage?messageId=AF3Cuw5OXjZYte4cEwEDE...
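The script above extracts the URL parameters with a regular expression and uses the captured values as they are. If an id ever contains characters that get percent-encoded in a URL, it may be worth decoding them first. The following is a minimal sketch of a more defensive parser, assuming the CamelHttpQuery header holds the raw, still-encoded query string. The function name parseQuery is hypothetical and not part of the original script.

```groovy
import java.net.URLDecoder

// Hypothetical helper: splits a raw query string into a map of decoded values.
Map<String, String> parseQuery(String query) {
    def params = [:]
    if (!query) {
        return params                      // null or empty query string: no parameters
    }
    // Skip empty fragments such as the one between "&&"
    query.split('&').findAll { it }.each { pair ->
        def parts = pair.split('=', 2)     // split only at the first '='
        def key = URLDecoder.decode(parts[0], 'UTF-8')
        def value = parts.size() > 1 ? URLDecoder.decode(parts[1], 'UTF-8') : null
        params[key] = value
    }
    params
}

assert parseQuery('messageId=abc123&iFlowId=My%20Flow') == [messageId: 'abc123', iFlowId: 'My Flow']
assert parseQuery(null) == [:]
```

Inside processData, the result of parseQuery(message.getHeaders().CamelHttpQuery) could then take the place of the regex-based loop.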
Because the IFlow responds with an easy-to-parse JSON document, including status and error information, it is also well suited for interfacing with a third-party application or monitoring tool of your choice.
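As an illustration of how a calling tool might evaluate that JSON document, here is a small sketch in Groovy. The field names "successful" and "text" are the ones the script above puts into its result map. The function name describeResult and the wording of the returned status lines are assumptions made for this example only.

```groovy
import groovy.json.JsonSlurper

// Hypothetical client-side helper: turns the IFlow's JSON answer into a short status line.
String describeResult(String json) {
    def result = new JsonSlurper().parseText(json)
    result.successful ? "stopped" : "not stopped: " + result.text
}

// Sample answers in the shape produced by the script above
assert describeResult('{"successful":true,"text":""}') == "stopped"
assert describeResult('{"successful":false,"text":"No messages in InflightRepository, thus nothing to stop."}') ==
        "not stopped: No messages in InflightRepository, thus nothing to stop."
```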
Feedback and discussion
It was a fun journey. Sneaking through the classes and features of CPI / Apache Camel and finding out what's possible is always fun. Even if it should be clear already, let me say it again in plain words: The concept shown above is a proof of concept. It is not an official method of SAP itself, nor is it recommended by SAP. Please handle this knowledge responsibly.
Enough of the harsh words now. Please let me know what you think about the methods and ideas shared in the article above. Did you know them before? Have you ever faced the problem of stuck messages, and how did you solve the situation? I'm looking forward to your comments.