
I decided to implement the SCD Type 2 logic in the Python3 operator, with Pandas as the main library.
The code below does the remaining work; I have commented each step so that it is clear what is being done at each stage.
#Import libraries
from io import StringIO
import pandas as pd
import numpy as np
def on_input(sourceInput, dimInput):
    """Merge a source extract into the circuit dimension and emit the result.

    Both messages are expected to carry a UTF-8 encoded CSV in ``.body``.
    The source rows are matched to the *current* dimension rows on
    ``circuitId`` and classified as:

    * new          - no current dimension row exists; inserted with a fresh
                     ``DimKey`` and the open validity range 19000101-99991231.
    * SCD type 2   - ``circuitRef``, ``name``, ``location`` or ``country``
                     changed; the current row is closed (``ValidTo`` = load
                     date, ``IsCurrent`` = 0) and a new current version is
                     inserted with a fresh ``DimKey``.
    * SCD type 1   - only ``url`` changed; the value is overwritten in place.
    * unchanged    - passed through as is.

    Dimension rows that are historic (``IsCurrent`` != 1) or whose
    ``circuitId`` does not occur in the source are carried over untouched.

    Args:
        sourceInput: message whose ``body`` holds the source CSV bytes.
        dimInput: message whose ``body`` holds the dimension CSV bytes.

    Returns:
        None. The merged dimension is sent as CSV text to port ``dimOutput``.
    """
    # Message bodies arrive as bytes; decode them before handing to pandas.
    df_source = pd.read_csv(StringIO(str(sourceInput.body, 'utf-8')))
    df_dim = pd.read_csv(StringIO(str(dimInput.body, 'utf-8')))

    # Dates are stored as yyyymmdd integers, so keep the load date an int
    # (a string here would mix types inside the sorted date columns).
    load_date = int(pd.Timestamp('today').strftime("%Y%m%d"))

    df_result = _apply_scd(df_source, df_dim, load_date)

    # NOTE(review): `api` is not defined in this block - presumably injected
    # by the hosting operator runtime; confirm.
    api.send("dimOutput", df_result.to_csv(index=False))


def _values_differ(left, right):
    """Return a boolean mask of rows where *left* and *right* differ.

    Two missing values count as equal; a plain ``!=`` would report
    NaN-vs-NaN as a change on every load.
    """
    both_missing = left.isna() & right.isna()
    return (left != right) & ~both_missing


def _apply_scd(df_source, df_dim, load_date):
    """Classify source rows against the dimension and build the new dimension.

    Returns a DataFrame with the dimension columns, sorted by ``circuitId``,
    ``ValidFrom`` and ``ValidTo``.
    """
    scd2_fields = ['circuitRef', 'name', 'location', 'country']
    scd1_fields = ['url']
    attribute_fields = scd2_fields + scd1_fields
    # 'country' is part of the output so the emitted dimension has the same
    # schema as the dimension that was read in.
    output_columns = [
        'DimKey', 'circuitId', 'circuitRef', 'name', 'location', 'country',
        'url', 'ValidFrom', 'ValidTo', 'IsCurrent',
    ]

    # An empty dimension has no maximum key; start numbering at 1 then.
    max_dim_key = df_dim['DimKey'].max()
    next_dim_key = 1 if pd.isna(max_dim_key) else int(max_dim_key) + 1

    # Only current versions take part in change detection; comparing the
    # source with historic versions would re-flag old values as changes.
    current_mask = df_dim['IsCurrent'] == 1
    in_source = df_dim['circuitId'].isin(df_source['circuitId'])
    df_untouched = df_dim[~current_mask | ~in_source][output_columns]

    merged = pd.merge(
        df_source, df_dim[current_mask],
        on='circuitId', how='left', suffixes=('_x', '_y'),
    )

    # Mutually exclusive row classes, expressed as boolean masks.
    is_new = merged['DimKey'].isna()

    is_scd2 = pd.Series(False, index=merged.index)
    for field in scd2_fields:
        is_scd2 |= _values_differ(merged[field + '_x'], merged[field + '_y'])
    is_scd2 &= ~is_new

    is_scd1 = pd.Series(False, index=merged.index)
    for field in scd1_fields:
        is_scd1 |= _values_differ(merged[field + '_x'], merged[field + '_y'])
    # A row that also has an SCD2 change is handled by the SCD2 branch only.
    is_scd1 &= ~is_new & ~is_scd2

    is_unchanged = ~(is_new | is_scd2 | is_scd1)

    def project(rows, suffix):
        # '_x' selects the source values, '_y' the dimension values.
        renamed = rows.rename(
            columns={field + suffix: field for field in attribute_fields}
        )
        return renamed[output_columns]

    df_unchanged = project(merged[is_unchanged], '_x')
    df_scd1 = project(merged[is_scd1], '_x')
    df_scd2_old = project(merged[is_scd2], '_y').assign(
        ValidTo=load_date, IsCurrent=0,
    )
    df_scd2_new = project(merged[is_scd2], '_x').assign(
        ValidFrom=load_date, ValidTo=99991231, IsCurrent=1,
    )
    df_new = project(merged[is_new], '_x').assign(
        ValidFrom=19000101, ValidTo=99991231, IsCurrent=1,
    )

    # New SCD2 versions and brand-new records both need fresh surrogate keys.
    df_inserted = pd.concat([df_scd2_new, df_new], ignore_index=True)
    df_inserted = df_inserted.assign(
        DimKey=np.arange(next_dim_key, next_dim_key + len(df_inserted))
    )

    df_all = pd.concat(
        [df_untouched, df_scd2_old, df_scd1, df_unchanged, df_inserted],
        ignore_index=True,
    )

    # The left join turns integer columns into floats when rows are
    # unmatched; cast back so the CSV shows 1 rather than 1.0.
    for column in ('DimKey', 'ValidFrom', 'ValidTo'):
        df_all[column] = df_all[column].astype(pd.Int64Dtype())
    df_all['IsCurrent'] = df_all['IsCurrent'].astype(pd.Int16Dtype())

    return df_all.sort_values(["circuitId", "ValidFrom", "ValidTo"])
# Register on_input as the callback for the "sourceInput" and "dimInput" ports.
# NOTE(review): `api` is not defined in this file - presumably injected by the
# hosting operator runtime; confirm.
api.set_port_callback(["sourceInput","dimInput"], on_input)
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
11 | |
10 | |
9 | |
6 | |
6 | |
5 | |
4 | |
4 | |
4 | |
3 |