Introduction
When replacing your current Informatica ETL processes, there are many approaches you can take to transition seamlessly to Incorta. This document covers best practices for managing an Informatica-to-Incorta migration project.
What you should know before reading this article
We recommend that you be familiar with these Incorta concepts before exploring this topic further.
- Spark Materialized Views
- Slowly changing dimensions
- Modeling Schemas, Tables, and Joins
- Incorta data engineering
Applies to
These concepts apply to all Incorta versions.
Let’s Go
Let's dive into the concepts that will help you migrate your ETLs to Incorta.
Here are the general steps you can follow to migrate from Informatica ETL to Incorta:
- Evaluate Requirements: Start by understanding your organization's specific requirements, data sources, and business needs. Determine the scope of the migration project and the data sets that need to be migrated.
- Analyze Existing Informatica Workflows: Perform a thorough analysis of your Informatica ETL workflows, including data sources, transformations, and dependencies. Identify the mappings, transformations, and data flows that need to be migrated to Incorta.
- Data Source Analysis: Assess the data sources used in Informatica ETL. Identify the types of databases, file formats, APIs, or cloud platforms that Informatica connects to. Ensure that Incorta supports the same or equivalent data sources.
- Data Extraction: Determine the best approach for extracting data from your existing data sources and loading it into Incorta. This may involve writing custom scripts or using Incorta's connectors to establish connectivity and extract data from the source systems.
- Data Mapping, Transformation & Loading: Here is some guidance:
- Import the base tables referenced in your mapping into Incorta, create the data model in Incorta with joins and formula columns, and load the data.
- You can also leverage Incorta data apps as starting points for specific application sources.
- Ideally, this model will produce the dataset equivalent of the Informatica mapping. You can test with a Dashboard or an Analyzer table to see if that suffices.
- If the mapping contains any M:M (many-to-many) or step-by-step logic, create a minimal PySpark-based Materialized View (MV) in Incorta and use it in the model created above (a minimal MV sketch is included after these steps). The aim is to maximize the use of the 3NF (third normal form) data model in Incorta.
- For an example, check this Incorta dashboard query plan which has a few MVs in the data model.
- In the MV, create the necessary data transformation logic and mappings to ensure the data is loaded accurately and transformed as required.
- The table below lists the equivalent transformations you can implement in PySpark.
| Informatica Transformation | PySpark equivalent |
| --- | --- |
| Source | The Incorta read() function reads the parquet data into a Spark dataframe. Multiple read() calls are allowed, and the resulting dataframes can be joined and queried with Spark SQL. See the sample MV code below for examples. |
| Target | The Incorta save() function writes the final Spark dataframe to a parquet file; only one save() call is allowed. See the sample MV code below for examples. |
| Aggregator | Use Spark SQL to aggregate data in the dataframes into a new dataframe, or use dataframe aggregation functions, for example df.select(approx_count_distinct("salary")).collect()[0][0] |
| Data Masking | Use Python functions to mask the data. |
| Deduplicate | Use Spark SQL to deduplicate the data into a new dataframe, or refer to https://sparkbyexamples.com/pyspark/pyspark-distinct-to-drop-duplicates/ |
| Expression | Use Python functions to perform calculations on individual rows of data. |
| Filter | Refer to https://sparkbyexamples.com/pyspark/pyspark-where-filter/ for how to filter data from a dataframe. |
| Joiner | Use Spark SQL to join dataframes. See the sample MV code below for examples. |
| Lookup | Use Spark SQL to join the lookup table and create a new dataframe. |
| Mapplet | A PySpark function is the equivalent of a mapplet. |
| Normalizer | Use Python logic to process data with multiple-occurring fields, returning a row for each instance of the multiple-occurring data. |
| Rank | Use Spark window functions; refer to https://sparkbyexamples.com/pyspark/pyspark-window-functions/ |
| Router | Use Python logic to perform the routing. |
| Sequence Generator | Refer to https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.sequence.html |
| Sorter | Refer to https://sparkbyexamples.com/pyspark/pyspark-orderby-and-sort-explained/ |
| Union | Use Spark SQL to merge data from multiple dataframes into a single dataframe. See the sample MV code below for examples. |
- Data Quality and Validation: Implement data quality checks and validation processes in Incorta to ensure data integrity and accuracy. This may involve implementing data validation rules, data profiling, and cleansing routines (a minimal validation sketch is included after the sample MV code below).
- Testing and Validation: Perform extensive testing of the migrated data and validate the accuracy and completeness of the migrated workflows and data in Incorta. Compare the results with the original Informatica ETL outputs to ensure consistency.
- User Training and Adoption: Conduct training sessions for users and stakeholders who will be working with Incorta. Familiarize them with the Incorta environment, self-service analytics capabilities, and reporting features.
- Go-Live and Production Deployment: Plan and execute the go-live process for Incorta, ensuring a smooth transition from Informatica to Incorta. Monitor the system after deployment to address any performance issues or data discrepancies that may arise.
- Retire Legacy Processes: Remove access to the old Informatica ETL to encourage adoption of the new Incorta-based processes.
- Post-Migration Support: Provide ongoing support and assistance to users as they transition to using Incorta. Address any additional requirements or challenges that may arise post-migration.
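Before walking through the full example in the next section, here is a minimal sketch of what a PySpark MV in Incorta can look like. The schema, table, and column names (SALES.ORDERS, SALES.CUSTOMERS, ORDER_STATUS, and so on) are hypothetical placeholders; read() and save() are the Incorta MV functions described in the table above, and the rest is standard PySpark.
# Minimal Incorta MV sketch: all schema, table, and column names are hypothetical placeholders
# Source: read the parquet data of the base tables into Spark dataframes
df_orders = read("SALES.ORDERS")
df_customers = read("SALES.CUSTOMERS")
# Register temp views so the transformation logic can be written in Spark SQL
df_orders.createOrReplaceTempView("ORDERS")
df_customers.createOrReplaceTempView("CUSTOMERS")
# Joiner, Filter, and Aggregator equivalents expressed as one Spark SQL statement
df = spark.sql("""SELECT C.CUSTOMER_ID ,
       C.CUSTOMER_NAME ,
       SUM(O.ORDER_AMOUNT) AS TOTAL_ORDER_AMOUNT
FROM ORDERS O
JOIN CUSTOMERS C
ON C.CUSTOMER_ID = O.CUSTOMER_ID
WHERE O.ORDER_STATUS = 'BOOKED'
GROUP BY C.CUSTOMER_ID ,
       C.CUSTOMER_NAME""")
# Deduplicate equivalent using the dataframe API
df = df.dropDuplicates(["CUSTOMER_ID"])
# Target: write the final dataframe as the MV output (only one save() per MV)
save(df)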
Sample MV code
# EBS AR Aging mv code
import pyspark.sql.functions as F
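# Source: read the parquet data of each base EBS table into a Spark dataframe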
df_HZ_CUST_ACCOUNTS = read("EBS_PARTY_COMMON.HZ_CUST_ACCOUNTS")
df_HZ_PARTIES = read("EBS_PARTY_COMMON.HZ_PARTIES")
df_AR_PAYMENT_SCHEDULES_ALL = read("EBS_AR.AR_PAYMENT_SCHEDULES_ALL")
df_RA_CUST_TRX_LINE_GL_DIST_ALL = read("EBS_AR.RA_CUST_TRX_LINE_GL_DIST_ALL")
df_AR_RECEIVABLE_APPLICATIONS_ALL = read("EBS_AR.AR_RECEIVABLE_APPLICATIONS_ALL")
df_AR_ADJUSTMENTS_ALL = read("EBS_AR.AR_ADJUSTMENTS_ALL")
df_AR_CASH_RECEIPT_HISTORY_ALL = read("EBS_AR.AR_CASH_RECEIPT_HISTORY_ALL")
df_HR_OPERATING_UNITS = read("EBS_HR_COMMON.HR_OPERATING_UNITS")
df_AR_CASH_RECEIPTS_ALL = read("EBS_AR.AR_CASH_RECEIPTS_ALL")
df_GL_LEDGERS = read("EBS_FIN_COMMON.GL_LEDGERS")
df_RA_CUSTOMER_TRX_ALL = read("EBS_AR.RA_CUSTOMER_TRX_ALL")
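# Register each dataframe as a temp view so the transformation logic can be written in Spark SQL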
df_HZ_CUST_ACCOUNTS.createOrReplaceTempView("HZ_CUST_ACCOUNTS")
df_HZ_PARTIES.createOrReplaceTempView("HZ_PARTIES")
df_AR_PAYMENT_SCHEDULES_ALL.createOrReplaceTempView("AR_PAYMENT_SCHEDULES_ALL")
df_RA_CUST_TRX_LINE_GL_DIST_ALL.createOrReplaceTempView("RA_CUST_TRX_LINE_GL_DIST_ALL")
df_AR_RECEIVABLE_APPLICATIONS_ALL.createOrReplaceTempView("AR_RECEIVABLE_APPLICATIONS_ALL")
df_AR_ADJUSTMENTS_ALL.createOrReplaceTempView("AR_ADJUSTMENTS_ALL")
df_AR_CASH_RECEIPT_HISTORY_ALL.createOrReplaceTempView("AR_CASH_RECEIPT_HISTORY_ALL")
df_HR_OPERATING_UNITS.createOrReplaceTempView("HR_OPERATING_UNITS")
df_AR_CASH_RECEIPTS_ALL.createOrReplaceTempView("AR_CASH_RECEIPTS_ALL")
df_GL_LEDGERS.createOrReplaceTempView("GL_LEDGERS")
df_RA_CUSTOMER_TRX_ALL.createOrReplaceTempView("RA_CUSTOMER_TRX_ALL")
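# Customer accounts joined to parties, used later as the PTY lookup (Lookup transformation equivalent)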
df_PTY = spark.sql("""SELECT CA.CUST_ACCOUNT_ID
FROM HZ_CUST_ACCOUNTS CA
JOIN HZ_PARTIES PT
ON PT.PARTY_ID = CA.PARTY_ID""")
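# As-of-date history per payment schedule: the UNION ALL branches collect schedule,
# application, adjustment, and receipt activity, and window functions compute the
# running remaining amounts (Union, Aggregator, and Expression equivalents)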
df_HIST = spark.sql("""SELECT BASE.PAYMENT_SCHEDULE_ID ,
BASE.AS_OF_DATE ,
COALESCE(
CASE
WHEN PSA.GL_DATE_CLOSED <= BASE.AS_OF_DATE
THEN ROUND(PSA.AMOUNT_DUE_REMAINING,2)
END , SUM(BASE.AMT_DOC) OVER (PARTITION BY BASE.PAYMENT_SCHEDULE_ID ORDER BY BASE.AS_OF_DATE)) AS AMT_REM_AS_OF ,
COALESCE(
CASE
WHEN PSA.GL_DATE_CLOSED <= BASE.AS_OF_DATE
THEN ROUND(PSA.AMOUNT_DUE_REMAINING * COALESCE(PSA.EXCHANGE_RATE,0),2)
END , SUM(BASE.AMT_FUNC) OVER (PARTITION BY BASE.PAYMENT_SCHEDULE_ID ORDER BY BASE.AS_OF_DATE)) AS AMT_REM_AS_OF_FUNC
FROM AR_PAYMENT_SCHEDULES_ALL PSA
JOIN
(SELECT PAYMENT_SCHEDULE_ID ,
AS_OF_DATE ,
SUM(AMT_DOC) AS AMT_DOC ,
SUM(AMT_FUNC) AS AMT_FUNC
FROM
(SELECT PS1.PAYMENT_SCHEDULE_ID ,
PS1.GL_DATE AS AS_OF_DATE ,
SUM(PS1.AMOUNT_DUE_ORIGINAL) AS AMT_DOC ,
SUM(COALESCE(GLDA.ACCTD_AMOUNT , ROUND(PS1.AMOUNT_DUE_ORIGINAL * COALESCE(PS1.EXCHANGE_RATE , 1) , 2))) AS AMT_FUNC
FROM AR_PAYMENT_SCHEDULES_ALL PS1
LEFT JOIN RA_CUST_TRX_LINE_GL_DIST_ALL GLDA
ON GLDA.CUSTOMER_TRX_ID = PS1.CUSTOMER_TRX_ID
AND GLDA.CUSTOMER_TRX_LINE_ID IS NULL
AND GLDA.LATEST_REC_FLAG = 'Y'
WHERE 1 =1
AND PS1.AMOUNT_DUE_ORIGINAL != 0
AND PS1.GL_DATE_CLOSED > PS1.GL_DATE
GROUP BY PS1.PAYMENT_SCHEDULE_ID ,
PS1.GL_DATE
UNION ALL
SELECT RA1.APPLIED_PAYMENT_SCHEDULE_ID ,
RA1.GL_DATE ,
SUM(RA1.AMOUNT_APPLIED) AS AMT_DOC ,
SUM(RA1.ACCTD_AMOUNT_APPLIED_TO) AS AMT_FUNC
FROM AR_RECEIVABLE_APPLICATIONS_ALL RA1
WHERE RA1.AMOUNT_APPLIED != 0
AND RA1.STATUS IN ('APP','ACTIVITY')
AND RA1.APPLIED_PAYMENT_SCHEDULE_ID IS NOT NULL
GROUP BY RA1.APPLIED_PAYMENT_SCHEDULE_ID ,
RA1.GL_DATE
UNION ALL
SELECT RA2.PAYMENT_SCHEDULE_ID ,
RA2.GL_DATE ,
SUM(COALESCE(RA2.AMOUNT_APPLIED_FROM , RA2.AMOUNT_APPLIED)) AS AMT_DOC ,
SUM(RA2.ACCTD_AMOUNT_APPLIED_FROM) AS AMT_FUNC
FROM AR_RECEIVABLE_APPLICATIONS_ALL RA2
WHERE RA2.AMOUNT_APPLIED != 0
AND RA2.STATUS IN ('APP','ACTIVITY')
GROUP BY RA2.PAYMENT_SCHEDULE_ID ,
RA2.GL_DATE
UNION ALL
SELECT AA1.PAYMENT_SCHEDULE_ID ,
AA1.GL_DATE ,
SUM(AA1.AMOUNT) AS AMT_DOC ,
SUM(AA1.ACCTD_AMOUNT) AS AMT_FUNC
FROM AR_ADJUSTMENTS_ALL AA1
WHERE AA1.AMOUNT != 0
AND AA1.STATUS = 'A'
GROUP BY AA1.PAYMENT_SCHEDULE_ID ,
AA1.GL_DATE
UNION ALL
SELECT PSA.PAYMENT_SCHEDULE_ID ,
R1.GL_DATE ,
0 AS AMT_DOC ,
0 AS AMT_FUNC
FROM AR_CASH_RECEIPT_HISTORY_ALL R1
JOIN AR_PAYMENT_SCHEDULES_ALL PSA
ON R1.CASH_RECEIPT_ID = PSA.CASH_RECEIPT_ID
WHERE R1.STATUS = 'REVERSED'
) PRE
GROUP BY PAYMENT_SCHEDULE_ID ,
AS_OF_DATE
) BASE ON BASE.PAYMENT_SCHEDULE_ID = PSA.PAYMENT_SCHEDULE_ID
""")
df_PTY.createOrReplaceTempView("PTY")
df_HIST.createOrReplaceTempView("HIST")
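# Final result: join the payment schedules to the history and to the receipt, ledger,
# transaction, and party lookups (Joiner transformation equivalents)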
df = spark.sql("""SELECT PSA.TRX_NUMBER AS INVOICE_NUM ,
HIST.PAYMENT_SCHEDULE_ID ,
PSA.GL_DATE ,
HIST.AS_OF_DATE ,
CASE WHEN REV.STATUS IS NULL THEN HIST.AMT_REM_AS_OF ELSE 0 END AS AMT_REM_AS_OF,
CASE WHEN REV.STATUS IS NULL THEN HIST.AMT_REM_AS_OF_FUNC ELSE 0 END AS AMT_REM_AS_OF_FUNC,
PSA.LAST_UPDATE_DATE ,
PSA.CUSTOMER_TRX_ID ,
PSA.CASH_RECEIPT_ID ,
GL.CURRENCY_CODE AS CURRENCY_CODE_FUNC ,
COALESCE(RCTA.BILL_TO_SITE_USE_ID ,PSA.CUSTOMER_SITE_USE_ID ,ACRA.CUSTOMER_SITE_USE_ID) AS BILL_TO_SITE_USE_ID ,
RCTA.SOLD_TO_SITE_USE_ID ,
RCTA.SHIP_TO_SITE_USE_ID ,
COALESCE(RCTA.BILL_TO_CUSTOMER_ID ,PSA.CUSTOMER_ID ,ACRA.PAY_FROM_CUSTOMER) AS BILL_TO_CUSTOMER_ID ,
RCTA.SOLD_TO_CUSTOMER_ID ,
RCTA.SHIP_TO_CUSTOMER_ID,
GLDA.CODE_COMBINATION_ID,
COALESCE(PSA.EXCHANGE_DATE ,GLDA.GL_DATE ,PSA.GL_DATE) AS AR_FX_DATE,
GL.ACCOUNTED_PERIOD_TYPE PERIOD_TYPE,
GL.PERIOD_SET_NAME
FROM AR_PAYMENT_SCHEDULES_ALL PSA
JOIN HIST
ON HIST.PAYMENT_SCHEDULE_ID = PSA.PAYMENT_SCHEDULE_ID
LEFT JOIN HR_OPERATING_UNITS ORG
ON ORG.ORGANIZATION_ID = PSA.ORG_ID
LEFT JOIN AR_CASH_RECEIPTS_ALL ACRA
ON ACRA.CASH_RECEIPT_ID = PSA.CASH_RECEIPT_ID
LEFT JOIN AR_CASH_RECEIPT_HISTORY_ALL REV
ON REV.CASH_RECEIPT_ID = PSA.CASH_RECEIPT_ID
AND REV.STATUS = 'REVERSED'
AND REV.GL_DATE <= HIST.AS_OF_DATE
LEFT JOIN GL_LEDGERS GL
ON GL.LEDGER_ID = ORG.SET_OF_BOOKS_ID
LEFT JOIN RA_CUSTOMER_TRX_ALL RCTA
ON RCTA.CUSTOMER_TRX_ID = PSA.CUSTOMER_TRX_ID
LEFT JOIN PTY BILP
ON BILP.CUST_ACCOUNT_ID = COALESCE(RCTA.BILL_TO_CUSTOMER_ID ,PSA.CUSTOMER_ID ,ACRA.PAY_FROM_CUSTOMER)
LEFT JOIN PTY SHIP
ON SHIP.CUST_ACCOUNT_ID = RCTA.SHIP_TO_CUSTOMER_ID
LEFT JOIN PTY SOLP
ON SOLP.CUST_ACCOUNT_ID = RCTA.SOLD_TO_CUSTOMER_ID
LEFT JOIN RA_CUST_TRX_LINE_GL_DIST_ALL GLDA
ON GLDA.CUSTOMER_TRX_ID = PSA.CUSTOMER_TRX_ID
AND GLDA.CUSTOMER_TRX_LINE_ID IS NULL
AND GLDA.LATEST_REC_FLAG = 'Y'
""")
save(df)
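Sample validation check
To support the testing and validation step above, here is a minimal sketch of a validation MV that compares the row count of a migrated table against the count reported by the legacy Informatica target and checks the key column for nulls. The table and column names reuse the sample above; the expected count is a hypothetical placeholder you would replace with the figure from your legacy target.
import pyspark.sql.functions as F
# Read the migrated table (same source as in the sample MV above)
df_check = read("EBS_AR.AR_PAYMENT_SCHEDULES_ALL")
# Hypothetical placeholder: replace with the row count reported by the legacy Informatica target
EXPECTED_ROW_COUNT = 1000000
actual_count = df_check.count()
null_key_count = df_check.filter(F.col("PAYMENT_SCHEDULE_ID").isNull()).count()
# Save a one-row summary so the results can be reviewed from an Incorta dashboard
df_summary = spark.createDataFrame(
    [(actual_count, EXPECTED_ROW_COUNT, null_key_count)],
    ["ACTUAL_COUNT", "EXPECTED_COUNT", "NULL_KEY_ROWS"])
save(df_summary)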
Related Material