2

How to make a REST api call from a Materialized View and load its data

Incorta supports many connectors, but it can happen that there is a particular cloud application for which we do not have a connector yet. For example, Pendo is one such application; it provides a REST API framework that we can use to query its data from an Incorta Materialized View (MV), save the data in a pandas dataframe, process the data, and then convert it to a Spark dataframe to populate the MV. Here is some sample code:

# How to query the Features data from Pendo via an MV.
# The program makes a REST API call, saves the data in a pandas dataframe, processes it,
# and converts it to a Spark dataframe for saving into an MV.

#Import all the modules and functions required for this script, install them if they are not installed

import requests
import json
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col
import pandas as pd

# json_normalize moved to the top-level pandas namespace in pandas 1.0 and the
# old pandas.io.json location was removed in pandas 2.0 — support both.
try:
    from pandas.io.json import json_normalize  # pandas < 1.0
except ImportError:
    from pandas import json_normalize  # pandas >= 1.0

# Pendo aggregation API endpoint used to fetch the Features data.
url = "https://app.pendo.io/api/v1/aggregation"

# Build the aggregation payload as a Python dict and serialize it with
# json.dumps instead of hand-escaping a JSON string literal.
data = json.dumps({
    "response": {"mimeType": "application/json"},
    "request": {"pipeline": [{"source": {"features": None}}]},
})
headers = {
    # TODO: inject the integration key from a secure source; do not hard-code it.
    'x-pendo-integration-key': "<pass the pendo key here>",
    'content-type': "application/json",
}

# A timeout prevents the MV job from hanging forever on a stalled connection.
response = requests.post(url, data=data, headers=headers, timeout=60)
# Fail fast with a clear HTTPError instead of a confusing KeyError later
# when the error body is missing the expected 'results' key.
response.raise_for_status()

# Parse the JSON response body into a dict.
content = response.json()

# Flatten the nested JSON records into a single-level pandas dataframe
# (nested keys become dot-separated column names, e.g. 'createdByUser.first').
data = json_normalize(content['results'])

# Columns to keep from the flattened frame, in the order expected by the
# Spark schema defined below.
feature_columns = [
    'appId', 'appWide', 'color', 'createdAt',
    'createdByUser.deletedAt', 'createdByUser.first', 'createdByUser.id',
    'createdByUser.last', 'createdByUser.role', 'createdByUser.userType',
    'createdByUser.username', 'createdByUser.visitorIds',
    'dirty', 'elementPathRules', 'eventPropertyConfigurations',
    'group.color', 'group.createdAt',
    'group.createdByUser.deletedAt', 'group.createdByUser.first',
    'group.createdByUser.id', 'group.createdByUser.last',
    'group.createdByUser.role', 'group.createdByUser.userType',
    'group.createdByUser.username', 'group.createdByUser.visitorIds',
    'group.description', 'group.id', 'group.items', 'group.lastUpdatedAt',
    'group.lastUpdatedByUser.first', 'group.lastUpdatedByUser.id',
    'group.lastUpdatedByUser.last', 'group.lastUpdatedByUser.role',
    'group.lastUpdatedByUser.userType', 'group.lastUpdatedByUser.username',
    'group.lastUpdatedByUser.visitorIds', 'group.length', 'group.name',
    'id', 'kind', 'lastUpdatedAt',
    'lastUpdatedByUser.deletedAt', 'lastUpdatedByUser.first',
    'lastUpdatedByUser.id', 'lastUpdatedByUser.last',
    'lastUpdatedByUser.role', 'lastUpdatedByUser.userType',
    'lastUpdatedByUser.username', 'lastUpdatedByUser.visitorIds',
    'name', 'pageId', 'rootVersionId', 'stableVersionId', 'validThrough',
]

# Restrict the dataframe to the selected columns.
data = data[feature_columns]

# Spark column names (underscored versions of the pandas dot-columns selected
# above), in the exact order of that selection — createDataFrame matches the
# pandas columns to these fields positionally.
_schema_columns = [
    'appId', 'appWide', 'color', 'createdAt',
    'createdByUser_deletedAt', 'createdByUser_firstname', 'createdByUser_id',
    'createdByUser_lastname', 'createdByUser_role', 'createdByUser_userType',
    'createdByUser_username', 'createdByUser_visitorIds',
    'dirty', 'elementPathRules', 'eventPropertyConfigurations',
    'group_color', 'group_createdAt',
    'group_createdByUser_deletedAt', 'group_createdByUser_firstname',
    'group_createdByUser_id', 'group_createdByUser_lastname',
    'group_createdByUser_role', 'group_createdByUser_userType',
    'group_createdByUser_username', 'group_createdByUser_visitorIds',
    'group_description', 'group_id', 'group_items', 'group_lastUpdatedAt',
    'group_lastUpdatedByUser_firstname', 'group_lastUpdatedByUser_id',
    'group_lastUpdatedByUser_lastname', 'group_lastUpdatedByUser_role',
    'group_lastUpdatedByUser_userType', 'group_lastUpdatedByUser_username',
    'group_lastUpdatedByUser_visitorIds', 'group_length', 'group_name',
    'id', 'kind', 'lastUpdatedAt',
    'lastUpdatedByUser_deletedAt', 'lastUpdatedByUser_firstname',
    'lastUpdatedByUser_id', 'lastUpdatedByUser_lastname',
    'lastUpdatedByUser_role', 'lastUpdatedByUser_userType',
    'lastUpdatedByUser_username', 'lastUpdatedByUser_visitorIds',
    'name', 'pageId', 'rootVersionId', 'stableVersionId', 'validThrough',
]

# Non-string fields; everything else defaults to StringType.
# NOTE(review): the *_role fields mix IntegerType and FloatType — presumably
# the group_* role columns arrive as floats (NaN-padded) while the others are
# ints; confirm against real API output.
_schema_overrides = {
    'createdByUser_role': IntegerType(),
    'group_createdByUser_role': FloatType(),
    'group_lastUpdatedByUser_role': FloatType(),
    'group_length': IntegerType(),
    'lastUpdatedByUser_role': IntegerType(),
}

# All fields are nullable.
schema = StructType([
    StructField(_name, _schema_overrides.get(_name, StringType()), True)
    for _name in _schema_columns
])

# Build the Spark dataframe from the pandas frame using the explicit schema.
df = spark.createDataFrame(data, schema)

# Replace the NaN values (introduced by the pandas -> Spark conversion)
# with proper nulls, for both string and float columns.
df = df.replace('NaN', None).replace(float('NaN'), None)

# These columns hold epoch time in milliseconds; convert each one to a
# Spark timestamp (divide by 1000 to get seconds before casting).
_epoch_ms_columns = [
    'createdByUser_deletedAt',
    'createdAt',
    'lastUpdatedByUser_deletedAt',
    'lastUpdatedAt',
    'group_createdAt',
    'group_lastUpdatedAt',
    'group_createdByUser_deletedAt',
    'validThrough',
]
for _column in _epoch_ms_columns:
    df = df.withColumn(_column, (col(_column) / 1000).cast('timestamp'))

# save() is the Incorta-provided helper that persists the dataframe
# as the Materialized View's data.
save(df)
Reply Oldest first
  • Oldest first
  • Newest first
  • Active threads
  • Popular
Like2 Follow
  • 2 Likes
  • 6 days agoLast active
  • 192Views
  • 2 Following

Product Announcement

Incorta 2014.2 is now available!