
JSON Normalize fails for Nested Json array

rajeshps
Participant

Hello Team,

For the nested JSON below, the arrays are not getting normalized using:

import pandas as pd
import json

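def on_input(data):
    # Parse the JSON string into a DataFrame
    df = pd.read_json(data)
    # Flatten the parsed JSON (this overwrites the DataFrame above)
    df = pd.json_normalize(json.loads(data))

    api.send("output", df.to_json(orient="records"))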

api.set_port_callback("input1", on_input)

I also tried max_level and then record_path but no luck.

Flow: Kafka producer(json string) -> avro decoder -> Python3 -> Hana Client

Is there any way to normalize the JSON, apply IF/THEN/ELSE conditions to add a suffix to the column names, and eventually write the result to the DB with no duplicates? Here header.poNumber and data.id are the primary keys/unique identifiers.

Example:

 [
   {
      "header.poNumber":"9023496",
      "data.id":"10013459",
      "message.source":[
         {
            "createSource":null,
            "timeStamp":"2023-05-12T19:30:00.0000000+02:00",
            "type":"full"
         },
         {
            "createSource":"testdev",
            "timeStamp":"2023-05-11T19:30:00.0000000+02:00",
            "type":"ordersEstimated"
         },
         {
            "createSource": "event",
            "timeStamp":"2023-05-12T12:30:00.0000000+01:00",
            "type":"ordersCreated"
         }
      ],
      "message.time":[
         {
            "timeSource":"UTC",
            "typeId":"full"
         },
         {
            "timeSource":"IST",
            "typeId":"actual"
         }
      ]
   }
]

Expected output:

Vitaliy-R
Developer Advocate

I do not think you can simply normalize this record because it contains two arrays of dictionaries: `"message.time"` and `"message.source"`.
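To illustrate the point, here is a minimal sketch using a trimmed copy of the sample record from the question (the variable name `records` is only for illustration). Called without `record_path`, `pd.json_normalize` returns a single row and leaves both arrays as Python lists inside single cells, so there is nothing flat to write to a table yet:

```python
import pandas as pd

# Trimmed copy of the sample record from the question.
records = [
    {
        "header.poNumber": "9023496",
        "data.id": "10013459",
        "message.source": [
            {"createSource": None,
             "timeStamp": "2023-05-12T19:30:00.0000000+02:00",
             "type": "full"},
            {"createSource": "testdev",
             "timeStamp": "2023-05-11T19:30:00.0000000+02:00",
             "type": "ordersEstimated"},
        ],
        "message.time": [
            {"timeSource": "UTC", "typeId": "full"},
            {"timeSource": "IST", "typeId": "actual"},
        ],
    }
]

# No record_path: the top-level keys become columns, the arrays stay as lists.
df = pd.json_normalize(records)

print(df.shape)                          # one row, four columns
print(type(df.loc[0, "message.source"])) # the cell still holds a Python list
```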

From the "expected output", I understand that you want to join values from `"message.time"` to values from `"message.source"` on the `type` attribute to create records.

So, I think you need to flatten two arrays into separate Pandas DataFrames, and then merge them on keys. Something like:

data_as_json = json.loads(data)
df_source = pd.json_normalize(data_as_json, record_path='message.source', meta=['header.poNumber', 'data.id'])
df_time = pd.json_normalize(data_as_json, record_path='message.time', meta=['header.poNumber', 'data.id'])
df = df_source.merge(df_time, left_on=['header.poNumber', 'data.id', 'type'], right_on=['header.poNumber', 'data.id', 'typeId'])
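For anyone who wants to try the approach above outside the pipeline, here is a self-contained sketch that runs the same three steps on the sample record from the question. The `data` string stands in for the message that arrives on the input port, and the `keys` and `events` names are only for illustration.

```python
import json
import pandas as pd

# Stand-in for the JSON string received by on_input (sample from the question).
data = json.dumps([
    {
        "header.poNumber": "9023496",
        "data.id": "10013459",
        "message.source": [
            {"createSource": None,
             "timeStamp": "2023-05-12T19:30:00.0000000+02:00",
             "type": "full"},
            {"createSource": "testdev",
             "timeStamp": "2023-05-11T19:30:00.0000000+02:00",
             "type": "ordersEstimated"},
            {"createSource": "event",
             "timeStamp": "2023-05-12T12:30:00.0000000+01:00",
             "type": "ordersCreated"},
        ],
        "message.time": [
            {"timeSource": "UTC", "typeId": "full"},
            {"timeSource": "IST", "typeId": "actual"},
        ],
    }
])

keys = ["header.poNumber", "data.id"]
events = json.loads(data)

# One row per array element, with the key fields repeated on every row.
df_source = pd.json_normalize(events, record_path="message.source", meta=keys)
df_time = pd.json_normalize(events, record_path="message.time", meta=keys)

# Default inner join: only rows where type equals typeId survive.
df = df_source.merge(df_time, left_on=keys + ["type"], right_on=keys + ["typeId"])
print(df)
```

With this sample, only the `full` entry has a matching `typeId`, so the default inner join returns a single row. Passing `how="left"` to `merge` would keep all three source rows instead, with empty time columns where nothing matches.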

Here are my tests:

Regards,
-Vitaliy

rajeshps
Participant

Thanks a lot for your kind reply, vitaliy.rudnytskiy. The output I'm expecting is only one row, as mentioned above.

While normalizing, consider message.source: IF type = 'ordersEstimated' THEN update message.source.timeStamp1 and message.source.type1.

IF type = 'ordersCreated' THEN update message.source.timeStamp2 and message.source.type2.

Similarly, for message.time: IF typeId = 'actual' THEN update message.time.timeStamp and message.time.typeId.

'header.poNumber' and 'data.id' are primary keys, so there must be no duplicates when updating the HANA database. The above is one event (1 record).
Flow looks like Kafka producer -> avro decoder -> python3 -> hana client
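The rules above could be sketched in plain Python roughly as follows. This is only one possible reading of them, with several assumptions: the names `SOURCE_SUFFIX`, `TIME_TYPE_ID` and `flatten_event` are hypothetical, source types without a rule (such as 'full') are skipped, and because the message.time entries in the sample carry only timeSource and typeId, the sketch copies whatever fields the matching entry has. The final `drop_duplicates` only removes repeated keys within one batch; keeping the table itself free of duplicates is left to the database step.

```python
import json
import pandas as pd

# Assumed mapping from the rules in the post: source type -> column suffix.
SOURCE_SUFFIX = {"ordersEstimated": "1", "ordersCreated": "2"}
# Assumed: only message.time entries with this typeId are kept.
TIME_TYPE_ID = "actual"
KEYS = ["header.poNumber", "data.id"]


def flatten_event(event):
    """Collapse one event into a single flat dict, i.e. one row per event."""
    row = {key: event[key] for key in KEYS}
    for entry in event.get("message.source") or []:
        suffix = SOURCE_SUFFIX.get(entry.get("type"))
        if suffix is None:
            continue  # no rule for this type (e.g. "full"), so skip it
        for field in ("timeStamp", "type"):
            row[f"message.source.{field}{suffix}"] = entry.get(field)
    for entry in event.get("message.time") or []:
        if entry.get("typeId") == TIME_TYPE_ID:
            for field, value in entry.items():
                row[f"message.time.{field}"] = value
    return row


# Stand-in for the incoming JSON string (sample from the original question).
data = json.dumps([{
    "header.poNumber": "9023496",
    "data.id": "10013459",
    "message.source": [
        {"createSource": None,
         "timeStamp": "2023-05-12T19:30:00.0000000+02:00", "type": "full"},
        {"createSource": "testdev",
         "timeStamp": "2023-05-11T19:30:00.0000000+02:00", "type": "ordersEstimated"},
        {"createSource": "event",
         "timeStamp": "2023-05-12T12:30:00.0000000+01:00", "type": "ordersCreated"},
    ],
    "message.time": [
        {"timeSource": "UTC", "typeId": "full"},
        {"timeSource": "IST", "typeId": "actual"},
    ],
}])

df = pd.DataFrame([flatten_event(event) for event in json.loads(data)])
# Keep one row per primary key within this batch (the last one seen wins).
df = df.drop_duplicates(subset=KEYS, keep="last")
print(df.to_json(orient="records"))
```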

vitaliy.rudnytskiy