aws s3 cp /data/preprocessing/s3_upload/dataset.csv s3://{{S3 bucket}}/dataset/version1/dataset.csv
{
  "name": "default",
  "type": "webhdfs",
  "pathPrefix": "<path prefix to be appended>",
  "data": {
    // e.g. https://c32727c8-4260-4c37-b97f-ede322dcfa8f.files.hdl.canary-eu10.hanacloud.ondemand.com
    "HDFS_NAMENODE": "https://<file-container-name>.files.hdl.canary-eu10.hanacloud.ondemand.com",
    "TLS_CERT": "-----BEGIN CERTIFICATE-----\nMIICmjCCAYIxxxxxxxxxxxxR4wtC32bGO66D+Jc8RhaIA==\n-----END CERTIFICATE-----\n",
    "TLS_KEY": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqxxxxxxxxxxxxnor+rtZHhhzEfX5dYLCS5Pww=\n-----END PRIVATE KEY-----\n",
    "HEADERS": "{\"x-sap-filecontainer\": \"<file-container-name>\", \"Content-Type\": \"application/octet-stream\"}"
  }
}
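This JSON is the object store secret that tells SAP AI Core how to reach the HDL Files container. A minimal sketch of registering it, assuming an authenticated `ai_core_client` (AICoreV2Client) and its generic `rest_client.post` helper; a direct REST call against `/v2/admin/objectStoreSecrets` or the SDK's object store secret convenience method works just as well:

# Sketch only: register the webhdfs object store secret in the blog-demo resource group.
# Assumes ai_core_client is an authenticated AICoreV2Client from ai_core_sdk.
object_store_secret = {
    "name": "default",
    "type": "webhdfs",
    "pathPrefix": "<path prefix to be appended>",
    "data": {
        "HDFS_NAMENODE": "https://<file-container-name>.files.hdl.canary-eu10.hanacloud.ondemand.com",
        "TLS_CERT": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----\n",
        "TLS_KEY": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
        "HEADERS": "{\"x-sap-filecontainer\": \"<file-container-name>\", \"Content-Type\": \"application/octet-stream\"}"
    }
}

response = ai_core_client.rest_client.post(
    path="/admin/objectStoreSecrets",   # AI API endpoint for object store secrets
    body=object_store_secret,
    resource_group="blog-demo"          # the secret is scoped to this resource group
)
print(response)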
# Create the dataset artifact
artifact = ai_core_client.artifact.create(
    name = "blogdataset", # custom, non-unique identifier
    kind = Artifact.Kind.DATASET,
    url = "ai://default/dataset/version3", # "ai://mldata-secret-model/model/blogmodel"
    scenario_id = scenario_id,
    description = "create dataset for preprocessing",
    labels = [
        Label(key="ext.ai.sap.com/dataset", value="mldata"), # any descriptive key-value pair; helps in filtering, key must have the prefix ext.ai.sap.com/
    ],
    resource_group = "blog-demo" # required to restrict object store secret usage within a resource group
)
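The call returns an artifact ID that is needed later when the dataset is bound to a configuration. A short check, assuming the response object exposes `id` and `message` as in the AI Core SDK tutorials:

# Keep the generated artifact ID for the configuration step later on
print(artifact.message)
dataset_artifact_id = artifact.id
print("Dataset artifact ID:", dataset_artifact_id)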
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import os
import sys
import numpy as np
import pandas as pd
from itertools import islice
import csv

# PATH variables
INPUT_PATH = '/app/data/'
OUTPUT_PATH = '/app/dataoutput/training.csv'
path_data = INPUT_PATH + 'before_preprocessing.csv'

# Start a Spark session
spark = SparkSession.builder.getOrCreate()

# Read the raw CSV
df = spark.read.csv(path_data, header=True, inferSchema=True)

# Preprocessing steps
def preprocessing_consumer(df):
    ....
    return df_preprocessed

# Run the preprocessing and write the preprocessed file
df_preprocessed = preprocessing_consumer(df)
df_preprocessed.write.option("header", True).csv(OUTPUT_PATH)
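Note that Spark writes `OUTPUT_PATH` as a directory containing one or more `part-*.csv` files rather than a single `training.csv`; the output artifact therefore picks up the whole `/app/dataoutput` folder. If a single file is preferred, the DataFrame can be coalesced first, a sketch at the cost of pulling the data onto one partition:

# Optional: force a single output part file before writing
df_preprocessed.coalesce(1).write.mode("overwrite").option("header", True).csv(OUTPUT_PATH)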
docker build -t sumin/ml-demo:0.1 .
FROM python:3.9-slim-buster
RUN apt-get update --allow-releaseinfo-change
# # Install Python packages
RUN apt-get update && \
apt-get install -y python3 python3-pip openjdk-11-jdk-headless wget && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Upgrade pip and install PySpark3
RUN pip3 install --upgrade pip && \
pip3 install pyspark==3.4.0
RUN wget https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz && \
tar -zxvf spark-3.4.0-bin-hadoop3.tgz && \
mv spark-3.4.0-bin-hadoop3 /usr/local/spark && \
rm spark-3.4.0-bin-hadoop3.tgz
# Set environment variables for Java and Spark
# https://spark.apache.org/docs/latest/
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV JAVA_TOOL_OPTIONS="-Dio.netty.tryReflectionSetAccessible=true"
ENV SPARK_HOME=/usr/local/spark
ENV HADOOP_HOME=/usr/local/spark
# Expose Spark UI port
EXPOSE 4040
# Create directories within your Docker image
RUN mkdir -p /app/src/
# Don't place anything in the folders below yet, just create them
RUN mkdir -p /app/data/
RUN mkdir -p /app/dataoutput/
# Copy files from your local system to a path in the Docker image
COPY preprocessing.py /app/src/
COPY requirements.txt /app/src/
# Install dependencies within your Docker image
RUN pip3 install -r /app/src/requirements.txt
# Enable permission to execute anything inside the folder app
RUN chgrp -R 65534 /app && \
chmod -R 777 /app
docker push sumin/ml-demo:0.1
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: blog-preprocessing-pipeline # Executable ID (max length 64, lowercase, hyphen-separated); modify this if you are not the only user of your SAP AI Core instance. Example: `first-pipeline-1234`
  annotations:
    scenarios.ai.sap.com/description: "SAP AI Core blog demo for preprocessing"
    scenarios.ai.sap.com/name: "blog-prep"
    executables.ai.sap.com/description: "blog-prep"
    executables.ai.sap.com/name: "blog-prep preprocessing pipeline"
    artifacts.ai.sap.com/blogdataset.kind: "dataset"
  labels:
    scenarios.ai.sap.com/id: "blog-prep"
    executables.ai.sap.com/id: "blog-preprocessing-pipeline"
    ai.sap.com/version: "0.0.1"
spec:
  imagePullSecrets:
    - name: credstutorialrepo-i551982 # your docker registry secret
  entrypoint: blog-demo-pipeline
  templates:
    - name: blog-demo-pipeline
      metadata:
        labels:
          ai.sap.com/resourcePlan: train.l
      inputs:
        artifacts: # placeholder for cloud storage attachments
          - name: blogdataraw # a name for the placeholder
            path: /app/data/ # where the dataset is copied to in the Docker image
      outputs:
        artifacts:
          - name: blogdataprep # name of the artifact generated, and folder name when placed in object storage; the complete directory will be `../<execution_id>/blogdataprep`
            globalName: blogdataprep # identifier local to the workflow, also used above in the annotations
            path: /app/dataoutput # folder in the Docker image whose contents are copied to cloud storage after the workflow step runs
            archive:
              none: {}
      container:
        image: "docker.io/sumin/demo-prep:0.1" # docker.io/lsm1401/blog-demo:0.5 - scikit-learn / docker.io/sumin/blog-demo:0.4 - pyspark
        imagePullPolicy: Always
        command: ["/bin/sh", "-c"] # ["sh", "-c"]
        env:
        args:
          - >
            set -e && echo "---Start Preprocessing---" && python /app/src/preprocessing.py && ls -lR /app/dataoutput && echo "---End Preprocessing---"
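After the image is pushed and this template is synced through the connected Git repository/application, the pipeline is started by creating a configuration (which binds the registered dataset artifact to the `blogdataraw` placeholder) and an execution. A sketch using the AI Core SDK; the configuration name is illustrative and `dataset_artifact_id` is the ID from the artifact registration above:

from ai_core_sdk.models import InputArtifactBinding

# Bind the registered dataset artifact to the workflow's input placeholder
config = ai_core_client.configuration.create(
    name = "blog-prep-config",                      # illustrative name
    scenario_id = "blog-prep",
    executable_id = "blog-preprocessing-pipeline",
    input_artifact_bindings = [
        InputArtifactBinding(key="blogdataraw", artifact_id=dataset_artifact_id)
    ],
    resource_group = "blog-demo"
)

# Start the preprocessing run
execution = ai_core_client.execution.create(
    configuration_id = config.id,
    resource_group = "blog-demo"
)
print(execution.id)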
# Expose Spark UI port
EXPOSE 4040
# Create directories within your Docker image
RUN mkdir -p /app/src/
# Don't place anything in the folders below yet, just create them
RUN mkdir -p /app/data/
RUN mkdir -p /app/model/
RUN mkdir -p /app/model/blogmodel
RUN mkdir -p /app/model/encoder
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: blog-training-pipeline # Executable ID (max length 64, lowercase, hyphen-separated); modify this if you are not the only user of your SAP AI Core instance. Example: `first-pipeline-1234`
  annotations:
    scenarios.ai.sap.com/description: "SAP AI Core blog demo"
    scenarios.ai.sap.com/name: "blog-demo-training"
    executables.ai.sap.com/description: "blog-demo-training"
    executables.ai.sap.com/name: "blog-demo training pipeline"
    artifacts.ai.sap.com/blogdataset.kind: "dataset" # Helps in suggesting the kind of artifact that can be attached.
    artifacts.ai.sap.com/blogmodel.kind: "model" # Helps in suggesting the kind of artifact that can be generated.
  labels:
    scenarios.ai.sap.com/id: "blog"
    executables.ai.sap.com/id: "blog-training-pipeline"
    ai.sap.com/version: "0.1.8"
spec:
  imagePullSecrets:
    - name: credstutorialrepo-i551982 # your docker registry secret
  entrypoint: blog-demo-pipeline
  arguments:
    parameters: # placeholders for string-like inputs
      - name: DT_MAX_DEPTH # identifier local to this workflow
      - name: MAX_BINS
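For `os.getenv(...)` in the training script (next snippet) to see these values, the workflow parameters must be mapped into environment variables in the container section of this template, analogous to the preprocessing template above. A sketch of that mapping, shown as an assumption about the container block not reproduced here:

      container:
        # ... image, command, args as in the preprocessing template ...
        env:
          - name: DT_MAX_DEPTH # exposed to the training script via os.getenv
            value: "{{workflow.parameters.DT_MAX_DEPTH}}"
          - name: MAX_BINS
            value: "{{workflow.parameters.MAX_BINS}}"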
# In the training script, read the hyperparameters passed as environment variables
DT_MAX_DEPTH = int(os.getenv('DT_MAX_DEPTH')) # e.g. 10
MAX_BINS = int(os.getenv('MAX_BINS')) # e.g. 5
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Generate a classification report
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Accuracy = %g" % (accuracy))
print("Precision = %g" % (precision))
print("Recall = %g" % (recall))
print("F1 score = %g" % (f1_score))
from datetime import datetime
from ai_core_sdk.models import Metric, MetricTag, MetricCustomInfo, MetricLabel
from ai_core_sdk.tracking import Tracking

aic_connection = Tracking()
# Log the evaluation metrics using aic_connection.log_metrics()
aic_connection.log_metrics(
    metrics = [
        Metric(name="Accuracy", value=float(accuracy), timestamp=datetime.utcnow(), labels=[MetricLabel(name="metrics.ai.sap.com/Artifact.name", value="blogmodel")]),
        Metric(name="Precision", value=float(precision), timestamp=datetime.utcnow(), labels=[MetricLabel(name="metrics.ai.sap.com/Artifact.name", value="blogmodel")]),
        Metric(name="Recall", value=float(recall), timestamp=datetime.utcnow(), labels=[MetricLabel(name="metrics.ai.sap.com/Artifact.name", value="blogmodel")]),
        Metric(name="F1 Score", value=float(f1_score), timestamp=datetime.utcnow(), labels=[MetricLabel(name="metrics.ai.sap.com/Artifact.name", value="blogmodel")])
    ]
)
FROM python:3.8
ENV LANG C.UTF-8
RUN apt-get update --allow-releaseinfo-change
# # Install Python packages
RUN apt-get update && \
apt-get install -y python3 python3-pip openjdk-11-jdk openjdk-11-jdk-headless wget && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Upgrade pip and install PySpark3
RUN pip3 install --upgrade pip && \
pip3 install pyspark==3.2.1
RUN wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz && \
tar -zxvf spark-3.2.1-bin-hadoop3.2.tgz && \
mv spark-3.2.1-bin-hadoop3.2 /usr/local/spark && \
rm spark-3.2.1-bin-hadoop3.2.tgz
# Set environment variables for Java and Spark
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV JAVA_TOOL_OPTIONS="-Dio.netty.tryReflectionSetAccessible=true"
ENV SPARK_HOME=/usr/local/spark
ENV PATH=$PATH:$SPARK_HOME/bin
# Custom location to place code files
RUN mkdir -p /app/src
COPY hdiconfig.json /app/src/
COPY main.py /app/src/
COPY requirements.txt /app/src/requirements.txt
RUN pip3 install -r /app/src/requirements.txt
# Required to execute script
RUN chgrp -R nogroup /app && \
chmod -R 770 /app
docker build -t sumin/ml-demo-serving:0.0.1 .
docker push sumin/ml-demo-serving:0.0.1
apiVersion: ai.sap.com/v1alpha1
kind: ServingTemplate
metadata:
  name: demo-classifier-1 # Enter a unique name here
  annotations:
    scenarios.ai.sap.com/description: "demo classification"
    scenarios.ai.sap.com/name: "demo-predict-clf"
    executables.ai.sap.com/description: "demo classification serving executable"
    executables.ai.sap.com/name: "demo-serve-executable"
    artifacts.ai.sap.com/demomodel.kind: "model" # Suggests the kind of artifact to input.
  labels:
    scenarios.ai.sap.com/id: "demo-clf"
    ai.sap.com/version: "0.0.1"
spec:
  inputs:
    artifacts:
      - name: modelArtifact
  template:
    apiVersion: "serving.kubeflow.org/v1beta1"
    metadata:
      labels: |
        ai.sap.com/resourcePlan: basic
    spec: |
      predictor:
        imagePullSecrets:
          - name: credstutorialrepo-{examplenumber}
        containers:
          - name: kfserving-container
            image: "docker.io/sumin/ml-demo-serving:0.0.1"
            ports:
              - containerPort: 9001
                protocol: TCP
            command: ["/bin/sh", "-c"]
            args:
              - >
                set -e && echo "Starting" && gunicorn --chdir /app/src main:app -b 0.0.0.0:9001 # filename `main`, Flask variable `app`
            env:
              - name: STORAGE_URI
                value: "{{inputs.artifacts.modelArtifact}}"
import os
import json
import numpy as np
import pandas as pd
from flask import Flask, jsonify
from flask import request as call_request
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType, StringType, StructType, StructField, DoubleType
from pyspark.ml.classification import DecisionTreeClassificationModel

app = Flask(__name__)

@app.before_first_request
def init():
    """
    Load the model; if this fails, the deployment will not start.
    """
    global model
    MODEL_PATH = "/mnt/models/blogmodel"
    spark = SparkSession.builder.appName("Model").getOrCreate()
    model = DecisionTreeClassificationModel.load(MODEL_PATH)
    return None

@app.route("/v2/greet", methods=["GET"])
def status():
    global model
    if model is None:
        return "Flask Code: Model was not loaded."
    else:
        return "Model is loaded."

# You may customize the endpoint, but it must have the prefix `/v<number>`
@app.route("/v2/predict", methods=["POST"])
def predict():
    """
    Perform inference with the model loaded in init().
    Returns:
        Predicted classification: dunning level.
    """
    global model
    # Load the request data into a Spark DataFrame
    json_load = call_request.json
    spark = SparkSession.builder.getOrCreate()
    rdd = spark.sparkContext.parallelize(json_load)
    df = spark.read.json(rdd)
    # Load the fitted feature pipeline (encoder) and apply the same preprocessing as in training
    PIPELINE_PATH = '/mnt/models/encoder'
    featurePipeline = PipelineModel.load(PIPELINE_PATH)
    features_vectorized = featurePipeline.transform(preprocessing(df))
    # Use the loaded model to make predictions
    predictions = model.transform(features_vectorized)
    # Cast the prediction to an integer
    predictions = predictions.withColumn('prediction', col('prediction').cast(IntegerType()))
    result_df = predictions.select('prediction', 'probability')
    # Convert the result DataFrame to a JSON string
    result_json = result_df.toJSON().collect()
    # Return the JSON response to the API call
    return json.dumps(result_json)
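Once the deployment is RUNNING, the endpoint can be called with the deployment URL returned by AI Core; the request needs an OAuth token and the `AI-Resource-Group` header. A sketch with `requests`; the payload fields are only illustrative, based on the columns written to HANA below, and must match what the preprocessing and encoder expect:

import requests

deployment_url = "<deployment-url-from-ai-core>"
token = "<oauth-token>"

# Illustrative records; adapt the fields to your model's input schema
payload = [
    {"CAN": "1000001", "BPTYPE": "B2C", "Payment_Mode": "Invoice", "POST_CODE": "10115"}
]

response = requests.post(
    f"{deployment_url}/v2/predict",
    headers={
        "Authorization": f"Bearer {token}",
        "AI-Resource-Group": "blog-demo",
        "Content-Type": "application/json",
    },
    json=payload,
)
print(response.status_code, response.json())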
import json
import datetime as dt
from hdbcli import dbapi
from hana_ml.dataframe import ConnectionContext

# Read the HANA HDI service key
hana_service_key = './hdiconfig.json'
with open(hana_service_key) as hanaconfig:
    hanaconfig_k = json.load(hanaconfig)

port_c = 443
user_c = hanaconfig_k['hana'][0]['credentials']['user']
url_c = hanaconfig_k['hana'][0]['credentials']['host']
pwd_c = hanaconfig_k['hana'][0]['credentials']['password']
schema = hanaconfig_k['hana'][0]['credentials']['schema']
print(port_c, user_c)

# Open connections: dbapi for inserts, ConnectionContext for hana_ml dataframes
conn = dbapi.connect(address=url_c, port=port_c, user=user_c, password=pwd_c,
                     ENCRYPT=True,
                     sslValidateCertificate=False)
cc = ConnectionContext(url_c, port_c, user_c, pwd_c, ENCRYPT=True, sslValidateCertificate=False)
cursor = conn.cursor()

# Insert the prediction results row by row (11 columns, so 11 placeholders)
current_datetime = dt.datetime.now()
for i in range(len(results_pdf)):
    row = results_pdf.loc[i]
    done = cursor.execute("insert into DEMO_PREDICTION_HDI_DB_1.PREDICTION values (?,?,?,?,?,?,?,?,?,?,?)",
                          (int(ID), row['CAN'], row['BPTYPE'], row['Payment_Mode'], row['POST_CODE'],
                           float(row["FEATURE0"]), float(row["FEATURE1"]), float(row["FEATURE2"]),
                           int(row["prediction"]), str(row["probability"]), str(current_datetime)))
    ID += 1
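To verify the write, the `hana_ml` connection opened above can read the rows back; a short sketch using the same schema and table as the insert:

# Read back a few predictions through hana_ml and close the connections
check_df = cc.sql('SELECT * FROM DEMO_PREDICTION_HDI_DB_1.PREDICTION').head(5).collect()
print(check_df)

cursor.close()
conn.close()
cc.close()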