When replacing your current Informatica ETL processes, there are many approaches you can take to transition smoothly to Incorta. This document covers best practices for managing an Informatica-to-Incorta migration project.
We recommend that you be familiar with these Incorta concepts before exploring this topic further.
These concepts apply to all Incorta versions.
Let's dive into the concepts that will help you migrate your ETLs to Incorta.
The table below maps common Informatica transformations to their PySpark equivalents, which you can apply when rebuilding Informatica mappings as Incorta materialized views (MVs):
| Informatica Transformation | PySpark equivalent |
|---|---|
| Source | The Incorta read() function reads the parquet data into a Spark dataframe. Multiple read() calls are allowed, and the resulting dataframes can be joined and queried with Spark SQL. See the sample MV code below. |
| Target | The Incorta save() function writes the final Spark dataframe to a parquet file; only one save() call is allowed per MV. See the sample MV code below. |
| Aggregator | Use the dataframe groupBy() and agg() functions, or a SQL GROUP BY (sketch below). |
| Data Masking | Use Python or pyspark.sql.functions such as sha2() and regexp_replace() to mask sensitive columns (sketch below). |
| Deduplicate | Use the dataframe dropDuplicates() function (sketch below). |
| Expression | Use Python functions to perform calculations on individual rows of data (sketch below). |
| Filter | Refer to https://sparkbyexamples.com/pyspark/pyspark-where-filter/ for how to filter rows in a dataframe (sketch below). |
| Joiner | Use Spark SQL to join dataframes. See the sample MV code below. |
| Lookup | Use Spark SQL to join the lookup table and create a new dataframe (sketch below). |
| Mapplet | A PySpark function is the equivalent of a mapplet (sketch below). |
| Normalizer | Use Python logic to process data with multiple-occurring fields and return a row for each instance of the multiple-occurring data (sketch below). |
| Rank | Use a Spark window function; refer to https://sparkbyexamples.com/pyspark/pyspark-window-functions/ (sketch below). |
| Router | Use Python logic to split a dataframe into multiple dataframes by condition (sketch below). |
| Sequence Generator | Refer to https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.sequence.html (sketch below). |
| Sorter | Refer to https://sparkbyexamples.com/pyspark/pyspark-orderby-and-sort-explained/ (sketch below). |
| Union | Use Spark SQL (UNION ALL) to merge data from multiple dataframes into a single dataframe. See the sample MV code below. |
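The short sketches that follow are minimal examples, not production code. They assume the Incorta-provided read() function and spark session that are available inside an MV; the SALES.ORDERS table and all column names are hypothetical placeholders.
A minimal Aggregator sketch, grouping and aggregating with groupBy()/agg():
# Aggregator: group rows and compute aggregates (hypothetical table and columns)
import pyspark.sql.functions as F
df_orders = read("SALES.ORDERS")
df_agg = df_orders.groupBy("CUSTOMER_ID").agg(
    F.sum("ORDER_AMOUNT").alias("TOTAL_AMOUNT"),   # SUM port
    F.count("ORDER_ID").alias("ORDER_COUNT"))      # COUNT port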
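A Data Masking sketch using built-in column functions; the SSN and PHONE columns are assumptions:
# Data Masking: hash one sensitive column and redact another
import pyspark.sql.functions as F
df_customers = read("SALES.CUSTOMERS")   # hypothetical table
df_masked = (df_customers
    .withColumn("SSN_HASH", F.sha2(F.col("SSN").cast("string"), 256))       # one-way hash
    .withColumn("PHONE", F.regexp_replace("PHONE", r"\d(?=\d{4})", "*"))    # keep last 4 digits
    .drop("SSN"))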
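A Deduplicate sketch with dropDuplicates():
# Deduplicate: remove exact duplicates, or keep one row per business key
df_orders = read("SALES.ORDERS")                        # hypothetical table
df_dedup_all = df_orders.dropDuplicates()               # exact duplicate rows
df_dedup_key = df_orders.dropDuplicates(["ORDER_ID"])   # one arbitrary row per key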
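Expression and Filter sketches; NET_AMOUNT and the source columns are illustrative:
# Expression: derive a new column per row; Filter: keep only qualifying rows
import pyspark.sql.functions as F
df_orders = read("SALES.ORDERS")   # hypothetical table
df_calc = df_orders.withColumn(
    "NET_AMOUNT",
    F.col("ORDER_AMOUNT") - F.coalesce(F.col("DISCOUNT"), F.lit(0)))
df_filtered = df_calc.filter(F.col("NET_AMOUNT") > 0)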
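A Lookup sketch that joins a reference table through Spark SQL; COMMON.CURRENCIES is a hypothetical lookup source:
# Lookup: left-join a reference table to enrich the driving dataframe
df_orders = read("SALES.ORDERS")            # hypothetical table
df_currencies = read("COMMON.CURRENCIES")   # hypothetical lookup table
df_orders.createOrReplaceTempView("ORDERS")
df_currencies.createOrReplaceTempView("CURRENCIES")
df_enriched = spark.sql("""SELECT O.*, C.CURRENCY_NAME
FROM ORDERS O
LEFT JOIN CURRENCIES C ON C.CURRENCY_CODE = O.CURRENCY_CODE""")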
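A Mapplet sketch: reusable transformation logic packaged as a Python function that takes and returns dataframes:
# Mapplet: reusable logic as a Python function over dataframes
import pyspark.sql.functions as F
def standardize(df):
    # trim and upper-case the STATUS column (illustrative cleanup rule)
    return df.withColumn("STATUS", F.upper(F.trim(F.col("STATUS"))))
df_orders = standardize(read("SALES.ORDERS"))   # hypothetical table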
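A Normalizer sketch: the stack() SQL generator turns multiple-occurring columns (here, hypothetical quarterly amount columns) into one row per occurrence:
# Normalizer: pivot QTR1..QTR4 columns into (QUARTER, AMOUNT) rows
df_sales = read("SALES.QUARTERLY_SALES")   # hypothetical table
df_norm = df_sales.selectExpr(
    "ACCOUNT_ID",
    "stack(4, 'Q1', QTR1, 'Q2', QTR2, 'Q3', QTR3, 'Q4', QTR4) AS (QUARTER, AMOUNT)")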
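Rank and Sorter sketches using a window function and orderBy():
# Rank: rank rows within each customer; Sorter: order the output
import pyspark.sql.functions as F
from pyspark.sql.window import Window
df_orders = read("SALES.ORDERS")   # hypothetical table
w = Window.partitionBy("CUSTOMER_ID").orderBy(F.desc("ORDER_AMOUNT"))
df_top3 = (df_orders.withColumn("AMOUNT_RANK", F.rank().over(w))
                    .filter(F.col("AMOUNT_RANK") <= 3))
df_sorted = df_orders.orderBy("CUSTOMER_ID", F.desc("ORDER_DATE"))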
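A Router sketch that splits one dataframe into branches with mutually exclusive filters. Because an MV allows only one save(), routed branches are usually recombined before saving or built as separate MVs:
# Router: split rows into branches by condition
import pyspark.sql.functions as F
df_orders = read("SALES.ORDERS")   # hypothetical table
df_open   = df_orders.filter(F.col("STATUS") == "OPEN")
df_closed = df_orders.filter(F.col("STATUS") == "CLOSED")
df_other  = df_orders.filter(~F.col("STATUS").isin("OPEN", "CLOSED"))   # default group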
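A Sequence Generator sketch; monotonically_increasing_id() and row_number() are common substitutes for a surrogate-key sequence:
# Sequence Generator: assign a surrogate key to each row
import pyspark.sql.functions as F
from pyspark.sql.window import Window
df_orders = read("SALES.ORDERS")   # hypothetical table
df_ids = df_orders.withColumn("ROW_ID", F.monotonically_increasing_id())   # unique, not contiguous
w = Window.orderBy("ORDER_ID")   # note: a global window moves all rows to one partition
df_seq = df_orders.withColumn("SEQ_NUM", F.row_number().over(w))           # contiguous 1..N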
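A Union sketch; unionByName() aligns columns by name, and the SQL UNION ALL form appears in the sample MV code below:
# Union: append dataframes that share a schema
df_2022 = read("SALES.ORDERS_2022")   # hypothetical tables
df_2023 = read("SALES.ORDERS_2023")
df_all = df_2022.unionByName(df_2023)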
# EBS AR Aging mv code
import pyspark.sql.functions as F
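# Source: read the EBS source tables from Incorta parquet into Spark dataframes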
df_HZ_CUST_ACCOUNTS = read("EBS_PARTY_COMMON.HZ_CUST_ACCOUNTS")
df_HZ_PARTIES = read("EBS_PARTY_COMMON.HZ_PARTIES")
df_AR_PAYMENT_SCHEDULES_ALL = read("EBS_AR.AR_PAYMENT_SCHEDULES_ALL")
df_RA_CUST_TRX_LINE_GL_DIST_ALL = read("EBS_AR.RA_CUST_TRX_LINE_GL_DIST_ALL")
df_AR_RECEIVABLE_APPLICATIONS_ALL = read("EBS_AR.AR_RECEIVABLE_APPLICATIONS_ALL")
df_AR_ADJUSTMENTS_ALL = read("EBS_AR.AR_ADJUSTMENTS_ALL")
df_AR_CASH_RECEIPT_HISTORY_ALL = read("EBS_AR.AR_CASH_RECEIPT_HISTORY_ALL")
df_HR_OPERATING_UNITS = read("EBS_HR_COMMON.HR_OPERATING_UNITS")
df_AR_CASH_RECEIPTS_ALL = read("EBS_AR.AR_CASH_RECEIPTS_ALL")
df_GL_LEDGERS = read("EBS_FIN_COMMON.GL_LEDGERS")
df_RA_CUSTOMER_TRX_ALL = read("EBS_AR.RA_CUSTOMER_TRX_ALL")
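# Register temp views so the dataframes can be queried with Spark SQL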
df_HZ_CUST_ACCOUNTS.createOrReplaceTempView("HZ_CUST_ACCOUNTS")
df_HZ_PARTIES.createOrReplaceTempView("HZ_PARTIES")
df_AR_PAYMENT_SCHEDULES_ALL.createOrReplaceTempView("AR_PAYMENT_SCHEDULES_ALL")
df_RA_CUST_TRX_LINE_GL_DIST_ALL.createOrReplaceTempView("RA_CUST_TRX_LINE_GL_DIST_ALL")
df_AR_RECEIVABLE_APPLICATIONS_ALL.createOrReplaceTempView("AR_RECEIVABLE_APPLICATIONS_ALL")
df_AR_ADJUSTMENTS_ALL.createOrReplaceTempView("AR_ADJUSTMENTS_ALL")
df_AR_CASH_RECEIPT_HISTORY_ALL.createOrReplaceTempView("AR_CASH_RECEIPT_HISTORY_ALL")
df_HR_OPERATING_UNITS.createOrReplaceTempView("HR_OPERATING_UNITS")
df_AR_CASH_RECEIPTS_ALL.createOrReplaceTempView("AR_CASH_RECEIPTS_ALL")
df_GL_LEDGERS.createOrReplaceTempView("GL_LEDGERS")
df_RA_CUSTOMER_TRX_ALL.createOrReplaceTempView("RA_CUSTOMER_TRX_ALL")
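# Joiner: customer accounts that have a matching party record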
df_PTY = spark.sql("""SELECT CA.CUST_ACCOUNT_ID
FROM HZ_CUST_ACCOUNTS CA
JOIN HZ_PARTIES PT
ON PT.PARTY_ID = CA.PARTY_ID""")
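# Union and Aggregator: combine all AR activity (invoices, applications, adjustments,
# receipt reversals) and compute the running remaining balance per payment schedule
# as of each GL date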
df_HIST = spark.sql("""SELECT BASE.PAYMENT_SCHEDULE_ID ,
BASE.AS_OF_DATE ,
COALESCE(
CASE
WHEN PSA.GL_DATE_CLOSED <= BASE.AS_OF_DATE
THEN ROUND(PSA.AMOUNT_DUE_REMAINING,2)
END , SUM(BASE.AMT_DOC) OVER (PARTITION BY BASE.PAYMENT_SCHEDULE_ID ORDER BY BASE.AS_OF_DATE)) AS AMT_REM_AS_OF ,
COALESCE(
CASE
WHEN PSA.GL_DATE_CLOSED <= BASE.AS_OF_DATE
THEN ROUND(PSA.AMOUNT_DUE_REMAINING * COALESCE(PSA.EXCHANGE_RATE,0),2)
END , SUM(BASE.AMT_FUNC) OVER (PARTITION BY BASE.PAYMENT_SCHEDULE_ID ORDER BY BASE.AS_OF_DATE)) AS AMT_REM_AS_OF_FUNC
FROM AR_PAYMENT_SCHEDULES_ALL PSA
JOIN
(SELECT PAYMENT_SCHEDULE_ID ,
AS_OF_DATE ,
SUM(AMT_DOC) AS AMT_DOC ,
SUM(AMT_FUNC) AS AMT_FUNC
FROM
(SELECT PS1.PAYMENT_SCHEDULE_ID ,
PS1.GL_DATE AS AS_OF_DATE ,
SUM(PS1.AMOUNT_DUE_ORIGINAL) AS AMT_DOC ,
SUM(COALESCE(GLDA.ACCTD_AMOUNT , ROUND(PS1.AMOUNT_DUE_ORIGINAL * COALESCE(PS1.EXCHANGE_RATE , 1) , 2))) AS AMT_FUNC
FROM AR_PAYMENT_SCHEDULES_ALL PS1
LEFT JOIN RA_CUST_TRX_LINE_GL_DIST_ALL GLDA
ON GLDA.CUSTOMER_TRX_ID = PS1.CUSTOMER_TRX_ID
AND GLDA.CUSTOMER_TRX_LINE_ID IS NULL
AND GLDA.LATEST_REC_FLAG = 'Y'
WHERE 1 = 1
AND PS1.AMOUNT_DUE_ORIGINAL != 0
AND PS1.GL_DATE_CLOSED > PS1.GL_DATE
GROUP BY PS1.PAYMENT_SCHEDULE_ID ,
PS1.GL_DATE
UNION ALL
SELECT RA1.APPLIED_PAYMENT_SCHEDULE_ID ,
RA1.GL_DATE ,
SUM(RA1.AMOUNT_APPLIED) AS AMT_DOC ,
SUM(RA1.ACCTD_AMOUNT_APPLIED_TO) AS AMT_FUNC
FROM AR_RECEIVABLE_APPLICATIONS_ALL RA1
WHERE RA1.AMOUNT_APPLIED != 0
AND RA1.STATUS IN ('APP','ACTIVITY')
AND RA1.APPLIED_PAYMENT_SCHEDULE_ID IS NOT NULL
GROUP BY RA1.APPLIED_PAYMENT_SCHEDULE_ID ,
RA1.GL_DATE
UNION ALL
SELECT RA2.PAYMENT_SCHEDULE_ID ,
RA2.GL_DATE ,
SUM(COALESCE(RA2.AMOUNT_APPLIED_FROM , RA2.AMOUNT_APPLIED)) AS AMT_DOC ,
SUM(RA2.ACCTD_AMOUNT_APPLIED_FROM) AS AMT_FUNC
FROM AR_RECEIVABLE_APPLICATIONS_ALL RA2
WHERE RA2.AMOUNT_APPLIED != 0
AND RA2.STATUS IN ('APP','ACTIVITY')
GROUP BY RA2.PAYMENT_SCHEDULE_ID ,
RA2.GL_DATE
UNION ALL
SELECT AA1.PAYMENT_SCHEDULE_ID ,
AA1.GL_DATE ,
SUM(AA1.AMOUNT) AS AMT_DOC ,
SUM(AA1.ACCTD_AMOUNT) AS AMT_FUNC
FROM AR_ADJUSTMENTS_ALL AA1
WHERE AA1.AMOUNT != 0
AND AA1.STATUS = 'A'
GROUP BY AA1.PAYMENT_SCHEDULE_ID ,
AA1.GL_DATE
UNION ALL
SELECT PSA.PAYMENT_SCHEDULE_ID ,
R1.GL_DATE ,
0 AS AMT_DOC ,
0 AS AMT_FUNC
FROM AR_CASH_RECEIPT_HISTORY_ALL R1
JOIN AR_PAYMENT_SCHEDULES_ALL PSA
ON R1.CASH_RECEIPT_ID = PSA.CASH_RECEIPT_ID
WHERE R1.STATUS = 'REVERSED'
) PRE
GROUP BY PAYMENT_SCHEDULE_ID ,
AS_OF_DATE
) BASE ON BASE.PAYMENT_SCHEDULE_ID = PSA.PAYMENT_SCHEDULE_ID
""")
df_PTY.createOrReplaceTempView("PTY")
df_HIST.createOrReplaceTempView("HIST")
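# Final result: join the as-of balances to payment schedules, enriched with customer,
# receipt, ledger, and GL distribution details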
df = spark.sql("""SELECT PSA.TRX_NUMBER AS INVOICE_NUM ,
HIST.PAYMENT_SCHEDULE_ID ,
PSA.GL_DATE ,
HIST.AS_OF_DATE ,
CASE WHEN REV.STATUS IS NULL THEN HIST.AMT_REM_AS_OF ELSE 0 END AS AMT_REM_AS_OF,
CASE WHEN REV.STATUS IS NULL THEN HIST.AMT_REM_AS_OF_FUNC ELSE 0 END AS AMT_REM_AS_OF_FUNC,
PSA.LAST_UPDATE_DATE ,
PSA.CUSTOMER_TRX_ID ,
PSA.CASH_RECEIPT_ID ,
GL.CURRENCY_CODE AS CURRENCY_CODE_FUNC ,
COALESCE(RCTA.BILL_TO_SITE_USE_ID ,PSA.CUSTOMER_SITE_USE_ID ,ACRA.CUSTOMER_SITE_USE_ID) AS BILL_TO_SITE_USE_ID ,
RCTA.SOLD_TO_SITE_USE_ID ,
RCTA.SHIP_TO_SITE_USE_ID ,
COALESCE(RCTA.BILL_TO_CUSTOMER_ID ,PSA.CUSTOMER_ID ,ACRA.PAY_FROM_CUSTOMER) AS BILL_TO_CUSTOMER_ID ,
RCTA.SOLD_TO_CUSTOMER_ID ,
RCTA.SHIP_TO_CUSTOMER_ID,
GLDA.CODE_COMBINATION_ID,
COALESCE(PSA.EXCHANGE_DATE ,GLDA.GL_DATE ,PSA.GL_DATE) AS AR_FX_DATE,
GL.ACCOUNTED_PERIOD_TYPE PERIOD_TYPE,
GL.PERIOD_SET_NAME
FROM AR_PAYMENT_SCHEDULES_ALL PSA
JOIN HIST
ON HIST.PAYMENT_SCHEDULE_ID = PSA.PAYMENT_SCHEDULE_ID
LEFT JOIN HR_OPERATING_UNITS ORG
ON ORG.ORGANIZATION_ID = PSA.ORG_ID
LEFT JOIN AR_CASH_RECEIPTS_ALL ACRA
ON ACRA.CASH_RECEIPT_ID = PSA.CASH_RECEIPT_ID
LEFT JOIN AR_CASH_RECEIPT_HISTORY_ALL REV
ON REV.CASH_RECEIPT_ID = PSA.CASH_RECEIPT_ID
AND REV.STATUS = 'REVERSED'
AND REV.GL_DATE <= HIST.AS_OF_DATE
LEFT JOIN GL_LEDGERS GL
ON GL.LEDGER_ID = ORG.SET_OF_BOOKS_ID
LEFT JOIN RA_CUSTOMER_TRX_ALL RCTA
ON RCTA.CUSTOMER_TRX_ID = PSA.CUSTOMER_TRX_ID
LEFT JOIN PTY BILP
ON BILP.CUST_ACCOUNT_ID = COALESCE(RCTA.BILL_TO_CUSTOMER_ID ,PSA.CUSTOMER_ID ,ACRA.PAY_FROM_CUSTOMER)
LEFT JOIN PTY SHIP
ON SHIP.CUST_ACCOUNT_ID = RCTA.SHIP_TO_CUSTOMER_ID
LEFT JOIN PTY SOLP
ON SOLP.CUST_ACCOUNT_ID = RCTA.SOLD_TO_CUSTOMER_ID
LEFT JOIN RA_CUST_TRX_LINE_GL_DIST_ALL GLDA
ON GLDA.CUSTOMER_TRX_ID = PSA.CUSTOMER_TRX_ID
AND GLDA.CUSTOMER_TRX_LINE_ID IS NULL
AND GLDA.LATEST_REC_FLAG = 'Y'
""")
save(df)