r_herrmann
Active Contributor
A couple of days ago I read the following question in the SAP Answers section of SAP Community: "CPI Message Processing Stuck". Alice van Ommen asked whether it is possible to stop an SAP CPI message that is stuck in the "Processing" state. The answer pointed to SAP Note 2399949, which says that it isn't possible and that one has to wait up to 30 days until a clean-up job runs. I think the SAP Note may be a bit misleading, because it doesn't apply to every case in which a message is stuck in the "Processing" state. But let me show you why...



Note: If you're not interested in the "how", but only want a quick solution, scroll down to the "ready-to-use solution" paragraph at the end of this article.

Table of contents



  • Can messages (exchanges) on CPI be stopped?

  • How to stop messages - step by step

  • How to stop messages - a ready-to-use solution

  • Feedback and discussion


Can messages (exchanges) on CPI be stopped?


The answer is yes. Messages (CPI terminology; Apache Camel calls them exchanges) can be stopped, at least while they are running. This may sound trivial, but let me explain why I point out the "while they are running". There can be two reasons why a message is shown in the "Processing" state in SAP CPI's monitoring.

  1. The message is still running

  2. The message has already stopped, but the monitor didn't get the information


Case 1 may occur if you managed to write an infinite loop in one of your scripts, if the network is slow, when dealing with huge payloads, and so on. Case 2 might occur when the message processing was "hard-interrupted" by a crash of the runtime node or by undeploying the IFlow during the message processing.

If your message is shown as "Processing" because of "case 2", you have no other option to get the monitoring entry corrected than to wait (up to) 30 days for the monitoring clean-up job. (But hey, after all it's only a display error and the message doesn't consume any resources of your tenant.) But if you are in a "case 1" situation, there is a good chance that you can cancel the message. Let's have a look at the necessary code in the next section.

How to stop messages - step by step


The short version: Get a handle to the Exchange object which should be stopped and stop it.

The long version: First, define the id of the IFlow and the id of the message you want to stop. Then we use the FrameworkUtil class from OSGi to get the bundle that provides the class "com.sap.gateway.ip.core.customdev.util.Message" (chosen because this class exists on every runtime node) and obtain the overall BundleContext via the bundle's getBundleContext-function.

The BundleContext itself gives us access to the service references. (Service references represent, among other things, the instances of our deployed/running IFlows.) Via the getServiceReferences-function and by passing our IFlow's id, we can retrieve the ServiceReference object which links to the IFlow which runs the message we want to stop. In the next step, we pass the ServiceReference to the BundleContext's getService-function, cast the result to a CamelContext and save it in a variable called "camelContext".
import org.apache.camel.*
import org.osgi.framework.*

def iFlowID = "THE_ID_OF_YOUR_IFLOW_ARTIFACT"
def messageID = "17gbd329bd3j90eqh3hq093eh3q" //The SAP CPI message you want to stop

//Get the bundle that provides the Message class and, from it, the BundleContext
def bundleCtx = FrameworkUtil.getBundle(Class.forName("com.sap.gateway.ip.core.customdev.util.Message")).getBundleContext()
//Get service references for IFlow (the result is null if nothing matches the filter)
ServiceReference[] srs = bundleCtx.getServiceReferences(CamelContext.class.getName(), "(camel.context.name=${iFlowID})")
//Get CamelContext from service reference
CamelContext camelContext = (CamelContext)bundleCtx.getService(srs[0])

Imagine the CamelContext as an instance of our target IFlow. Via the CamelContext's getInflightRepository-function and its browse-function we can get a list of all Exchanges (=messages) which are currently processed by the IFlow. (The InflightRepository tracks the exchanges during the processing steps.)
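
When the repository unexpectedly looks empty, it can help to list what it currently holds before searching for a specific id. The following is a minimal sketch and not part of the original solution: summarizeInflight is a hypothetical helper name, and the sketch assumes that the entries returned by browse() offer getExchange(), getNodeId() and getDuration(), as Camel's InflightRepository.InflightExchange interface does. Because it only relies on these method calls, it can be tried with stand-in objects outside CPI; inside CPI you would pass camelContext.getInflightRepository().browse() instead.

```groovy
// Hypothetical helper (the name is an assumption): summarises in-flight entries.
// Expects objects shaped like Camel's InflightRepository.InflightExchange,
// i.e. offering getExchange(), getNodeId() and getDuration().
def summarizeInflight(Collection entries) {
    entries.collect { entry ->
        [
            messageId : entry.getExchange().getProperty("SAP_MessageProcessingLogID"),
            nodeId    : entry.getNodeId(),    // processing step the exchange is currently at
            durationMs: entry.getDuration()   // time in flight, in milliseconds
        ]
    }
}

// Stand-in for an in-flight entry so the sketch runs outside CPI (illustrative only)
class StubEntry {
    def exchange
    String nodeId
    long duration
}

// java.util.Properties serves as a stand-in exchange, because it also
// offers a getProperty(String) method
def stubExchange = { String id ->
    def p = new Properties()
    p.setProperty("SAP_MessageProcessingLogID", id)
    return p
}

def entries = [
    new StubEntry(exchange: stubExchange("MSG-1"), nodeId: "CallActivity_1", duration: 1200L),
    new StubEntry(exchange: stubExchange("MSG-2"), nodeId: "CallActivity_7", duration: 45000L)
]
def summary = summarizeInflight(entries)
summary.each { println "${it.messageId} at ${it.nodeId}, in flight for ${it.durationMs} ms" }
```

If the message id you are looking for does not appear in such a listing, the stop code below has nothing to act on for this IFlow on this runtime node.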

First we check whether the InflightRepository contains at least one message (size() >= 1). If so, everything is fine. If not, we stop at this point, because the message we want to stop is either already completed or belongs to a different IFlow. If there are messages in the InflightRepository, we call the browse-function in combination with the find-function, passing our message id, to get the repository entry which reflects the message we are searching for. We save the repository entry in a variable called "repoEntry".
//Check whether any messages are currently in processing
if (camelContext.getInflightRepository().browse().size() < 1){
    throw new Exception("No messages in InflightRepository, thus nothing to stop.")
} else {
    //Get repository entry for given message id
    def repoEntry = camelContext.getInflightRepository().browse().find{ it.getExchange().getProperty("SAP_MessageProcessingLogID") == messageID}
    if (repoEntry != null){
        //Get exchange, set Exception and stop it
        def exchange = repoEntry.getExchange()
        //The exception text is shown in the monitoring
        exchange.setException(new InterruptedException("Exchange stopped manually"))
        exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE)
    } else {
        throw new Exception("No message found for given metadata.")
    }
}

If "repoEntry" isn't null, which means we got a valid reference, we call the getExchange-function on it to get an instance of type Exchange. That's the holy grail we were looking for. With two more lines of code we are now able to stop the message. By calling the setProperty-function with the parameters "Exchange.ROUTE_STOP" and "Boolean.TRUE" we mark the message to be stopped as soon as possible.

Note (1): The setException-call is optional. If you call this function, the message will show up as "FAILED" in SAP CPI's monitoring. The exception text, passed to the setException-function, will also be shown in CPI's monitoring. If you don't call the setException-function, but only set the "Exchange.ROUTE_STOP" property, then the message will be stopped, but shown as "COMPLETED" in CPI's monitoring.

Note (2): By setting the "Exchange.ROUTE_STOP" property, you don't kill an Exchange immediately, but let the Camel framework know, that it should stop the Exchange at the next possible time (=usually the next processing step in the IFlow).
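
The delay described in Note (2) can be pictured with a simplified model. The sketch below is not Camel code and makes no claim about Camel's internals; it only assumes, as the note says, that the stop flag is looked at between processing steps. runSteps and the routeStop key are hypothetical names used for illustration.

```groovy
// Simplified model, NOT Camel's implementation: the stop flag is only
// checked before a step starts, never while a step is running.
def runSteps(Map exchange, List<Closure> steps) {
    def executed = []
    for (int i = 0; i < steps.size(); i++) {
        if (exchange.routeStop) {
            break               // flag seen between steps: skip everything that follows
        }
        steps[i].call(exchange) // a step that has started always runs to its end
        executed << i
    }
    return executed
}

def exchange = [routeStop: false, log: []]
def steps = [
    { ex -> ex.log << "step 0" },
    { ex ->
        ex.routeStop = true     // stop requested while step 1 is still running
        ex.log << "step 1 still finishes"
    },
    { ex -> ex.log << "step 2" }
]

def executed = runSteps(exchange, steps)
println "Executed steps: ${executed}"
println "Log: ${exchange.log}"
```

In this model a step that is already running, for example a script that sleeps or loops, is not interrupted; only the steps after it are skipped.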

How to stop messages - a ready-to-use solution


Now that we have looked into the technical details, let's take a look at the practical implementation. You can use the method above by building a simple IFlow, set up as follows.



Create a new IFlow with one sender and no receiver. Connect the Sender element via an HTTPS sender channel. In the sender channel, set an endpoint address to your taste. After that, add a script element of type Groovy Script and paste the following script. (The script is based on the theoretical part above, but extended by some lines of error and input handling.)
import com.sap.gateway.ip.core.customdev.util.Message
import groovy.json.JsonOutput
import org.apache.camel.*
import org.osgi.framework.*

Message processData(Message message) {

    //Get url parameters
    def params = [:]
    (message.getHeaders().CamelHttpQuery =~ /(\w+)=?([^&]+)?/)[0..-1].each{
        params[it[1]] = it[2]
    }

    //Stop message
    def result = stopExchange(params.messageId, params.iFlowId)

    //Return result
    def body = JsonOutput.toJson(result)
    message.setBody(body)
    message.setHeader('Content-Type', 'application/json')
    return message
}

private stopExchange(messageId, iFlow, exceptionText = "Exchange stopped manually"){
    def result = [successful:true, text:""]

    try {
        //Get general bundle context
        def bundleCtx = FrameworkUtil.getBundle(Class.forName("com.sap.gateway.ip.core.customdev.util.Message")).getBundleContext()

        //Get service references for IFlow
        ServiceReference[] srs = bundleCtx.getServiceReferences(CamelContext.class.getName(), "(camel.context.name=${iFlow})")
        //getServiceReferences returns null if no service matches the filter
        if (srs == null || srs.length == 0){
            result.successful = false
            result.text = "No CamelContext found for the given IFlow id."
            return result
        }
        //Get CamelContext from service reference
        CamelContext camelContext = (CamelContext)bundleCtx.getService(srs[0])

        //Check whether any messages are currently in processing
        if (camelContext.getInflightRepository().browse().size() < 1){
            result.successful = false
            result.text = "No messages in InflightRepository, thus nothing to stop."
        } else {
            //Get repository entry for given message id
            def repoEntry = camelContext.getInflightRepository().browse().find{ it.getExchange().getProperty("SAP_MessageProcessingLogID") == messageId}
            if (repoEntry != null){
                //Get exchange, set Exception and stop it
                def exchange = repoEntry.getExchange()
                exchange.setException(new InterruptedException(exceptionText))
                exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE)
            } else {
                result.successful = false
                result.text = "No message found for given metadata."
            }
        }
    }
    catch(Exception ex){
        result.successful = false
        result.text = ex.getMessage()
    }
    return result
}

That's it. Save and deploy the IFlow. After deployment, you can call your IFlow in any web browser. By adding the URL parameters "messageId" and "iFlowId" you can now stop messages with ease.

Example call:
https://x0815-iflmap.hcisbp.eu1.hana.ondemand.com/http/CancelMessage?messageId=AF3Cuw5OXjZYte4cEwEDE...

Because the IFlow responds with an easy-to-parse JSON document, including status- and error information, it is also ideal for interfacing with a third-party application or monitoring tool of your choice.
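
As an illustration of such an integration, the sketch below builds the call URL and evaluates the JSON answer. It is a minimal example under assumptions: buildCancelUrl and evaluateCancelResponse are hypothetical helper names, the base URL and ids are placeholders, and the actual HTTP call (including authentication) is left out.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper: appends the two URL parameters which the IFlow script reads
String buildCancelUrl(String baseUrl, String messageId, String iFlowId) {
    return "${baseUrl}?messageId=${messageId}&iFlowId=${iFlowId}".toString()
}

// Hypothetical helper: turns the IFlow's JSON answer into a one-line status text
String evaluateCancelResponse(String json) {
    def result = new JsonSlurper().parseText(json)
    return result.successful ? "Stop requested" : "Not stopped: ${result.text}".toString()
}

// Placeholder values, for illustration only
println buildCancelUrl("https://example.invalid/http/CancelMessage", "PLACEHOLDER_MESSAGE_ID", "PLACEHOLDER_IFLOW_ID")
println evaluateCancelResponse('{"successful":true,"text":""}')
println evaluateCancelResponse('{"successful":false,"text":"No messages in InflightRepository, thus nothing to stop."}')
```

Keep in mind that "successful": true only tells you that the stop flag was set on the exchange. As described in Note (2), the message itself ends at the next possible time, so a monitoring tool may want to check the message state again afterwards.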

Feedback and discussion


It was a fun journey. Sneaking through the classes and features of CPI / Apache Camel and finding out what's possible is always fun. Even though it should be clear, let me say it again in plain words: The concept shown above is a proof of concept. It is not an official method of SAP itself, nor is it recommended by SAP. Please handle this knowledge responsibly.

Enough of the harsh words now. Please let me know what you think about the methods/idea shared in the article above. Did you know them before? Have you ever faced the problem of stuck messages, and how did you solve the situation? I'm looking forward to your comments.
34 Comments
engswee
Active Contributor
Fantastic.... simply fantastic!

bravo … thanks for wonderful blog. 

NickSYYang
Active Participant

Great blog. I encountered the same issue this year: problematic input data caused an iFlow to be unable to reach the Completed state.

This blog shows great insight into CPI. I wonder how you did your investigation while figuring out the code in your blog. If it was all done in a CPI tenant, then you are brilliant!

r_herrmann
Active Contributor
Hi Nick,

for testing/figuring out what is possible, I used the method described here, which allows us to run scripts immediately without deploying: https://blogs.sap.com/2019/06/17/rapid-groovy-scripting-in-cpi-dynamic-loading-and-execution-of-scri... This accelerated the development decisively.

As entry point for my investigation I used Eng Swee's hint in this article, which shows us how to access the Exchange-object of a message: https://blogs.sap.com/2018/04/05/using-camels-simple-in-cpi-groovy-scripts/

With access to the Exchange itself, we have access to an Apache Camel class/object. Thus I tried to "break" out of the CPI environment and see what is possible in Apache Camel. By searching Apache Camel boards, documentation and Stack Overflow I tried to figure out how it is possible to stop Exchanges in Camel. With the ideas from my research, I tried to turn the information I had collected into code in the CPI environment.

Hopefully this answers your question.
Hi Raffael,

thanks a lot for your blog, great solution!

The only hurdle I have to overcome is that none of my iFlows can be deployed since the message got stuck...

 

Any tips;-)

 

All the best, regards Alice
vadimklimov
Active Contributor
Raffael, thank you for such mindful insight. Message cancellation is truly a highly demanded feature, and an elegant way to address it that you illustrated!
r_herrmann
Active Contributor
If your tenant is so messed up that even deployment doesn't work, I don't see any other option than opening a ticket at SAP and asking for a reboot of the tenant.

For the future I suggest implementing Vadim's/Eng Swee's solution for running code dynamically: https://blogs.sap.com/2019/06/17/rapid-groovy-scripting-in-cpi-dynamic-loading-and-execution-of-scri...

With it you could either call the "stop message" code or call a simple "System.exit(0)", which would end up in a "hard" reboot of your tenant. (Use the System.exit-command only as a last resort, because afterwards you may see a lot of "Case 2" messages...)

Regards,
Raffael
NickSYYang
Active Participant
0 Kudos

Thanks for sharing your journey on how to make it work.

Your sample code also shows a very lean style and a thorough understanding of the Groovy language.

engswee
Active Contributor
Getting into the Camel bits behind CPI is indeed fun and a whole new world to explore 🙂 And indeed it is achieved much faster/easier with dynamic script execution 🙂

I love the way you expanded on the knowledge that Vadim and myself shared, and brought this to another level. It shows how a vibrant community of passionate individuals can spur each other towards greater heights by sharing knowledge and continually learning from one another. Kudos again for another fantastic, simply fantastic work.
0 Kudos
Thank you for the detailed insight!! It's a long-awaited solution 🙂
engswee
Active Contributor
0 Kudos
Raffael, I think you might be able to get access to the Camel Context without the bundle and service reference.
def Message processData(Message message) {
Exchange exchange = message.exchange
CamelContext context = exchange.getContext()

I admit it takes advantage of Groovy's direct access to private attributes and may seem "hacky", but we are indeed hacking into the system anyway 🙂

 
r_herrmann
Active Contributor
Hi engswee.yeoh ,

You're right. It's an easier way to get the CamelContext belonging to an exchange. But it doesn't help for this task. If I'm not wrong, each IFlow/OSGi bundle has its own CamelContext with an independent InflightRepository. So using the CamelContext from the Exchange that was triggered to stop a specific message would only work if the code runs in the same IFlow which also holds the message to be stopped.

If there is a way to get access to something like a global InflightRepository this would not be necessary.

Regards
engswee
Active Contributor
Right. Understand now. You are accessing a different Iflow's exchange.

 

Great. Thanks for the explanation - the code makes sense now 🙂
former_member82872
Discoverer
0 Kudos
Great Blog Raffael Herrmann !!

Can we not stop messages/Achieve the same using any of CPI platform APIs ?

Thanks

Nag
r_herrmann
Active Contributor
0 Kudos
Hi Nagarjuna,

no (at least right now) it isn't possible via official APIs.

Regards,
Raffael
0 Kudos
Hi,

Is there a possibility to also get rid of 'Retry' status messages?

Thanks,

Marco
0 Kudos
Hi Raffael,

 

I tried the same to stop my iFlow messages in Processing status. But every time it gives me the message "No messages in InflightRepository, thus nothing to stop." It does not find any messages in the InflightRepository, though I can see them in message monitoring. Can you please advise on this?

 

Thanks,

Sonali
r_herrmann
Active Contributor
0 Kudos
Hi Sonali,

 

if there are no more messages in the InflightRepository, then you've either chosen a wrong IFlow id or the messages are already processed and the CPI monitor just didn't get the information. The second option is case 2 as described in the article: "Case 2 might occur, when the message processing was “hard-interrupted” by crashing the runtime node or undeploy the IFlow during the message processing." In this case you have to wait for the official clean-up job.

 
former_member194481
Participant
Hi Herrmann,

Is it possible to stop the messages which are retrying as well?

 

Thanks and Regards,

Vijay.
r_herrmann
Active Contributor
0 Kudos
Hi Vijay,

I haven't tried to stop "retry" messages. But if you're talking about the standard retry mechanism (with JMS adapter + message queues), then I would guess it should work with the code shown above if you make one little adjustment: Remove the following line from the code above:
exchange.setException(new InterruptedException(exceptionText))

Without this line, messages are stopped with state "Completed" instead of "Failed". I think the retry in a scenario with the JMS adapter in retry mode will only be triggered if a message goes into an erroneous state. Maybe you can "bypass" this by simply setting the message to "Completed" state as described above.

 
former_member194481
Participant
0 Kudos
Hi Herrmann,

I have tried, but the iFlow is not getting canceled. I have commented out the line which you have shown.

 

Thanks and Regards,

Vijay.
philippeaddor
Active Participant
0 Kudos

Hi Raffael,

I have the same problem. The messages in status “Retry”, at least in my case, originate from XI Sender Adapters with QoS “At least once” (or “Exactly Once”) configured.

These messages seem to retry forever (?) without a way to cancel them like in PI/PO. Apparently this was forgotten by SAP. ?

Your approach sadly doesn’t work here. So we need to find a solution for this case as well…

Great blog by the way!

 

maxi1555
Contributor
0 Kudos
The easy and unique way is deleting them from the datastore ?
BiswaPal
Explorer
0 Kudos
Hi  r_herrmann ,

Thank you for the great blog!!

I have tried the above Groovy script option in iFlow to cancel the processing message (due to SFTP session close issue) but it doesn't work.

Got the successful status when triggered the iFlow with proper Message and iFlow id at browser level and see the entry at Monitoring also. (completed) . However the iFlow message in Processing status persists.
{"successful":true,"text":""}

Could you kindly help?

Hi vijaykumar.kvn ,

Did you happen to successfully cancel the 'processing status' messages?

Best Regards,
Suresh S
Emre_tr
Active Participant
0 Kudos
Hi,

when I try to get messages from the InflightRepository, it always comes back empty. Could the reason be authorization?
r_herrmann
Active Contributor
0 Kudos
Maybe a missing role or something in the "background logic" may have changed. Where did you try it? On NEO or Cloud Foundry? (I'm asking, because I tried it out on NEO when writing this blog post and from my experience the Cloud Foundry based CPI behaves differently at some points...)
Emre_tr
Active Participant
0 Kudos
It's the Neo environment. It's strange: I have administrator and developer roles, but the camelContext's InflightRepository comes back empty. There are 5 flows stuck in processing mode.
dineshhome1361_7
Participant
0 Kudos
Hello r_herrmann

I have a question about the lines below:

def params = [:]
(message.getHeaders().CamelHttpQuery =~ /(\w+)=?([^&]+)?/)[0..-1].each{
params[it[1]] = it[2]
}

I understand at a high level that this map extracts the message ID and the IFlow name, but I am still confused.
Could anyone explain what these lines do, with some example/illustration please?

I tried another way to extract the message ID and the IFlow ID, but for the IFlow ID I guess there is no other way:

def msgid= message.getProperties("SAP_MessageProcessingLogID")

 

Thanks,

Dinesh

 
r_herrmann
Active Contributor
0 Kudos

Hi dineshhome1361_7 ,

It's a combination of Regex and map function. Maybe this sample is easier to understand:

//message.getHeaders().CamelHttpQuery contains an url query string
//For debugging we set url query manually
def url = "a=1&b=second&c=hello"
def params = [:]
//Parse query string with regex
def regexMatch = (url =~ /(\w+)=?([^&]+)?/)
println "Result of Regex:"
println regexMatch
//Loop through regex matches
regexMatch[0..-1].each{
params[it[1]] = it[2]
}
println "Resulting parameters:"
print params

You can see an interactive demo here: https://jdoodle.com/ia/hmC

An explanation of the regex can be found here: https://regex101.com/r/D9lgxV/1

gganesh15051
Explorer
0 Kudos

Hi

I have tried the script, and it just sets the exception message without aborting immediately. The failed status and exception text only appear in CPI monitoring after the usual execution has finished. Is there any option to cancel immediately?

 

{
"successful": true,
"text": ""
}

Thanks,

Ganesh.

kuznetsov_in2
Explorer
0 Kudos
Another reason for getting this, as I understand it: CamelContext and InflightRepository are bound to an IFLMAP node. If your tenant has multiple nodes, it's necessary to have some way to dispatch this code amongst them.
FedericoTokman
Advisor
0 Kudos
Hi all, one of my customers wants to stop/delete some CPI in DEV and QA. Do we have any step by step to do that? Regards.
Ravi_Bharti08
Explorer
Hello Raffael,

Thanks for writing this great blog.
I was trying this scenario.
-I created one iFlow in which I wrote a Groovy script that puts the message to sleep (for 3 minutes), so that the iFlow status shows as "Processing".

-I used your approach to stop the message. The iFlow returns success, but the message still runs for 3 minutes; only after that is the status shown as "Failed".

 

I was thinking it should have failed the moment we invoke the iFlow for stopping this message, right? Any thoughts on this?

Groovy script :
import com.sap.gateway.ip.core.customdev.util.Message

def Message processData(Message message) {
def headers = message.getHeaders()
// Sleep for 3 minutes
Thread.sleep(3*60000)
return message
}

 


message stopping time

luis_gomez
Newcomer
0 Kudos

Hello @r_herrmann ,
we are trying the ready-to-use solution but we get the following result :S

 

{"successful":false,"text":"while trying to get the length of a null array loaded from local variable 'a'"}

We think it is an authorization issue but cannot find the root cause. Any input?

Thanks 
