Incorta Copilot was rolled out in Incorta 2024.1 and has revolutionized how architects, developers, and analysts interact with their data.
Copilot for notebooks is available as a preview feature in 2024.1.0. The feature is still maturing, and more enhancements will follow in upcoming releases. If you need more information, please get in touch with Incorta Support.
Interactive notebooks have previously been an option for building materialized views. With the 2024.1 release, analysts can also create interactive notebooks and run them against verified business views. The prompts and experiences below are available whether you are creating MVs in a schema or working in a notebook. One key difference between the two experiences is that an MV must persist its output dataset with the save() function, whereas a notebook does not save a dataset, as sketched below.
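As a minimal sketch of that difference, the same aggregation is shown in both contexts below; only the MV paragraph needs the save() call. The OnlineStore.salesorderheader table and TotalDue column are reused from later examples in this article, and the CustomerID column is assumed here for illustration.
import pyspark.sql.functions as F
# Read a source table (table and column names reused from later examples; CustomerID is assumed)
orders = read('OnlineStore.salesorderheader')
# Aggregate total sales by customer
totals = orders.groupBy('CustomerID').agg(F.sum('TotalDue').alias('TotalSales'))
# In an MV paragraph, the output dataset must be persisted with save()
save(totals)
# In a notebook paragraph, you would instead simply inspect the result, for example:
# totals.show()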
Another consideration is that Copilot is aware of the user's data access. Because of this, a prompt asking for join paths between tables in a physical schema will yield no result for a user who only has access to business schemas; instead, the Copilot will recommend a view or set of views.
Easy! For each paragraph, update the code type to Incorta Copilot.
There are two key shortcuts that help Copilot reference the context of a user's prompt. Examples of both appear in the sections below.
“@cell” can be used to include the code from the previous cell as context for the question
“@error” can be used to include the code from the previous cell as well as the trace error printed as output for the previous cell as context for the question
First and foremost, indexing your metadata is critical to ensuring the most accurate responses. The Copilot's responses only take into account the metadata from a user's schemas that has been indexed. For that reason, it's recommended that Super Admins run the following commands after significant changes to schemas or business schemas. Note that these are run manually through notebooks today; future enhancements will include automatic indexing whenever a schema is updated.
To run an indexing job, set the paragraph type to 'Incorta Copilot' and run the following paragraph:
@index_chroma
In response, you'll get information on the indexing job, including a job ID. Copy and paste that ID into the next paragraph to monitor indexing progress.
@get_indexing_chroma_status xxxxxxxx-xxxx-xxxx-xxx-xxxxxxxxxxx
Upon running the status command, you will get a response like the following:
Chroma indexing progress with Id: xxxxxxxx-xxxx-xxxx-xxx-xxxxxxxxxxx is 5%
Note: If a cluster is shut down during indexing, the indexing job will be nullified.
With Copilot, you can ask for recommendations on which views or tables should be used for analysis.
Prompt:
Suggest 3 physical tables that provide sales data and explain in detail when to use each one based on the main columns contained in them
Response:
Based on your inquiry, here are the three physical tables that contain sales data:
1. Table Name: productvendor
Schema Name: Product_Sales
Main Columns: ProductID, VendorID, AverageLeadTime, StandardPrice, LastReceiptCost, LastReceiptDate, MinOrderQty, MaxOrderQty, OnOrderQty, UnitMeasureCode, ModifiedDate
This table can be used when you need detailed information about the products and their vendors, including lead times, prices, and order quantities.
2. Table Name: salesorderdetail
Schema Name: Product_Sales
Main Columns: SalesOrderID, SalesOrderDetailID, CarrierTrackingNumber, OrderQty, ProductID, SpecialOfferID, UnitPrice, UnitPriceDiscount, LineTotal, rowguid, ModifiedDate
This table can be used when you need detailed information about sales orders, including tracking numbers, order quantities, product IDs, special offers, unit prices, and discounts.
3. Table Name: product
Schema Name: sales_inc_s
Main Columns: product, product_name, Subcategory, Category
This table can be used when you need information about the products, including their names and categories.
Please note that the usage of each table depends on the specific requirements of your analysis.
Use Copilot to identify the tables to query, and it will leverage the defined join relationships to automatically create the joins.
Prompt:
Using the OnlineStore schema, join the salesorderheader, address, stateprovince, and countryregion tables to calculate total sales by countryregion.
Response:
import pyspark.sql.functions as F
# Read the tables
salesorderheader = read('OnlineStore.salesorderheader')
address = read('OnlineStore.address')
stateprovince = read('OnlineStore.stateprovince')
countryregion = read('OnlineStore.countryregion')
# Join the tables
joined_df = salesorderheader.join(address, salesorderheader.ShipToAddressID == address.AddressID)
joined_df = joined_df.join(stateprovince, address.StateProvinceID == stateprovince.StateProvinceID)
joined_df = joined_df.join(countryregion, stateprovince.CountryRegionCode == countryregion.CountryRegionCode)
# Calculate total sales by countryregion
result_df = joined_df.groupBy(countryregion.CountryRegionName).agg(F.sum(salesorderheader.TotalDue).alias('TotalSales'))
# Save the result
save(result_df)
Early iterations of the Copilot model have been specifically tuned to improve in-notebook forecasting capabilities. With this, you can reference historical datasets and create predictions with a simple prompt!
Prompt:
Write one simple code block to forecast stock values for the next 30 days from the Apple view using Prophet lib.
Response:
import pandas as pd
from prophet import Prophet
# Read the data from Incorta table
apple_data = read('schema.table')
# Convert the Incorta dataframe to pandas dataframe
apple_df = apple_data.toPandas()
# Prepare the dataframe for Prophet
apple_df = apple_df.rename(columns={'Date': 'ds', 'Close': 'y'})
# Initialize and fit the model
m = Prophet()
m.fit(apple_df)
# Prepare a dataframe to hold the future dates
future = m.make_future_dataframe(periods=30)
# Make the forecast
forecast = m.predict(future)
# Print the forecast
print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']])
You might find yourself facing scenarios where you have SQL, or perhaps Python, that you want to convert to PySpark. Leverage the Copilot to do the heavy lifting. Here is a sample of starting code in PostgreSQL:
SELECT CustomerID, "OnlineStore"."salesorderdetail".SalesOrderID, SUM(LineTotal) AS SalesOrderTotal
FROM
"OnlineStore"."salesorderdetail"
LEFT OUTER JOIN "OnlineStore"."salesorderheader" ON (
"OnlineStore"."salesorderdetail".SalesOrderID = "OnlineStore"."salesorderheader".SalesOrderID )
GROUP BY 1, 2
Prompt:
@cell convert to pyspark. Ensure that the column salesorderID is specified from the salesorderdetail dataframe.
Response:
import pyspark.sql.functions as F
salesorderdetail_df = read('OnlineStore.salesorderdetail')
salesorderheader_df = read('OnlineStore.salesorderheader')
joined_df = salesorderdetail_df.join(salesorderheader_df, salesorderdetail_df.SalesOrderID == salesorderheader_df.SalesOrderID, 'left_outer')
result_df = joined_df.groupBy('CustomerID', salesorderdetail_df.SalesOrderID).agg(F.sum('LineTotal').alias('SalesOrderTotal'))
save(result_df)
Leverage the Copilot to optimize code for Materialized Views (MVs) where code may be verbose or slow-running. Take the following example of starting code:
import pyspark.sql.functions as F
# Read the salesperson table
df = read('OnlineStore.salesperson')
# Calculate the total sales
sales_total = df.agg(F.sum('SalesYTD')).collect()[0][0]
# Create a new dataframe with the total sales
df_total_sales = spark.createDataFrame([(sales_total,)], ['total_sales'])
# Save the result
save(df_total_sales)
Prompt:
@cell Explain performance issue in this code and how this code can be optimized, then rewrite the code with the optimization applied.
Response:
The performance issue in your code was due to the use of the collect() function which returns all the elements of the dataset as an array to the driver program. This can cause the driver program to run out of memory if the dataset is too large. To optimize the code, we replaced the collect() function with the first() function which returns the first element of the dataset. Here is the optimized code:
{'code': "import pyspark.sql.functions as F
# Read the salesperson table
df = read('OnlineStore.salesperson')
# Calculate the total sales using reduce action instead of collect to optimize performance
sales_total = df.agg(F.sum('SalesYTD')).first()[0]
# Create a new dataframe with the total sales
df_total_sales = spark.createDataFrame([(sales_total,)], ['total_sales'])
# Save the result
save(df_total_sales)", 'language': 'Pyspark'}
Any time you encounter an issue with your code, open a new paragraph and use the '@error' command to send the previous paragraph's error to the Copilot for processing.
For example, the following code that creates a line chart fails:
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
# Read the data from Incorta table
stock_df = read('Stocks_BS.Apple')
# Filter the data for the last 45 days
stock_df = stock_df.filter(stock_df['Date'] >= date_sub(current_date(), 45))
# Plot the line chart
plt.figure(figsize=(10,5))
plt.plot(stock_df['Date'], stock_df['Close'], label='Stock Price')
plt.title('Stock Performance Over the Last 45 Days')
plt.xlabel('Date')
plt.ylabel('Stock Price')
plt.legend()
plt.show()
Open a new paragraph, reference the error, and add any other explanation or considerations for the updated code.
Prompt:
@error make me a proposed code fix
Copilot responds with code that corrects the error:
Response:
Here is the corrected Spark code to read the data from the Incorta table, filter it, and convert it to a Pandas DataFrame so that it can be plotted using matplotlib:
{'code': "from pyspark.sql.functions import *
import matplotlib.pyplot as plt
import pandas as pd
# Read the data from Incorta table
stock_df = read('Stocks_BS.Apple')
# Convert the Spark DataFrame to a Pandas DataFrame
stock_df = stock_df.toPandas()
# Filter the data for the last 30 days
stock_df = stock_df[stock_df['Date'] >= pd.Timestamp.now() - pd.DateOffset(days=45)]
# Plot the line chart
plt.figure(figsize=(10,5))
plt.plot(stock_df['Date'], stock_df['Close'], label='Stock Price')
plt.title('Stock Performance Over the Last 45 Days')
plt.xlabel('Date')
plt.ylabel('Stock Price')
plt.legend()
plt.show()", 'language': 'Pyspark'}
If you find yourself working on a materialized view and want a simplified explanation of how the code works, you can ask the Copilot to explain it. Here is sample starting code:
from pyspark.sql.functions import col, when, rank
from pyspark.sql.window import Window
df = read('OnlineStore.customers_ranked')
windowSpec = Window.orderBy(df['Total_Sales'].desc())
df = df.withColumn('Rank', rank().over(windowSpec))
df = df.filter(col('Rank') <= 10)
df.show()
Prompt:
@cell Explain the code
Response:
The provided code is written in PySpark and is used to rank customers based on their total sales. Here's a step-by-step explanation:
1. The code first imports necessary functions from PySpark's sql module.
2. It then reads a dataframe from the 'OnlineStore.customers_ranked' table.
3. A window specification is created that orders the data by 'Total_Sales' in descending order. This means that the customer with the highest total sales will be at the top.
4. The code then adds a new column 'Rank' to the dataframe, which ranks the rows based on the window specification. This rank is calculated based on the total sales of each customer.
5. After that, it filters the dataframe to only include rows where 'Rank' is less than or equal to 10. This means that only the top 10 customers based on total sales will be included in the final dataframe.
6. Finally, it displays the dataframe. This will show the top 10 customers with their respective ranks and total sales.
The Copilot has been trained on PySpark documentation, making it much easier to investigate PySpark capabilities without leaving Incorta!
Prompt:
When should I use reduceByKey vs groupByKey?
Response:
In Incorta MV, both reduceByKey and groupByKey are transformations in Spark that are used to perform aggregation on pair RDDs. However, they have different performance characteristics.
reduceByKey: This function performs a map-side combine with the user's provided function before shuffling the data. This can significantly reduce the amount of data that needs to be transferred across the network and lead to significant speed ups. Use this when your input data is large.
groupByKey: This function shuffles all the data across the network. It can be very expensive for large datasets. Use this when your input data is small, or when you need to perform an operation that is not associative and/or commutative, such as median or mode.
In summary, if your operation is associative and commutative, such as sum, min, max, etc., and your input data is large, you should use reduceByKey. If your operation is not associative and/or commutative and your input data is small, you should use groupByKey.
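To make this concrete, here is a minimal, hypothetical sketch; the (key, value) pairs are invented purely for illustration, and it assumes the spark session available in Incorta notebooks:
# A small pair RDD of (product, amount) pairs, invented purely for illustration
pairs = spark.sparkContext.parallelize([('apples', 10), ('oranges', 5), ('apples', 3), ('oranges', 7)])
# reduceByKey combines values per key on each partition before shuffling,
# so only the partial sums travel across the network
totals = pairs.reduceByKey(lambda a, b: a + b)
print(totals.collect())  # e.g. [('apples', 13), ('oranges', 12)] (order may vary)
# groupByKey shuffles every (key, value) pair and groups values first;
# the aggregation only happens after the full shuffle
grouped_totals = pairs.groupByKey().mapValues(sum)
print(grouped_totals.collect())  # same result, but with a more expensive shuffle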