Introduction
With the latest release of SAP Cloud Integration, we provide a new API to hook into the Message Processing Log and Adapter Trace features. With this newly released API, adapter developers can also contribute information about the HTTP endpoints opened by their adapter and have it displayed in the Manage Integration Content view of the Web application.
In this blog, I create an adapter to demonstrate the installation and usage of the new Adapter API.
Prerequisites
You must install the Adapter Development Kit for SAP Cloud Integration. For detailed instructions, refer to the documentation.
Basic Adapter
The initial step is to create a new Adapter Project called BlogADKAdapter in Eclipse. In the Eclipse New Adapter Project wizard, enter the details of the new adapter as shown in the next screenshot.
Clicking on Finish creates the adapter. The project contains the following files:
Component | Description
BlogADKAdapterComponent.java | Sample runtime component.
BlogADKAdapterConsumer.java | Sample consumer.
BlogADKAdapterEndpoint.java | Sample endpoint.
BlogADKAdapterProducer.java | Sample producer.
metadata.xml | Component configuration properties and other metadata.
BlogADKAdapterComponentTest.java | Sample JUnit test.
pom.xml | Contains configuration details and dependencies.
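For orientation, the generated runtime component is a standard Apache Camel component that creates the endpoint from the configured URI. The following is only a minimal sketch of what BlogADKAdapterComponent roughly looks like; the actual generated code may differ, and the class body shown here is an assumption based on the standard Camel DefaultComponent contract and the endpoint constructors used later in this blog.

package blogadkadapter;

import java.util.Map;

import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;

/**
 * Sketch of the generated runtime component (illustrative; the ADK-generated
 * code may differ in detail).
 */
public class BlogADKAdapterComponent extends DefaultComponent {

    @Override
    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
        // "remaining" carries the First URI Part configured on the channel
        BlogADKAdapterEndpoint endpoint = new BlogADKAdapterEndpoint(uri, remaining, this);
        // bind the remaining URI query parameters (e.g. greetingsMessage) to the endpoint bean
        setProperties(endpoint, parameters);
        return endpoint;
    }
}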
Compiling the Adapter
To compile the adapter project, use the standard Maven command mvn install or the Maven M2Eclipse plugin. The output of the compilation is shown in the Terminal window or the Console view of Eclipse, and the path to the generated adapter (the .esa file) is shown at the end of the build output.
Using the Adapter
We have now created a simple polling adapter. For this blog post, we modify the adapter so that it behaves as if it were receiving HTTP messages.
To use the new adapter, you must deploy the adapter to the tenant and create an Integration Flow.
Deploy the Adapter
First, go to the Integration Operations perspective in Eclipse and select the tenant in the Node Explorer view. Then open the context menu of your tenant and choose Deploy Artifacts.... In the wizard that appears, select Integration Adapter and deploy your adapter by choosing the BlogADKAdapter.esa file generated above.
You can see the result of the deployment in the Component Status View in Eclipse.
Create an Integration Flow
Create a new Blog integration package and add a new Integration Flow called BlogADKAdpaterIFlow. For more information on creating Integration Flows, refer to the documentation.
Within the Integration Flow, you need to connect the Sender and Receiver components and select the new BlogADKAdapter as the adapter type of the connection. The Integration Flow should look like this:
Configure the Sender Adapter with the values shown in the table below:
Parameter | Value | Description
First URI Part | firstURIPartOfSender | Specifies the HTTP path of the sender adapter
Greetings Message | Hello from Sender Adapter | Example of a Sender adapter parameter
Delay | 6048000000 | Delay for sending further messages. We set a very high value to trigger only an initial message.
Configure the Receiver Adapter with the values shown in the table below:
Parameter | Value | Description
First URI Part | firstURIPartOfReceiver | Specifies the path of the receiver adapter
Greetings Message | Hello from Receiver Adapter | Example of a Receiver adapter parameter
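For context, the channel configuration above is translated by the runtime into a Camel endpoint URI that is handed to the adapter component. The following is a hypothetical, simplified (un-encoded) example of the resulting sender endpoint URI; the sap-sample scheme comes from the @UriEndpoint annotation shown later in this blog, while the exact query-parameter names are assumptions based on the endpoint's greetingsMessage property and the scheduled-poll consumer's delay option.

sap-sample://firstURIPartOfSender?greetingsMessage=Hello from Sender Adapter&delay=6048000000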
Afterward, deploy the Integration Flow.
Go to the Manage Integration Content view of the Web application to view the state of your Integration Flow. You should see your deployed Integration Flow:
By clicking the Monitor Message Processing link, you can see the processed messages of the BlogADKAdapter Integration Flow.
Note: A redeployment or restart of the Integration Flow triggers new message processing.
Note: To view the detailed Message Processing Log, you must execute the Integration Flow with at least Debug log level. For more information about log levels, refer to the documentation.
We now have completed the initial setup of the adapter.
Adapter API
So far, we have developed a very basic adapter and an Integration Flow for its execution. If you want to contribute to the Message Processing Log, Tracing, or Endpoint Information, you must use the new Adapter API.
Installation
Execute the following steps to install the Adapter APIs in your Adapter project:
- Download the Adapter API and Generic API from https://tools.hana.ondemand.com/#cloudintegration
- Note the file version of both JAR files and rename them to generic-api.jar and adapter-api.jar.
- Move both files to the BlogADKAdapter\src\main\resources directory and refresh your Eclipse project. The Eclipse project should look similar to:
- To make the JAR files available in your project, add the following snippet to the end of the dependencies section of the pom.xml file, located in your project root directory:
<!-- replace XXX with the file version -->
<dependency>
    <groupId>com.sap.it.public</groupId>
    <artifactId>adapter.api</artifactId>
    <version>XXX</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/adapter-api.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.sap.it.public</groupId>
    <artifactId>generic.api</artifactId>
    <version>XXX</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/generic-api.jar</systemPath>
</dependency>
- Add com.sap.it.public to the excludeGroupIds list in the configuration of the maven-dependency-plugin:
<excludeGroupIds>com.sap.cloud.adk,org.apache.camel,org.slf4j,log4j,com.sap.it.public</excludeGroupIds>
Verify that everything was applied correctly by expanding the Maven Dependencies section of your Eclipse project and reviewing the referenced JAR files.
Note: If you edit the pom.xml file outside of Eclipse, you must update the Maven project definition in Eclipse to apply the changes to the Eclipse project settings.
Integrate into Message Processing Log
If your adapter should add information to the Message Processing Log, use the classes of the com.sap.it.api.msglog.adapter package delivered with the Adapter API.
To add to the Message Log of the sender adapter, modify the existing poll() method of the sender adapter, which is implemented in the BlogADKAdapterConsumer class. The following listing shows the updated poll() method of the BlogADKAdapterConsumer class:
@Override
protected int poll() throws Exception {
    Exchange exchange = endpoint.createExchange();

    // get adapter message log factory
    AdapterMessageLogFactory msgLogFactory = ITApiFactory.getService(AdapterMessageLogFactory.class, null);

    // create a message body
    String greetingsMessage = endpoint.getGreetingsMessage();
    Date now = new Date();
    if (greetingsMessage == null || greetingsMessage.isEmpty()) {
        LOG.error("The message is empty! Default one will be used");
        greetingsMessage = "Hello There!! ";
    }
    StringBuilder builder = new StringBuilder(greetingsMessage);
    builder.append(" Now it is ");
    builder.append(now.toString());

    // add content to the MPL
    AdapterMessageLogWithStatus msgLog = null;
    try {
        msgLog = msgLogFactory.getMessageLogWithStatus(exchange, "BlogADKAdapterConsumer Inbound Log Text ",
                "BlogADKAdapter-ID", "SomeRandomString" + System.currentTimeMillis() + "-IN");
        msgLog.putAdapterAttribute("GreetingsMessage", greetingsMessage);
    } finally {
        if (msgLog != null) {
            msgLog.close();
        }
    }
    exchange.getIn().setBody(builder.toString());
    try {
        // send message to next processor in the route
        getProcessor().process(exchange);
        return 1; // number of messages polled
    } finally {
        // log exception if an exception occurred and was not handled
        if (exchange.getException() != null) {
            getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
        }
    }
}
The implementation of AdapterMessageLogFactory is provided by the adapter runtime, which is not available for unit tests. Therefore, the factory must either be mocked for unit-test execution, or the tests must be skipped in the Maven build via the skipTests option.
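For example, to skip test execution during the build, you can pass the standard Maven flag on the command line:

mvn clean install -DskipTests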
After compiling the adapter, you must deploy it again. Then set the log level to Debug and restart your Integration Flow.
The following screenshot shows the new step in the Message Processing Log:
Going to the step details of the highlighted step shows the new header content:
Integrate into Tracing
If your adapter transforms the payload or adds headers before sending or receiving a message, those changes are not reflected in the regular trace. However, seeing the payload and headers exactly as they were received or transmitted can be very useful for troubleshooting. If this is the case for your adapter, you can provide that information to the tracing mechanism.
The following changes to the BlogADKAdapterConsumer class set the payload and define header entries in the trace information.
package blogadkadapter;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sap.it.api.ITApiFactory;
import com.sap.it.api.msglog.adapter.AdapterMessageLogFactory;
import com.sap.it.api.msglog.adapter.AdapterMessageLogWithStatus;
import com.sap.it.api.msglog.adapter.AdapterTraceMessage;
import com.sap.it.api.msglog.adapter.AdapterTraceMessageType;
/**
* The Sample.com consumer.
*/
public class BlogADKAdapterConsumer extends ScheduledPollConsumer {
private Logger LOG = LoggerFactory.getLogger(BlogADKAdapterConsumer.class);
private final BlogADKAdapterEndpoint endpoint;
private Map<String, String> headerElements = new HashMap<>();
public BlogADKAdapterConsumer(final BlogADKAdapterEndpoint endpoint, final Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
// initialize a map with sample headers for demo purposes only. In typical scenarios,
// e.g. when your adapter provides its own HTTP endpoints, this is not required because the headers are defined automatically
this.headerElements.put("protocolType", "foo");
this.headerElements.put("protocolVersion", "1");
this.headerElements.put("applicationMessageFormat", "bar");
}
@Override
protected void doStart() throws Exception {
super.doStart();
}
@Override
protected int poll() throws Exception {
DefaultExchange exchange = (DefaultExchange) endpoint.createExchange();
// as we just created our Camel exchange, we need to assign the header values
// for this example we add all non-protocol-specific headers
for (String key : headerElements.keySet()) {
if (!key.startsWith("protocol")) {
exchange.getIn().setHeader(key, headerElements.get(key));
}
}
// get adapter message log factory
AdapterMessageLogFactory msgLogFactory = ITApiFactory.getService(AdapterMessageLogFactory.class, null);
// create a message body
String greetingsMessage = endpoint.getGreetingsMessage();
Date now = new Date();
if (greetingsMessage == null || greetingsMessage.isEmpty()) {
LOG.error("The message is empty! Default one will be used");
greetingsMessage = "Hello There!! ";
}
StringBuilder builder = new StringBuilder(greetingsMessage);
builder.append(" Now it is ");
builder.append(now.toString());
AdapterMessageLogWithStatus msgLog = null;
try {
// add content to the MPL
msgLog = msgLogFactory.getMessageLogWithStatus(exchange, "BlogADKAdapterConsumer Inbound Log Text ", "BlogADKAdapter-ID",
"SomeRandomString" + System.currentTimeMillis() + "-IN");
msgLog.putAdapterAttribute("GreetingsMessage", greetingsMessage);
// set the body of the in message
exchange.getIn().setBody(builder.toString());
// if trace is active, write the sender inbound content to the trace log
if (msgLog.isTraceActive()) {
writeTraceMessage(exchange, headerElements, msgLog, AdapterTraceMessageType.SENDER_INBOUND);
}
// send message to next processor in the route
getProcessor().process(exchange);
return 1; // number of messages polled
} finally {
// if trace is active, write the sender outbound content to the trace log
if (msgLog.isTraceActive()) {
Map<String, String> outboundHeaderElements = new HashMap<>();
// set the outbound headers to the ones originally retrieved from the message
outboundHeaderElements.putAll(convertMap(exchange.getIn().getHeaders()));
// for demo purposes, add an additional header entry to the outbound header list
outboundHeaderElements.put("isProcessed", "true");
writeTraceMessage(exchange, outboundHeaderElements, msgLog, AdapterTraceMessageType.SENDER_OUTBOUND);
}
if (msgLog != null) {
msgLog.close();
}
// log exception if an exception occurred and was not handled
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
}
}
}
/**
 * Converts a Map<String, Object> to a new Map<String, String>.
 *
 * @param inputMap
 *            the map to convert
 * @return the converted map
 */
private Map<String, String> convertMap(Map<String, Object> inputMap) {
Map<String, String> result = new HashMap<String, String>();
for (String key : inputMap.keySet()) {
result.put(key, inputMap.get(key).toString());
}
return result;
}
/**
 * Writes a trace message for the given exchange.
 *
 * @param exchange
 *            the current exchange
 * @param headers
 *            the headers to add to the trace message
 * @param msgLog
 *            message log
 * @param type
 *            message type
 */
private void writeTraceMessage(final Exchange exchange, Map<String, String> headers, final AdapterMessageLogWithStatus msgLog,
final AdapterTraceMessageType type) {
// creates a trace message to write to the log
// in this example we write our generated payload to the trace
// typically you want to do this only if your adapter changed the payload
Object payload = exchange.getIn().getBody();
byte[] payloadBytes = new byte[0];
if (payload != null) {
try {
payloadBytes = payload.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException uee) {
getExceptionHandler().handleException("Error reading payload", exchange, uee);
}
}
AdapterTraceMessage traceMsg = msgLog.createTraceMessage(type, payloadBytes, false);
traceMsg.setHeaders(headers);
msgLog.writeTrace(traceMsg);
}
}
If you activate the Trace log level and restart the Integration Flow, the trace information is written and shown in the step details view of the Message Monitor, as shown below.
Show Endpoint Information
If your adapter opens HTTP endpoints that can be called from outside, the URLs of those endpoints depend on the tenant on which the adapter is deployed and used. Depending on how the URL path is built, it can be hard for the user to determine the correct URL to call. To simplify this, you can display the URLs as part of the Integration Flow details in the Manage Integration Content view, so that the user can, for instance, copy a URL and use it to configure a client that calls the endpoint.
To expose the URLs of your adapter, you need to implement and expose the AdapterEndpointInformationService. Through this service, your adapter returns, for each Integration Flow, the paths of the opened endpoints relative to the root of the CPI worker node. The root path is added automatically when the endpoint information is displayed.
This requires the following implementation tasks:
- Create a Registry for storing a list of endpoints for an Integration Flow
- Add and remove endpoints from the Registry as endpoints are started or stopped
- Implement the AdapterEndpointInformationService service definition to provide a list of endpoints to the backend
Preparation
We need to store the path of the endpoint to make it accessible to the other classes in our adapter implementation. The updated BlogADKAdapterEndpoint class below stores the First URI Part adapter parameter in a new path field:
package blogadkadapter;
import java.net.URISyntaxException;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultPollingEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents a www.Sample.com Camel endpoint.
*/
@UriEndpoint(scheme = "sap-sample", syntax = "", title = "")
public class BlogADKAdapterEndpoint extends DefaultPollingEndpoint {
private BlogADKAdapterComponent component;
private transient Logger logger = LoggerFactory.getLogger(BlogADKAdapterEndpoint.class);
// used to store our First URI Part adapter parameter; by default it
// is not part of the adapter properties
private String path;
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
@UriParam
private String greetingsMessage;
public String getGreetingsMessage() {
return greetingsMessage;
}
public void setGreetingsMessage(String greetingsMessage) {
this.greetingsMessage = greetingsMessage;
}
public BlogADKAdapterEndpoint() {
}
public BlogADKAdapterEndpoint(final String endpointUri, final BlogADKAdapterComponent component) throws URISyntaxException {
super(endpointUri, component);
this.component = component;
this.path = "/";
}
public BlogADKAdapterEndpoint(final String uri, final String remaining, final BlogADKAdapterComponent component) throws URISyntaxException {
this(uri, component);
// the First URI Part adapter parameter is set in the remaining method parameter
this.path = remaining;
}
public Producer createProducer() throws Exception {
return new BlogADKAdapterProducer(this);
}
public Consumer createConsumer(Processor processor) throws Exception {
final BlogADKAdapterConsumer consumer = new BlogADKAdapterConsumer(this, processor);
configureConsumer(consumer);
return consumer;
}
public boolean isSingleton() {
return true;
}
}
Implementation of a Registry
The new endpoint service, which we will create next, must return the endpoints for each Integration Flow, so we have to implement a Registry for storing this information. The following class is used to store the endpoints of an Integration Flow:
package blogadkadapter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* Registry for storing an Integration Flow together with its endpoints.
*/
public class EndpointRegistry {
private static final Map<String, List<BlogADKAdapterEndpoint>> registry = new HashMap<>();
/**
* Adds an endpoint for an Integration Flow
*
* @param iflow
* the Integration Flow
* @param endpoint
* the endpoint
* @see BlogADKAdapterEndpoint
*/
public static void add(String iflow, BlogADKAdapterEndpoint endpoint) {
if (registry.containsKey(iflow)) {
List<BlogADKAdapterEndpoint> endpoints = registry.get(iflow);
endpoints.add(endpoint);
} else {
List<BlogADKAdapterEndpoint> endpoints = new LinkedList<>();
endpoints.add(endpoint);
registry.put(iflow, endpoints);
}
}
/**
* Returns all endpoints for all iflows.
*
* @return List of iflows and their endpoints
* @see BlogADKAdapterEndpoint
*/
public static Map<String, List<BlogADKAdapterEndpoint>> getEndpoints() {
return registry;
}
/**
* Returns the endpoints for an Integration Flow
*
* @param iflow
* the Integration Flow
* @return List of Endpoints
* @see BlogADKAdapterEndpoint
*/
public static List<BlogADKAdapterEndpoint> getEndpoints(String iflow) {
if (registry.containsKey(iflow)) {
return registry.get(iflow);
}
return new LinkedList<>();
}
/**
* Removes an endpoint from an Integration Flow
*
* @param iflow
* the integration flow
* @param endpoint
* the endpoint to remove
* @see BlogADKAdapterEndpoint
*/
public static void remove(String iflow, BlogADKAdapterEndpoint endpoint) {
if (registry.containsKey(iflow)) {
List<BlogADKAdapterEndpoint> endpoints = registry.get(iflow);
endpoints.remove(endpoint);
if (endpoints.size() == 0) {
registry.remove(iflow);
}
}
}
}
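To illustrate the Registry's behavior, a minimal JUnit test could look like the sketch below. It is purely illustrative and not part of the generated project; it simply exercises the add, get, and remove methods defined above, using the endpoint's no-argument constructor.

package blogadkadapter;

import static org.junit.Assert.assertTrue;

import org.junit.Test;

/**
 * Illustrative test for the EndpointRegistry (not part of the generated project).
 */
public class EndpointRegistryTest {

    @Test
    public void addAndRemoveEndpoint() {
        BlogADKAdapterEndpoint endpoint = new BlogADKAdapterEndpoint();
        // register the endpoint for an Integration Flow id
        EndpointRegistry.add("SomeIFlowId", endpoint);
        assertTrue(EndpointRegistry.getEndpoints("SomeIFlowId").contains(endpoint));
        // remove it again; the iflow entry is dropped once its last endpoint is gone
        EndpointRegistry.remove("SomeIFlowId", endpoint);
        assertTrue(EndpointRegistry.getEndpoints("SomeIFlowId").isEmpty());
    }
}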
Using the EndpointRegistry
We need to add and remove endpoints in the Registry when a Sender endpoint is started or stopped. To implement this, we change the BlogADKAdapterConsumer class: we modify the existing doStart() method and also override the doStop() method:
@Override
protected void doStart() throws Exception {
    super.doStart();
    EndpointRegistry.add(endpoint.getCamelContext().getName(), endpoint);
}

@Override
protected void doStop() throws Exception {
    super.doStop();
    EndpointRegistry.remove(endpoint.getCamelContext().getName(), endpoint);
}
Implementation of AdapterEndpointInformationService
Next, we implement the AdapterEndpointInformationService for our adapter, as shown in the BlogADKAdapterEndpointInformationService class below.
package blogadkadapter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.camel.ServiceStatus;
import com.sap.it.api.adapter.monitoring.AdapterEndpointInformation;
import com.sap.it.api.adapter.monitoring.AdapterEndpointInformationService;
import com.sap.it.api.adapter.monitoring.AdapterEndpointInstance;
import com.sap.it.api.adapter.monitoring.EndpointCategory;
/**
* Expose the endpoints used by this adapter.
*/
public class BlogADKAdapterEndpointInformationService implements AdapterEndpointInformationService {
/**
* Returns the list of all endpoints of all IntegrationFlows used by the
* adapter.
*
* @return the list of endpoints
*/
@Override
public List<AdapterEndpointInformation> getAdapterEndpointInformation() {
List<AdapterEndpointInformation> endpointInfos = new ArrayList<>();
Map<String, List<BlogADKAdapterEndpoint>> knownEndpoints = EndpointRegistry.getEndpoints();
for (String iflow : knownEndpoints.keySet()) {
List<AdapterEndpointInformation> iflowEndpoints = this.getAdapterEndpointInformationByIFlow(iflow);
endpointInfos.addAll(iflowEndpoints);
}
return endpointInfos;
}
/**
 * Returns the list of endpoints for a given IntegrationFlow. If the given
 * Integration Flow does not use the adapter, an empty list is returned.
 *
 * @param iflow
 *            the id (bundle symbolic name) of the Integration Flow
 * @return the endpoint information for the given Integration Flow
 */
@Override
public List<AdapterEndpointInformation> getAdapterEndpointInformationByIFlow(String iflow) {
List<AdapterEndpointInformation> endpointInfos = new ArrayList<>();
List<BlogADKAdapterEndpoint> endpoints = EndpointRegistry.getEndpoints(iflow);
if (endpoints != null && endpoints.size() > 0) {
for (BlogADKAdapterEndpoint endpoint : endpoints) {
String camelEndpoint = endpoint.getPath();
if (endpoint.getStatus() == ServiceStatus.Started) {
// in this example the endpoint should be available via: <host>/adk/<first uri part>
// note that a channel can have one entry endpoint and multiple definition endpoints
// our BlogADKAdapter uses only the entry endpoint and not the definition endpoints
AdapterEndpointInstance mainEndpointInstance = new AdapterEndpointInstance(EndpointCategory.ENTRY_POINT,
"/adk/" + camelEndpoint, null);
AdapterEndpointInformation adapterEndpointInformation = new AdapterEndpointInformation();
adapterEndpointInformation.setIntegrationFlowId(iflow);
adapterEndpointInformation.setAdapterEndpointInstances(Arrays.asList(mainEndpointInstance));
endpointInfos.add(adapterEndpointInformation);
}
}
}
return endpointInfos;
}
}
Exposing the BlogADKAdapterEndpointInformationService Service
To expose our new BlogADKAdapterEndpointInformationService as an OSGi service, we need to create a beans.xml file in the BlogADKAdapter\src\main\resources\OSGI-INF\blueprint directory to make the service available to the runtime.
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">

    <bean id="iAdapterEndpointInformationService" class="blogadkadapter.BlogADKAdapterEndpointInformationService"
        activation="eager"></bean>

    <service id="IAdapterEndpointInformationService" ref="iAdapterEndpointInformationService"
        interface="com.sap.it.api.adapter.monitoring.AdapterEndpointInformationService">
    </service>
</blueprint>
After implementing all the changes above, compile and deploy the adapter. Then restart your Integration Flow to see the endpoint information in the Manage Integration Content view of the Web application.
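Because the service registers the entry point as /adk/<first uri part> and the worker-node root path is prepended automatically, the displayed endpoint for our sender channel should look roughly like the following (the host name is a placeholder):

https://<cpi-worker-node-host>/adk/firstURIPartOfSender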
Summary
We have seen how to use the Adapter API to contribute to the Message Processing Log and Adapter Tracing and to expose endpoint information. I hope you liked the post. If you have any questions or feedback, don't hesitate to comment on this blog post.
References
Develop adapters
Java API
Creating Integration Flows
Managing Integration Content
Setting Log Levels
Tracing
Message Processing Log - Adapter Tracing
Monitoring Message Processing
Blog: Enhancements to Message Processing Log Viewer in the Web Application