Technology Blogs by SAP
Learn how to extend and personalize SAP applications. Follow the SAP technology blog for insights into SAP BTP, ABAP, SAP Analytics Cloud, SAP HANA, and more.
cancel
Showing results for 
Search instead for 
Did you mean: 

はじめに


SAP Data Intelligence(以降、DI)は、「データ管理」と「AIの開発・運用」を一つに統合したソリューションです。詳細については、SAP Data Intelligenceで実現するエンタープライズAIをご参照ください。

本ブログの目的


DIのパイプラインを使うことで、SAPやnon-SAPにまたがってデータが散らばる複雑なランドスケープにおいて、データを効率よく連携・管理できることを理解。

前提知識



  • ABAP:CDS Viewを書いたことがあるレベル


説明の流れ



  1. DI-S/4 HANAとDI-Kafkaの接続設定

  2. パイプライン作成

  3. パイプライン実行


本内容は、IT管理者やデータエンジニアに実施して頂く想定になります。

作成するパイプライン


CDS Viewの内容を読み取り、JSON形式に変換した上でnon-SAPシステムに格納するパイプラインを作成します。
non-SAPシステムとしてApache Kafka(以降、Kafka)を使用しますが、Kafkaの知識は必要ありません。

補足)Kafkaとは?
高スループット、かつ低レイテンシなオープンソースの分散ストリーミングプラットフォームです。

1. DI-S/4 HANAとDI-Kafkaの接続設定


ここでは、読み取り対象のCDS Viewを準備し、DIのConnection ManagementからS/4 HANAとKafkaへの接続設定を行います。

補足)Connection Managementは、SAPやnon-SAPの様々なデータソースへの接続を定義し、管理する機能です。ここで定義した接続設定を用いることで、データカタログ機能によりDBのテーブルを確認したり、パイプライン機能によりデータ連携先として簡単にデータソースにアクセスできるようになります。

CDS View準備


まずは、ABAP Development Tools(ADT)でDIの読み取り対象になるCDS Viewを準備します。

補足)CDS Viewの作成方法がわからない方は、こちらをご参照ください。

DIから読み取りが出来るようにするために、CDS Viewに以下のAnnotationを付与します。
@Analytics: { 
dataExtraction: {
enabled: true,
delta.changeDataCapture.automatic: true
}
}

今回は以下のようなCDS Viewを使用します。
@AbapCatalog.sqlViewName: 'ZCDSMARA_0'
@AbapCatalog.compiler.compareFilter: true
@AbapCatalog.preserveKey: true
@AccessControl.authorizationCheck: #CHECK
@EndUserText.label: 'ZCDS MARA test'

@Analytics: {
dataExtraction: {
enabled: true,
delta.changeDataCapture.automatic: true
}
}

define view ZCDS_MARA_0 as select from mara {
key matnr,
ersda,
created_at_time,
scm_matid_guid16
}

次に、ADTで読み取り対象のCDS Viewに以下のようにデータが入っていることを確認します。



DIとS/4 HANAの接続設定


DIのLaunchpadで、Connection Managementを選択します。


“Create Connection”ボタン(+)を押します。


“Connection Type”で”ABAP”を選択します。


接続対象となるS/4 HANAのホスト名、認証情報等を設定し、”Create”ボタンを押します。


Connectionの一覧画面で、”Action”ボタンから”Check Status”を実行します。


以下のようにOKと表示されたら、対象のS/4 HANAとの接続に成功しています。



DIとKafkaの接続設定


Connection一覧画面に戻り、“Create Connection”ボタン(+)を押します。


“Connection Type”で”Kafka”を選択します。


接続対象となるKafkaのホスト名、ポート等を設定し、”Create”ボタンを押します。


これでDIとKafkaの接続設定は完了です。

2. パイプライン作成


ここでは、CDS Viewを読み取り、JSONに変換してKafkaに格納するまでを行うDIのパイプラインを作成します。

新規パイプライン作成


DIのLaunchpadで、"Modeler"を選択します。


"Graphs"を選択してから、"Create Graph"ボタンをクリックします。


補足)Graphというのは、パイプラインのフローのことを指します。

"Save"ボタンを押し、作成したGraphに名前を付けます。


今回は"CDS to Kafka"という名前で保存します。



フロー作成:CDS View読み取り


ここからは、Operatorという小さな機能の単位を組み合わせてデータの流れを制御するフローを作成します。

まずはCDS Viewを読み取るためのOperatorである"ABAP CDS Reader"をダブルクリックし、フロー内に配置します。


読み取るCDS Viewを設定するために、フロー内の"ABAP CDS Reader"を選択し、"Open Configuration"をクリックします。


指定した”ABAP CDS Reader”オペレーターの設定画面が右側に開きます。
“ABAP Connection”のドロップダウンボタンをクリックするとConnection Managementで設定したABAP接続の一覧が表示されるため、接続対象のS/4 HANAを選択します。


"Version"の選択ボックスをクリックすると、"Select Version"画面が表示されます。
ABAP CDS Readerに複数のバージョンがありますが、最新版のV2を選択します。

補足)新規にパイプラインを作成する際は、古いバージョンで検証を行っていた等の特別な事情がない限り、新しいバージョンを利用してください。



"ABAP CDS Name"に対象の読み取り対象のCDS View名を記載します。
"Transfer Mode"は今回はInitial Loadを指定します。


ここまででCDS Viewの読み取り設定は完了です。

フロー作成:CSV ⇒ JSON変換


次に、CDS Viewから読み取ったCSVをJSONに変換するフローを作成します。

CSVからJSONに変更する機能を持つ"Format Converter"オペレーターをダブルクリックし、フロー内に配置します。


"ABAP CDS Reader V2"オペレーターと"Format Converter"オペレーターを接続するために、各々のOutputポートとInputポートのデータ形式を確認します。

まずは、"ABAP CDS Reader V2"オペレーターのOutputポートにマウスカーソルを合わせると、以下のように"Data Type: message"と表示されます。


次に、"Format Converter"オペレーターのInputポートにマウスカーソルを合わせると、以下のように"Data Type: blob"と表示されます。


接続したいオペレーター同士のデータ形式が違うため、これを統一する必要があります。

message形式のデータをblob形式に変換するために、"blob"で検索すると、"ToBlob Converter"オペレーターというものが見つかるので、ダブルクリックしてフロー内に配置します。


先ほどと同じように"ToBlob Converter"オペレーターのInputポート、Outputポートを確認すると、Inputが"Data Type: any"、Outputが"Data Type: blob"となり、"ABAP CDS Reader V2"オペレーターと"Format Converter"オペレーターを接続する際のデータ形式の仲介役に利用できることがわかります。

接続したいOperatorをそれぞれ、以下のようにマウスでドラッグして接続します。



次に、"Format Converter"を選択し、"Open Configuration"をクリックします。


"Target Format"に返還後のデータ形式である"JSON"を指定します。
"Fields"にJSON変換時のヘッダー情報を記入します。

冒頭のCDS View確認時のヘッダーを参考に、"Fields"は以下のように設定します。



これでCSV ⇒ JSONのフローが完成です。

フロー作成:Kafkaへの書き込み


最後に、Kafkaへのデータ書き込みのフローを作成します。

"Kafka"で検索すると、以下のように2つのオペレーターが表示されます。
ConsumerはKafkaからデータを取得するオペレーターで、ProducerはKafkaにデータを書き込むオペレーターです。

今回はKafkaに対してデータを書き込みたいので、"Kafka Producer"オペレーターをダブルクリックしてフロー内に配置します。


"Format Converter"オペレーターのOutputポートと、"Kafka Producer"オペレーターのInputポートを接続します。


Kafkaへのデータ書き込み設定をするために、フロー内の"Kafka Producer"を選択し、"Open Configuration"をクリックします。


”Kafka Producer”オペレーターの設定画面が右側に開きます。
“Connection Type”のドロップダウンボタンをクリックし、"connection management"を選択します。


補足)"manual"を選択した場合は、Kafkaのホスト名等、接続情報を記入します。
今回は事前にConnection ManagementでKafkaの接続情報を設定済みのため、既存の接続情報からKafkaの宛先を選ぶことができる"connection management"を選択しました。

次に、"Kafka Producer Connection"の"Editproperty"ボタンをクリックします。
"Edit property 'Kafka Producer Connection'"画面が表示されるので、Connection IDのドロップダウンのリストから接続対象のKafkaを選択し、"save"ボタンをクリックします。


"Topic"にデータ書き込み先のKafkaの対象トピック名を記入します。


補足)トピックというのは、Kafkaにおけるデータストリーミングの1つの箱のようなものだと考えてください。

ここまででフローの作成は完了です。"Save"ボタンをクリックしてフローを保存します(Ctrl+Sでも可)。



3. パイプライン実行


パイプライン実行前の環境確認


以下のようにKafkaに格納されたデータを確認すると、パイプライン実行前は空であることがわかります。
# ./bin/kafka-console-consumer.sh --bootstrap-server <Kafka Host>:<Kafka Port> --topic cdsTestTopic --from-beginning

パイプライン実行


Modeler画面の“Run”ボタンで作成したパイプラインを実行します。


“Status”欄に実行状態が表示されます。”Running”と表示されたら、パイプラインによるデータ転送処理を実行中です。



パイプライン実行後の環境確認


再度Kafkaに格納されたデータを確認すると、以下のようにデータが格納されていることがわかります。
# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdsTestTopic --from-beginning
[
{"created_at":"11:49:49.000","esrda":"2020-09-12","guid":"42010AEE-E443-1EDA-BD9D-C1FE403C4C0B","matnr":"TG22"},
{"created_at":"11:49:49.000","esrda":"2020-09-12","guid":"42010AEE-E443-1EDA-BD9D-C1FE403C2C0B","matnr":"TG21"},
{"created_at":"11:49:49.000","esrda":"2020-09-12","guid":"42010AEE-E443-1EDA-BD9D-C1FE403C0C0B","matnr":"TG20"},
……
{"created_at":"03:44:10.000","esrda":"2021-01-06","guid":"42010AEE-E443-1EDB-93FA-2DECC2C485FD","matnr":"000000000000000015"}
]

パイプライン停止


最後に、"Stop Process"ボタンを押し、パイプラインを停止します。


 

まとめ


以上のように、DIを活用することにより、様々な場所に散らばったデータを効率よく連携・変換することができます。

データは重要な資産であり、インサイトの発見に欠かせません。
しかし、今日ではデータがオンプレのSQL-DB、NoSQL-DB、クラウド上のデータレイク等あちこちに散らばり、複雑なランドスケープを形成しています。

そのため、データサイエンティストがデータ分析をする際、必要なデータを集める前準備だけで多くの時間を消費してしまいます。

DIを使うことで、複雑なランドスケープにおけるデータ管理を効率化したり、可視化することでデータフローを直感的に掴めるようになります。
これにより、インサイトの発見やイノベーション創出に役立てることが出来ます。

データ管理基盤として是非、DIをご活用頂けたら幸いです!