
MV Spark loop failing

Good Day Community,

I'm working on building a historic table by looping over a SQL statement translated into Python/Spark. Everything works, including unioning two or more resulting spark.sql queries on the loaded temporary tables.

However, when I try to run a standard Python/Spark loop (e.g. for i in var) and union the results, it fails consistently without a useful error log of what actually caused the failure. I come from Databricks, where I could usually find the root cause, generally a library or configuration issue (if not syntax on my part).

I'm stumped on this one. I'll provide a sample shortly of the code.


Please note, this is a repost due to accidentally posting in the "Analytics Thought Leadership" area.

5 replies
  • Here is the sample code used; it needs to loop through several numbers:

    from pyspark.sql import functions as F
    
    # iterable to build historic data table
    var1 = [-1,-2]
    
    
    # load temp tables
    df_sh = read("schema.table1")
    df_sh.createOrReplaceTempView("table1")
    
    
    # std sql query
    sqlquery = """
        select a.*
        from table1 a
        where
            a.date = last_day(add_months(CURRENT_DATE,{0}))
          """
    
    ### Option 1
    ### this fails the verification stage upon "Save" of MV
    for i in var1:
        if i == 0:
            df = spark.sql(sqlquery.format(i))
            results = df
        else:
            df = spark.sql(sqlquery.format(i))
            results = results.union(df)
    
    
    ### Option 2
    ### this fails as well
    def iter(qry, it):
        df = spark.sql(qry.format(it))
        return df
    
    results = []
    
    for i in var1:
        df = iter(sqlquery,i)
        results = results.union(df)
    
    
    ### Option 3 - works
    ### this works just fine
    df1 = spark.sql(sqlquery.format(var1[0]))
    df2 = spark.sql(sqlquery.format(var1[1]))
    results = df1.union(df2)
    save(results)
    
    
    spark.catalog.dropTempView("table1")
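
    One thing worth noting about Option 1: var1 = [-1, -2] never contains 0, so the if i == 0 branch never runs and results is never bound before results.union(df) is called. A common alternative is to build all the frames first and union them pairwise with functools.reduce. Below is a minimal sketch of that pattern, using plain Python lists as stand-ins for Spark DataFrames (list + playing the role of DataFrame.union); the real-Spark equivalent is shown in the comments.

    ```python
    from functools import reduce

    # Hypothetical stand-in for spark.sql(sqlquery.format(i)):
    # each "frame" is just a list of rows.
    def run_query(offset):
        return [("month_offset", offset)]

    offsets = [-1, -2]

    # Build one frame per offset, then union them pairwise.
    # With real Spark DataFrames this would be:
    #   frames = [spark.sql(sqlquery.format(i)) for i in offsets]
    #   results = reduce(DataFrame.union, frames)
    frames = [run_query(i) for i in offsets]
    results = reduce(lambda a, b: a + b, frames)

    print(results)  # [('month_offset', -1), ('month_offset', -2)]
    ```

    This avoids the first-iteration special case entirely, since reduce handles the accumulation.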
    • Ryan Daly Hi Ryan, can I ask why you're SELECT'ing from table1 by date if only to UNION the results?  Why not just SELECT without the WHERE clause?  Wouldn't the result be the same?

      (Disclaimer: I am a Python novice.) But how is your variable "i" incrementing in Options 1 and 2? We would need you to dig up the actual error from the Spark logs, which can be found at ./IncortaAnalytics/IncortaNode/spark/work/<app_identifier>/<worker>/stderr

    • Dan Brock Hi Dan, in this case we're rebuilding a point-in-time table and aggregating upward. The original tables and query are much more complex than this example; the goal is to persist the data into a staging table for monthly data reporting. Not ideal, just the environment we're working with.

  • Ryan - Spark itself doesn't offer for-loops, as it adheres to functional programming, which means you never mutate an existing DataFrame. So you have to do it like this:

    for i in var1:
        if i == 0:
            df1 = spark.sql(sqlquery.format(i))
        else:
            df2 = spark.sql(sqlquery.format(i))

    df = df1.union(df2)

    • Amit Kothari Thank you Amit for confirming how Spark is configured in Incorta's platform. You're correct that we need a simple for loop to get this to work. I just determined the cause of my issue, and it comes down to syntax: I overlooked the need to change my .format(i) to .format(var1[i+1]), which properly pulls the indexed values from the list. The loop is now working and we can create the table as needed.

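
      For anyone hitting the same wall: since the loop variable already holds the month offsets themselves, iterating over the values directly and initializing the accumulator to None avoids the index arithmetic entirely. A hedged sketch of that pattern, again using plain Python lists as stand-ins for Spark DataFrames (with + standing in for .union):

      ```python
      # Hypothetical stand-in for spark.sql(sqlquery.format(i)):
      # returns a one-row "frame" tagged with the offset.
      def run_query(offset):
          return [("month_offset", offset)]

      offsets = [-1, -2]

      results = None
      for i in offsets:
          df = run_query(i)  # spark.sql(sqlquery.format(i)) in the real MV
          # With real DataFrames: results = df if results is None else results.union(df)
          results = df if results is None else results + df

      print(results)  # [('month_offset', -1), ('month_offset', -2)]
      ```

      The None sentinel makes the first iteration explicit instead of relying on a particular value (like 0) appearing in the offset list.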
  • Last active 3 months ago
  • 5 replies
  • 43 views
  • 3 following