This blog is part of a blog series from SAP Datasphere product management with a focus on the Replication Flow capabilities in SAP Datasphere:
Replication Flow Blog Series Part 1 – Overview | SAP Blogs
Replication Flow Blog Series Part 2 – Premium Outbound Integration | SAP Blogs
Replication Flows Blog Series Part 3 – Integration with Kafka
Replication Flows Blog Series Part 4 – Sizing
Replication Flows Blog Series Part 5 – Integration between SAP Datasphere and Databricks
Replication Flows Blog Series Part 6 – Confluent as a Replication Target
Replication Flows Blog Series Part 7 – Performance
Replication Flows Blog Series Part 8 – Confluent as a Replication Source
Data integration is an essential topic in a Business Data Fabric like SAP Datasphere. Replication Flow is the cornerstone for fueling SAP Datasphere with data, especially from SAP ABAP sources. There is also a strong need to move data from third-party sources into SAP Datasphere to enable certain use cases.
In this part of the Replication Flow blog series, we focus on the usage of Confluent as a source in Replication Flows. We explain in detail the new capabilities that have been introduced with SAP Datasphere release 2024.23.
The purpose of the additional features described in this blog is to provide tailor-made integration with our dedicated partner Confluent's Apache Kafka®-based data streaming platform (both the fully managed Confluent Cloud and the self-managed Confluent Platform). More specifically, it is now possible to consume Kafka messages from Confluent in SAP Datasphere. The content of this blog does not apply to the generic Kafka integration in SAP Datasphere.
The Confluent integration described in this blog is only usable in SAP Datasphere Replication Flows.
For the examples and step-by-step instructions in this blog, we assume that a properly configured SAP Datasphere tenant and a Confluent Cloud cluster are available. In addition, we assume that the reader is familiar with the basic concepts around Replication Flows and Connection Management in SAP Datasphere as well as with the Kafka data streaming capabilities of Confluent. For a guide on how to set up a connection to Confluent Cloud via the SAP Datasphere Connection Management, we refer the reader to Part 6 of this blog series. Whenever we refer or link to Confluent-specific material, we will, for the sake of consistency, use the Confluent Cloud documentation and related material.
To explain the various aspects of the consumption capabilities in SAP Datasphere Replication Flows for Confluent Kafka data, we use the following sample setup. It is solely intended to explain the new functionalities and does not claim to fully or partially represent real scenarios. We assume that we have employee data stored in a Kafka topic Demo_Topic consisting of 6 partitions, as illustrated in the following figure.
Figure 1 - Kafka Messages with employee data
The employee Kafka messages comply with the following JSON schema whose subject is named Demo_Topic-value and is available in the default schema registry context.
{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema",
  "properties": {
    "history": {
      "items": [
        {
          "properties": {
            "end": { "type": "string" },
            "position": { "type": "string" },
            "start": { "type": "string" }
          },
          "required": [ "position", "start", "end" ],
          "type": "object"
        }
      ],
      "type": "array"
    },
    "id": { "type": "integer" },
    "personal": {
      "properties": {
        "email": { "type": "string" },
        "firstName": { "type": "string" },
        "secondName": { "type": "string" }
      },
      "required": [ "firstName", "secondName", "email" ],
      "type": "object"
    },
    "position": { "type": "string" }
  },
  "required": [ "id", "personal", "position" ],
  "title": "SampleRecord",
  "type": "object"
}
We will also consider AVRO schemas in this blog.
Confluent Schema Registry is a prerequisite when using a Confluent system as a source in Replication Flows. Additionally, the messages must contain the schema ID in the bytes that follow the magic byte at the beginning of the message. How schemas are used during the replication process is described in the next section.
Replication Flows can consume topics whose Kafka messages comply with schemas of type AVRO or JSON.
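For illustration only, the following sketch shows how a message that fulfils these prerequisites could be produced with the confluent-kafka Python client: the JSONSerializer registers (or looks up) the schema under the subject Demo_Topic-value and prepends the Confluent wire format header (magic byte 0x00 followed by the 4-byte schema ID) to the JSON payload. All connection parameters and the schema file name are placeholders and not part of the scenario described in this blog.

```python
# Illustrative only: produce one employee message to Demo_Topic with the
# confluent-kafka Python client. The JSONSerializer prepends the Confluent
# wire format header (magic byte 0x00 + 4-byte schema ID) to the payload,
# which is what Replication Flows expect to find in the message.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

schema_str = open("employee_schema.json").read()   # the JSON schema shown above, saved to a file

sr_client = SchemaRegistryClient({
    "url": "https://<schema-registry-endpoint>",               # placeholder
    "basic.auth.user.info": "<sr-api-key>:<sr-api-secret>",    # placeholder
})
json_serializer = JSONSerializer(schema_str, sr_client)        # subject: Demo_Topic-value

producer = Producer({
    "bootstrap.servers": "<bootstrap-endpoint>",               # placeholder
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<cluster-api-key>",                      # placeholder
    "sasl.password": "<cluster-api-secret>",                   # placeholder
})

employee = {
    "id": 1001,
    "position": "Developer",
    "personal": {"firstName": "Jane", "secondName": "Doe", "email": "jane.doe@example.com"},
    "history": [{"position": "Trainee", "start": "2020-01-01", "end": "2021-01-01"}],
}

producer.produce(
    topic="Demo_Topic",
    value=json_serializer(employee, SerializationContext("Demo_Topic", MessageField.VALUE)),
)
producer.flush()
```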
We now run through the creation process of a Replication Flow. In parallel, we describe the different configuration options that are available during design time of a Replication Flow when a Confluent system is selected as the source. For this blog, we use SAP Datasphere as the replication target. Of course, in a real-world scenario, any system that is supported by Replication Flows as a replication target can be used. As mentioned in the first section of this blog, we assume that a connection to the source Confluent system has been set up in the Connection Management. In our case, this connection is named CONFLUENT_DEMO.
After entering the Replication Flow creation screen, we select CONFLUENT_DEMO as the source connection. A Confluent Schema Context always serves as the Replication Flow Source Container. The default schema context, which we also use in our example, is always denoted by a dot (.), but the Replication Flow Source Container selection screen lists all available schema contexts. In the Replication Flow Source Object selection screen, we can choose one or several Kafka topics that are available in the Confluent Cloud cluster. In our example, we only select Demo_Topic. Finally, we select the SAP Datasphere local repository as the replication target.
Figure 2 - Replication Flow with Confluent as Source
Although there is no direct relation between Confluent schema contexts and Kafka topics, as is the case for schemas and tables in classical relational database systems, the two Confluent artifacts are used to derive a structure definition of the replication target (an SAP HANA table schema in our example). We provide the details below.
As highlighted in Figure 2, for each replication object the Object Properties panel of the Replication Flow screen contains the four standard sections General, Projections, Settings and Target Columns as well as the two Confluent-specific sections Confluent: Source Settings and Confluent: Source Schema Settings. The General and Target Columns sections do not require additional explanation.
Settings
It is possible to decide whether Delta Capture Columns are added to the replication target and whether the replication target is truncated before the replication starts. For Confluent Kafka sources, only the load type Initial and Delta is supported. Which parts of a topic are actually replicated depends on the Starting Point that can be configured in the section Confluent: Source Settings (see below).
Figure 3 - General Replication Task Settings
Projections
For each Kafka topic that is added as a source in a Replication Flow, a projection is generated automatically. The usual configurations can be made via the projection screens.
Figure 4 - Projection Settings
The projection is created automatically because of the way the Kafka message body is flattened into a table format. We provide the details in the chapter about Replication Flow Runtime Specifics below.
Confluent: Source Schema Settings
This section provides an overview of the Confluent schema that is used to deserialize the Kafka messages of the selected Kafka topic. It also offers the possibility to configure or change the schema that is to be used.
Figure 5 - Source Schema Settings
During the workflow showcased in Figure 2, an initial proposal for the schema to be used is derived.
The source Confluent schema, which is a JSON/AVRO nested structure, is then translated into a flat representation according to certain rules (see chapter Replication Flow Runtime Specifics).
In Figure 5, the main area of the schema configuration screen shows the resulting flat representation of the Kafka schema (viewable via the Flat button) as well as a mapping of the columns of the flat representation to the original nested schema (viewable via the Nested button). The nested schema uses Apache Arrow types to specify the data types of the different properties/fields. The Apache Arrow types are only mentioned for the sake of completeness, since they are used internally for type mapping.
In the properties panel on the left-hand side, a Confluent schema definition can be selected via the Subject Name and Schema Version dropdown menus.
In addition, one can choose to include or exclude a Technical Key in the target schema. If set, an additional column __message_id of type String(39) is added, which constitutes a primary key of the target table and is derived from the Kafka message as follows: the value for a row originating from a Kafka message from partition p and offset o is "<p>-<o>".
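As a minimal illustration (not product code), the technical key can be thought of as the following helper that joins partition and offset:

```python
# Illustrative helper only: the value of the technical key column __message_id
# is the source partition and offset joined by a hyphen, which makes it unique
# per message within a topic.
def technical_key(partition: int, offset: int) -> str:
    return f"{partition}-{offset}"

print(technical_key(3, 120))  # -> "3-120"
```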
Confluent: Source Settings
This section contains configuration options to fine-tune the topic consumption behaviour.
Figure 6 - Confluent: Source Settings
The following table explains the different configuration options that are shown in Figure 6 above.
| Configuration | Value(s) | Explanation |
|---|---|---|
| Consume Other Schema Versions | True/False | If set to True, a retrieved message that has a schema version different from the one configured in the Confluent: Source Schema Settings is mapped to the configured schema if possible. The Ignore Schema Mismatch option controls the behaviour of the replication in case this schema mapping fails: if it is set to False, the replication partition fails with an error stating that the schemas do not match. The subject is always required to be identical to the one specified via the Confluent: Source Schema Settings. |
| Ignore Schema Mismatch | True/False | This option is only available if Consume Other Schema Versions is set to True. If set to False, the replication partition fails if a received message with a different schema cannot be mapped to the configured schema. |
| Fail on Data Truncation | True/False | If set to True, a replication partition fails if a source string value is encountered that does not fit into a string of at most 5000 characters; the same applies to binary values exceeding a length of 5000 bytes. If set to False, such values are automatically truncated and written into the target column, and the replication continues. |
| Starting Point | Read from earliest offset / Read from latest offset | Read from earliest offset: the replication starts at the beginning of each source Kafka partition, i.e. all messages that already exist in the topic are replicated before new messages are processed. Read from latest offset: only messages that arrive after the replication has started are replicated. |
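Replication Flows handle the Kafka consumption internally, so no consumer code has to be written. For readers who think in plain Kafka terms, the two starting points roughly correspond to the auto.offset.reset policy of a consumer group without committed offsets, as the following sketch with the confluent-kafka Python client indicates; connection and security parameters are placeholders and SASL settings are omitted for brevity.

```python
# Illustrative only: the Starting Point conceptually corresponds to the
# auto.offset.reset policy of a Kafka consumer group without committed offsets.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "<bootstrap-endpoint>",   # placeholder
    "group.id": "starting-point-demo",             # placeholder
    # "earliest": replicate everything already stored in the topic, then deltas.
    # "latest":   replicate only messages that arrive after the start.
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["Demo_Topic"])

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    print(msg.partition(), msg.offset(), msg.value()[:40])
consumer.close()
```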
In this section, we take a closer look at certain behaviour during a replication run and provide details regarding the flattening of Kafka source messages.
Message Flattening & Type Mapping
As indicated in the last chapter, there is a flattening process involved when setting up and running a Replication Flow. The Confluent schema needs to be flattened into a proper target schema, and the content of the Kafka messages needs to be translated into this flat target schema. The flattening of the schema is a two-step process. From a high-level perspective, the flow looks as follows.
Figure 7 - Flattening Process
Let’s take our example topic with the corresponding simplified JSON schema. In the figure below we also added a sample message body and included a possible AVRO schema definition of the JSON schema.
Figure 8 - Sample Schema & Message
In our example the first step is illustrated in the following figure.
Figure 9 - Generating a flat representation of a nested schema
A set of rules is applied to flatten the JSON/AVRO schema into the flat Source Schema; in particular, nested records/objects are resolved into individual columns (see the type mapping tables below).
Using the above sample message, we would end up with the following flattened source message (pseudo notation).
Figure 10 - Example: Flattening Step 1
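To make this first flattening step more tangible, here is a small Python sketch that mimics it on the message body. The underscore used to join nested property names is an assumption for illustration, and arrays are skipped because the array type is not supported (see the type mapping tables below); the authoritative result is the flat schema shown in the Replication Flow UI.

```python
# Illustrative sketch of the first flattening step on the message body.
from typing import Any, Dict

def flatten(body: Dict[str, Any], parent: str = "") -> Dict[str, Any]:
    row: Dict[str, Any] = {}
    for key, value in body.items():
        column = f"{parent}_{key}" if parent else key   # joined name is an assumption
        if isinstance(value, dict):          # nested object -> resolve into columns
            row.update(flatten(value, column))
        elif isinstance(value, list):        # arrays are not supported -> skipped
            continue
        else:                                # scalar -> one column in the flat row
            row[column] = value
    return row

sample = {
    "id": 1001,
    "position": "Developer",
    "personal": {"firstName": "Jane", "secondName": "Doe", "email": "jane.doe@example.com"},
    "history": [{"position": "Trainee", "start": "2020-01-01", "end": "2021-01-01"}],
}
print(flatten(sample))
# {'id': 1001, 'position': 'Developer', 'personal_firstName': 'Jane',
#  'personal_secondName': 'Doe', 'personal_email': 'jane.doe@example.com'}
```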
The second step depends on the target system that was chosen in the Replication Flow UI. In our example the flat target schema looks as follows.
Figure 11 - Generating a flat target schema
In our example, the resulting SAP HANA table in SAP Datasphere looks as follows.
Figure 12 - Sample target table data
The details on the flattening logic provided in this blog will be extended with subsequent releases and enhancements.
The following two tables show a mapping of the different data types from the source Kafka AVRO/JSON schema to the intermediate flat source schema and the flat target schema. An additional column indicates whether a certain source data type is supported.
AVRO as schema type
| AVRO Type | Arrow Type | HANA Type | Google BigQuery | Object Store (Parquet) | Object Store (CSV) | Comment |
|---|---|---|---|---|---|---|
| null | - | - | - | - | - | Ignored |
| int | ARROW_TYPE_INT32 | INTEGER | INT64 | INT32 | string | |
| long | ARROW_TYPE_INT64 | BIGINT | INT64 | INT64 | string | |
| float | ARROW_TYPE_FLOAT32 | REAL | FLOAT64 | FLOAT | string (-ddd.dddd for small (<6) and -d.ddddE±dd for large exponents) | |
| double | ARROW_TYPE_FLOAT64 | DOUBLE | FLOAT64 | DOUBLE | string (-ddd.dddd for small (<6) and -d.ddddE±dd for large exponents) | |
| bytes | ARROW_TYPE_BINARY | VARBINARY(5000) | BYTES(n) | BYTE_ARRAY | string | Truncation logic is applied |
| boolean | ARROW_TYPE_BOOLEAN | | BOOL | BOOLEAN | string | |
| string | ARROW_TYPE_UTF8 | NVARCHAR(5000) | STRING(N) | BYTE_ARRAY | string | Truncation logic is applied |
| records | StructType | - | - | - | - | Resolved via flattening |
| enum | ARROW_TYPE_UTF8 (length N) | NVARCHAR(N) | STRING(N) | BYTE_ARRAY | string | The length N is defined by the maximum length of the enum values |
| array | - | - | - | - | - | Not supported |
| map | - | - | - | - | - | Not supported |
| union | n/a | Depends on the type used in the union (see comment) | Depends on the type used in the union (see comment) | Depends on the type used in the union (see comment) | Depends on the type used in the union (see comment) | Only unions of the form ["null", <some-type>] are supported |
| fixed(N) | ARROW_TYPE_FIXEDSIZEBINARY (length N) | VARBINARY(N) | BYTES(N) | BYTE_ARRAY | string | |
| decimal(p,s) | ARROW_TYPE_DECIMAL128 / ARROW_TYPE_DECIMAL256 (p,s) | DECIMAL(p,s) | NUMERIC(p,s) | FIXED_LEN_BYTE_ARRAY | string | |
| uuid | ARROW_TYPE_UTF8 | NCLOB | - | BYTE_ARRAY | string | |
| date | ARROW_TYPE_DATEDAY | DATE | DATE | INT32 | string | |
| time-millis | ARROW_TYPE_TIMEMILLI | TIME | TIME | | string | |
| time-micros | ARROW_TYPE_TIMEMICRO | TIME | TIME | INT64 | string | |
| timestamp-millis | ARROW_TYPE_TIMESTAMPMILLITZ | TIMESTAMP | TIMESTAMP | | string | |
| timestamp-micros | ARROW_TYPE_TIMESTAMPMICROTZ | TIMESTAMP | TIMESTAMP | INT64 | string | |
| local-timestamp-millis | ARROW_TYPE_TIMESTAMPMILLI | TIMESTAMP | TIMESTAMP | | string | |
| local-timestamp-micros | ARROW_TYPE_TIMESTAMPMICRO | TIMESTAMP | TIMESTAMP | INT64 | string | |
| duration | ARROW_TYPE_INTERVALMONTHDAYNANO | - | - | - | - | |
JSON as schema type
| JSON Type | Arrow Type | HANA Type | Google BigQuery | Object Store (Parquet) | Object Store (CSV) | Comment |
|---|---|---|---|---|---|---|
| string | ARROW_TYPE_UTF8 | NVARCHAR(5000) | STRING(5000) | BYTE_ARRAY | string | |
| number | ARROW_TYPE_FLOAT64 | DOUBLE | FLOAT64 | DOUBLE | string (-ddd.dddd for small (<6) and -d.ddddE±dd for large exponents) | |
| integer | ARROW_TYPE_INT64 | BIGINT | INT64 | INT64 | string | |
| object | StructType | - | - | - | - | Resolved via flattening |
| array | - | - | - | - | - | Not supported |
| boolean | ARROW_TYPE_BOOLEAN | BOOLEAN | BOOL | BOOLEAN | string | |
| null | None, ignored | - | - | - | - | |
| string with format date-time | ARROW_TYPE_TIMESTAMPNANOTZ | TIMESTAMP | TIMESTAMP | INT64 | string | |
| string with format time | ARROW_TYPE_TIMENANO | TIME | TIME | INT64 | string | |
| string with format date | ARROW_TYPE_DATEDAY | DATE | DATE | INT32 | string | |
|
We end this blog with a short remark regarding replication failures.
Failing a Replication from a Confluent Kafka source partition
As outlined in the chapter above about the configuration options for Replication Flows, the replication might fail for various reasons, for example if the Fail on Data Truncation flag is turned on or if a message uses a wrong schema. If such a replication failure occurs, the replication is stopped only for the source Kafka partition that contains the message that caused the failure. It is currently not possible to automatically stop the full Replication Task or Flow when a replication failure occurs during the replication of a specific source Kafka partition.