Spark Materialized Views & Memory

I am using a Materialized View to join two tables together and create a single merged result. The basic PySpark code I'm using is:

from pyspark.sql import SparkSession

# read() and save() are provided by the Incorta MV runtime
df1 = read("ClaimRepoDemo.ClaimImmutable")
df2 = read("ClaimRepoDemo.ClaimMutable")

# Inner join the mutable and immutable claim tables on the shared claim key
result = df2.join(df1, "RawDataImport_PFMClaimIdentifier", "inner")

save(result)

For an initial test, I ran this MV code against source tables with ~4 million rows each (~2.0 GB of data in total). The test was successful, and the MV load took about 5 minutes to run.

However, when I ran this MV code against much larger source tables (~65 million rows each, ~40 GB of data in total), it failed with an "Out of Memory" error. I decided to try running incremental loads on smaller chunks of the source data, so I partitioned the data by date into 25 groups of ~2.6 million rows each (see attachment) and modified my PySpark code as follows:

from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

# Same join as before, but each source is first filtered to a single date partition
df1 = read("ClaimRepoDemo.ClaimImmutable").where(fn.col("RawDataImport_ImportTimestamp") == '2018-08-01')
df2 = read("ClaimRepoDemo.ClaimMutable").where(fn.col("Claim_InsertTimestamp") == '2018-08-01')

result = df2.join(df1, "RawDataImport_PFMClaimIdentifier", "inner")

save(result)

However, this still produced an "Out of Memory" error, which has me scratching my head. I'd like to use an incremental load on a PySpark MV to maintain a merged view of my data, but I can't figure out why I'm still getting "Out of Memory" errors when I've filtered the source data down to just 2.6 million rows (and I previously ran successfully against source data with 4 million rows). Does Spark require enough memory to store the entire source data in memory, even when I've applied a where clause to the read() function?

Any thoughts on how I can get this to work? 

9 replies
  • Dan, can you please post the list of parameters you are using for this MV?

  • Hichem, I'm not sure what you mean. I'm using the Incorta UI for the MV, with Language = Python and the Script / Incremental Script fields populated with the Python code from my post. I see there's a "Properties" section in the UI, but I haven't added any key/value properties for this MV, and I'm not familiar with which properties can be set or what they do. Is this what you're referring to when you say "list of parameters"?

    • Dan Nielsen: these are sample values; you can experiment with them to fit your cluster:

      spark.driver.memory   - 8G 
      spark.executor.memory  - 20G 
      spark.executor.cores - 4 
      spark.cores.max - 4 
      spark.sql.shuffle.partitions - 500

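      For reference, these properties map to standard Spark configuration keys. Below is a minimal sketch of setting them on a plain SparkSession outside Incorta (inside an MV the runtime provides the session, so they belong in the MV's Properties section instead; the values are placeholders):

      from pyspark.sql import SparkSession

      # Sketch only: in an Incorta MV these keys go in the MV definition's
      # Properties section, not in code. Values are placeholders to tune.
      spark = (
          SparkSession.builder
          .appName("mv_tuning_sketch")
          .config("spark.driver.memory", "8g")            # driver heap
          .config("spark.executor.memory", "20g")         # heap per executor
          .config("spark.executor.cores", "4")            # cores per executor
          .config("spark.cores.max", "4")                 # total cores for the app
          .config("spark.sql.shuffle.partitions", "500")  # partitions after shuffles
          .getOrCreate()
      )

      Note that spark.driver.memory generally has to be set before the driver JVM starts (via spark-submit or, here, the MV properties), so setting it in code like this is illustrative only.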
  • Incorta allows you to allocate resources at the MV level using the properties defined in the MV definition page. 
    Please try overriding spark.sql.shuffle.partitions to 2001 at the MV level to see whether the job can finish.

    Spark does not need to hold the entire source data in memory for processing; transformations are evaluated lazily and data is processed partition by partition.

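    One way to confirm this on the DataFrames from the original post is to inspect the physical plan (a quick sketch; read() is the Incorta MV read function, and explain() is standard Spark):

    from pyspark.sql import functions as fn

    # Sketch: if the filter shows up next to the scan (Filter / PushedFilters)
    # in the plan, Spark applies the predicate early instead of materializing
    # the entire source table first.
    df = read("ClaimRepoDemo.ClaimImmutable") \
        .where(fn.col("RawDataImport_ImportTimestamp") == '2018-08-01')
    df.explain()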
  • I've tried setting a few of these properties (mainly spark.sql.shuffle.partitions) at the MV level and then running an incremental load, but each time I still get an "Out of Memory" error:

    Transformation error: INC_005005001:Failed to load data from [spark://pwslincrtacp002.client.ext:7077] at [Materialized view] with properties [[error, OutOfMemoryError: Not enough memory available to Spark Driver/Executors for Materialized View job [ClaimRepoDemo.Claim]. ]]

    The Spark server I'm using currently has 48 cores and 15 GB of memory in total (3 cores / 6 GB of RAM are being used by data hub miniThrift jobs). I'm not an admin on the server, so the only place I can specify or change any Spark settings is at the MV level.

  • I spoke too soon: I kept trying different combinations of MV parameters, and the following set is working:

        spark.executor.cores = 4

        spark.sql.shuffle.partitions = 500

        spark.executor.memory = 4G

        spark.driver.memory = 4G

    Thanks!

    • Dan Nielsen: some more hints -

      1. Set executor.cores = cores.max. This guarantees that only a single executor is used.
      2. Adjust the shuffle partitions: spark.sql.shuffle.partitions should be a multiple of cores.max (see the sketch below).

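      A quick arithmetic sketch of hint 2, using the ~40 GB load from earlier in this thread (the 128 MB per-partition target is an assumed rule of thumb, not an Incorta setting):

      import math

      # Illustrative numbers only: round the shuffle-partition count up to a
      # multiple of the cores available so every task wave keeps all cores busy.
      cores_max = 4                 # spark.cores.max used for this MV
      shuffle_data_mb = 40 * 1024   # ~40 GB, from the failing full load
      target_partition_mb = 128     # assumed per-partition size target

      needed = math.ceil(shuffle_data_mb / target_partition_mb)        # 320
      shuffle_partitions = math.ceil(needed / cores_max) * cores_max   # 320, a multiple of 4
      print(shuffle_partitions)

      The working value of 500 from earlier in the thread also happens to be a multiple of 4, which fits this guideline.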
  • Hi,

    I had one follow-up question on this thread.

    Does the error "INC_005005001" usually come from the Spark parameters only, or can it also be caused by my code? I'm getting the same error with the code below when trying to save the MV, yet the same Spark code runs fine in my other environment. How should I proceed to fix this?

    FYI, my Spark has 64 cores with 4 GB of memory.

    from pyspark.sql import SparkSession
    import datetime
    import sys

    # Make the directory containing the connection helper importable
    ConnectionString = '/u002/incorta/incorta_home/IncortaNode/spark/bin'
    sys.path.insert(0, ConnectionString)

    from datetime import date
    from pyspark.sql.functions import current_date, datediff, to_date
    import getconnstring as con

    owdtestConnFile = '/u002/incorta/incorta_home/IncortaNode/spark/bin/connection.csv'
    owdtestConn = con.readDatabaseConnectionDetails(owdtestConnFile)

    # Creating dataframes from the source tables and registering temp views
    MMISMDF = read("ERP_MM.MTL_INTERORG_SHIP_METHODS")
    MISMDDF = MMISMDF.select(['ship_method', 'intransit_time', 'from_organization_id', 'to_organization_id']).filter("default_flag=1 AND ISNOTNULL(destination_type)")
    MISMDDF.createOrReplaceTempView("MTL_INTERORG_SHIP_METHODS")

    MICVDF = read("ERP_MasterData.MTL_ITEM_CATEGORIES_V")
    MICVSODF = MICVDF.select(['category_concat_segs', 'inventory_item_id', 'organization_id']).filter("category_set_name = 'Dept_WSF'")
    MICVSODF.createOrReplaceTempView("Ship_Org_V")
    MICVIOODF = MICVDF.select(['category_concat_segs', 'inventory_item_id', 'organization_id']).filter("category_set_name = 'Inv. Operations Owner'")
    MICVIOODF.createOrReplaceTempView("Ioo_V")

    CICDF = read("ERP_CA.CST_ITEM_COSTS")
    CICDF.createOrReplaceTempView("CST_ITEM_COSTS")

    FCLDF = read("ERP_MasterData.FND_COMMON_LOOKUPS")
    FCLDF.createOrReplaceTempView("FND_COMMON_LOOKUPS")

    FFVNHDF = read("ERP_MasterData.FND_FLEX_VALUE_NORM_HIERARCHY_INTRANSIT")
    FFVNHDF.createOrReplaceTempView("FND_FLEX_VALUE_NORM_HIERARCHY")
    #FFVNHDF.createOrReplaceTempView("FND_NORM_H_Ioo") #al3
    #FFVNHDF.createOrReplaceTempView("FND_NORM_H_Child") #al14
    #FFVNHDF.createOrReplaceTempView("FND_NORM_H_Flex") #al15

    FFVLDF = read("ERP_MasterData.FND_FLEX_VALUES_VL")
    FFVLDF.createOrReplaceTempView("FND_FLEX_VALUES_VL")
    #FFVLDF.createOrReplaceTempView("FND_VAL_Desc_bu") #al16
    #FFVLDF.createOrReplaceTempView("FND_VAL_Desc_pl") #al4

    MSIBDF = read("ERP_MasterData.MTL_SYSTEM_ITEMS_B")
    MSIBDF.createOrReplaceTempView("MTL_SYSTEM_ITEMS_B")

    POAVDF = read("ERP_Purchasing.PO_AGENTS_V")
    POAVDF.createOrReplaceTempView("PO_AGENTS_V")

    PRHADF = read("ERP_Purchasing.PO_REQUISITION_HEADERS_ALL")
    PRHADF.createOrReplaceTempView("PO_REQUISITION_HEADERS_ALL")

    PRLADF = read("ERP_Purchasing.PO_REQUISITION_LINES_ALL")
    PRLADF.createOrReplaceTempView("PO_REQUISITION_LINES_ALL")

    RSHDF = read("ERP_Purchasing.RCV_SHIPMENT_HEADERS")
    RSHDF.createOrReplaceTempView("RCV_SHIPMENT_HEADERS")

    RSLDF = read("ERP_Purchasing.RCV_SHIPMENT_LINES")
    RSLDF.createOrReplaceTempView("RCV_SHIPMENT_LINES")

    MMTDF = read("ERP_Purchasing.MTL_MATERIAL_TRANSACTIONS")
    MMTDF.createOrReplaceTempView("MTL_MATERIAL_TRANSACTIONS")

    MTPRDF = read("ERP_MM_Master_Data.MTL_PARAMETERS")
    MTPRDF.createOrReplaceTempView("MTL_PARAMETERS")
    #MTPRDF.createOrReplaceTempView("MTL_PARAMETERS_SHIP") #al5
    #MTPRDF.createOrReplaceTempView("MTL_PARAMETERS_RCV") #al13

    # Join the temp views into the final in-transit shipment result set
    Query = """SELECT date_format(current_date(), "yyyy-MM-dd") data_extract_date,
    FND_VAL_Desc_bu.description bu_division,
    FND_VAL_Desc_pl.description pl_division,
    Ioo_V.category_concat_segs AS IOO,
    MTL_PARAMETERS_RCV.organization_code organization_code,
    MTL_SYSTEM_ITEMS_B.segment1 item_no,
    FND_COMMON_LOOKUPS.meaning AS item_type,
    MTL_SYSTEM_ITEMS_B.inventory_item_status_code item_status,
    CST_ITEM_COSTS.item_cost item_cost,
    RCV_SHIPMENT_LINES.item_description item_description,
    RCV_SHIPMENT_LINES.source_document_code source_doc,
    'SHIPMENT' lot_transaction_type,
    CAST(RCV_SHIPMENT_HEADERS.shipped_date as date) shipped_date,
    CONCAT(RCV_SHIPMENT_HEADERS.shipment_num,'.',RCV_SHIPMENT_LINES.line_num) shipment_no,
    MTL_INTERORG_SHIP_METHODS.ship_method SHIP_METHOD,
    MTL_PARAMETERS_RCV.organization_code receiving_org,
    MTL_PARAMETERS_SHIP.organization_code shipping_org,
    Ship_Org_V.category_concat_segs SHIP_ORG_DEPT,
    RCV_SHIPMENT_LINES.quantity_received QUANTITY_RECEIVED,
    RCV_SHIPMENT_LINES.quantity_shipped QUANTITY_SHIPPED,
    (RCV_SHIPMENT_LINES.quantity_shipped - RCV_SHIPMENT_LINES.quantity_received) balance,
    (CST_ITEM_COSTS.item_cost * (RCV_SHIPMENT_LINES.quantity_shipped - RCV_SHIPMENT_LINES.quantity_received)) shipment_value,
    datediff(current_date,shipped_date) days_intransit,
    MTL_INTERORG_SHIP_METHODS.intransit_time INTRANSIT_TIME,
    MTL_MATERIAL_TRANSACTIONS.creation_date revised_ship_date,
    datediff(current_date,MTL_MATERIAL_TRANSACTIONS.creation_date) REVISED_DAYS_INTRANSIT,
    RCV_SHIPMENT_LINES.shipment_line_status_code line_status_code,
    MTL_SYSTEM_ITEMS_B.planner_code planner,
    MTL_SYSTEM_ITEMS_B.inventory_item_id inventory_item_id,
    MTL_SYSTEM_ITEMS_B.organization_id organization_id,
    RCV_SHIPMENT_LINES.shipment_line_id Integration_id,
    PO_AGENTS_V.agent_name agent_name,
    PO_REQUISITION_HEADERS_ALL.segment1 ir_number,
    MTL_PARAMETERS_RCV.organization_id receiving_organization_id,
    datediff(current_date,shipped_date) No_Of_days_intransit
    FROM MTL_SYSTEM_ITEMS_B
    LEFT OUTER JOIN PO_AGENTS_V
    ON MTL_SYSTEM_ITEMS_B.buyer_id = PO_AGENTS_V.agent_id
    JOIN FND_COMMON_LOOKUPS
    ON MTL_SYSTEM_ITEMS_B.item_type = FND_COMMON_LOOKUPS.lookup_code
    AND FND_COMMON_LOOKUPS.lookup_type = 'ITEM_TYPE'
    JOIN RCV_SHIPMENT_LINES
    ON RCV_SHIPMENT_LINES.item_id = MTL_SYSTEM_ITEMS_B.inventory_item_id
    AND RCV_SHIPMENT_LINES.to_organization_id = MTL_SYSTEM_ITEMS_B.organization_id
    JOIN CST_ITEM_COSTS
    ON RCV_SHIPMENT_LINES.to_organization_id = CST_ITEM_COSTS.organization_id
    AND RCV_SHIPMENT_LINES.shipment_line_status_code <> 'FULLY RECEIVED'
    AND CST_ITEM_COSTS.cost_type_id = 1
    AND MTL_SYSTEM_ITEMS_B.inventory_item_id = CST_ITEM_COSTS.inventory_item_id
    LEFT OUTER JOIN PO_REQUISITION_LINES_ALL
    ON RCV_SHIPMENT_LINES.requisition_line_id = PO_REQUISITION_LINES_ALL.requisition_line_id
    LEFT OUTER JOIN PO_REQUISITION_HEADERS_ALL
    ON PO_REQUISITION_LINES_ALL.requisition_header_id = PO_REQUISITION_HEADERS_ALL.requisition_header_id
    JOIN RCV_SHIPMENT_HEADERS
    ON RCV_SHIPMENT_LINES.shipment_header_id = RCV_SHIPMENT_HEADERS.shipment_header_id
    LEFT OUTER JOIN MTL_INTERORG_SHIP_METHODS
    ON RCV_SHIPMENT_LINES.from_organization_id = MTL_INTERORG_SHIP_METHODS.from_organization_id
    AND RCV_SHIPMENT_LINES.to_organization_id = MTL_INTERORG_SHIP_METHODS.to_organization_id
    JOIN MTL_PARAMETERS MTL_PARAMETERS_RCV
    ON RCV_SHIPMENT_LINES.to_organization_id = MTL_PARAMETERS_RCV.organization_id
    JOIN MTL_PARAMETERS MTL_PARAMETERS_SHIP
    ON RCV_SHIPMENT_LINES.from_organization_id = MTL_PARAMETERS_SHIP.organization_id
    AND MTL_PARAMETERS_SHIP.organization_code IS NOT NULL
    JOIN MTL_MATERIAL_TRANSACTIONS
    ON RCV_SHIPMENT_LINES.mmt_transaction_id = MTL_MATERIAL_TRANSACTIONS.transaction_id
    LEFT OUTER JOIN Ship_Org_V
    ON MTL_SYSTEM_ITEMS_B.inventory_item_id = Ship_Org_V.inventory_item_id
    AND MTL_SYSTEM_ITEMS_B.organization_id = Ship_Org_V.organization_id
    LEFT OUTER JOIN Ioo_V
    ON MTL_SYSTEM_ITEMS_B.inventory_item_id = Ioo_V.inventory_item_id
    AND MTL_SYSTEM_ITEMS_B.organization_id = Ioo_V.organization_id
    JOIN FND_FLEX_VALUE_NORM_HIERARCHY FND_NORM_H_Ioo
    ON Ioo_V.category_concat_segs = FND_NORM_H_Ioo.child_flex_value_low
    JOIN FND_FLEX_VALUE_NORM_HIERARCHY FND_NORM_H_Flex
    ON FND_NORM_H_Ioo.parent_flex_value = FND_NORM_H_Flex.child_flex_value_low
    JOIN FND_FLEX_VALUE_NORM_HIERARCHY FND_NORM_H_Child
    ON FND_NORM_H_Flex.parent_flex_value = FND_NORM_H_Child.child_flex_value_low
    JOIN FND_FLEX_VALUES_VL FND_VAL_Desc_bu
    ON FND_VAL_Desc_bu.flex_value_meaning = FND_NORM_H_Child.parent_flex_value
    JOIN FND_FLEX_VALUES_VL FND_VAL_Desc_pl
    ON FND_VAL_Desc_pl.flex_value_meaning = FND_NORM_H_Flex.parent_flex_value"""

    DataFrame = spark.sql(Query)
    save(DataFrame)
    • Ram -- please work with support on this issue.
