Overview
This post shows how to call the DataRobot real-time prediction API from an Incorta Materialized View PySpark program.
The goal is to get predictions by calling a DataRobot endpoint hosted in the DataRobot cloud environment.
Assumptions
- The machine learning model has been developed in the DataRobot cloud environment, and Incorta data has been shared for training and testing the model.
- Predictions must be produced in the production system in real time, or as an integrated data pipeline within Incorta.
Solution
In the Incorta Notebook:
1. The combination of the deployment ID and the API key identifies the endpoint hosted in DataRobot.
2. The DataRobot real-time API expects the data to be passed as a string in CSV format.
The Incorta data must therefore be converted from a Spark DataFrame to a Pandas DataFrame first.
We then use Pandas to_csv() to serialize the Pandas DataFrame into a CSV string.
The DataRobot API returns the predictions in the same order as the feature rows we sent; the returned data is a plain Python list of rows (a simplified sketch of the response shape follows this section).
The sample code below shows how we merge the predictions back onto the original data.
Finally, the result has to be converted back to a Spark DataFrame so it can be saved as an Incorta materialized view.
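For reference, here is a simplified sketch of what the prediction response looks like and how rowId lines up with the rows we sent. The field names follow the DataRobot response schema linked in the sample code; the labels and values are illustrative only, not output from a real deployment.
# Illustrative only: a trimmed-down example of the JSON returned by the
# deployment prediction endpoint (field names per the DataRobot response schema).
predictions = {
    "data": [
        {"rowId": 0, "prediction": "good", "predictionValues": [{"label": "good", "value": 0.87}]},
        {"rowId": 1, "prediction": "bad", "predictionValues": [{"label": "bad", "value": 0.64}]},
    ]
}

# rowId matches the position of each row in the CSV payload we sent, so the
# predictions can be joined back to the original Pandas DataFrame by position.
for row in predictions["data"]:
    print(row["rowId"], row["prediction"])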
Sample Code(s)
"""
Usage:
python datarobot-predict.py <input-file.csv>
This example uses the requests library which you can install with:
pip install requests
We highly recommend that you update SSL certificates with:
pip install -U urllib3[secure] certifi
"""
import sys
import json
import requests
API_URL = 'https://app2.datarobot.com/api/v2/deployments/{deployment_id}/predictions/'
API_KEY = 'NjFhNzNhNzllZjc0ZTljYWEyMDBkZjgxOnBCamZ6dzcvUHVWaEprejhja3VZOXdxaEU0Nm5wSVQ5bDRCeG5CanFjRnM9'
DEPLOYMENT_ID = '61ca66aebea608e72b70e008'
# Don't change this. It is enforced server-side too.
MAX_PREDICTION_FILE_SIZE_BYTES = 52428800 # 50 MB
class DataRobotPredictionError(Exception):
    """Raised if there are issues getting predictions from DataRobot"""


def make_datarobot_deployment_predictions(data, deployment_id):
    """
    Make predictions on data provided using DataRobot deployment_id provided.
    See docs for details:
        https://app2.datarobot.com/docs/predictions/api/dr-predapi.html

    Parameters
    ----------
    data : str
        If using CSV as input:
        Feature1,Feature2
        numeric_value,string

        Or if using JSON as input:
        [{"Feature1":numeric_value,"Feature2":"string"}]

    deployment_id : str
        The ID of the deployment to make predictions with.

    Returns
    -------
    Response schema:
        https://app2.datarobot.com/docs/predictions/api/dr-predapi.html#response-schema

    Raises
    ------
    DataRobotPredictionError if there are issues getting predictions from DataRobot
    """
    # Set HTTP headers. The charset should match the contents of the file.
    headers = {
        # As default, we expect CSV as input data.
        # Should you wish to supply JSON instead,
        # comment out the line below and use the line after that instead:
        'Content-Type': 'text/plain; charset=UTF-8',
        # 'Content-Type': 'application/json; charset=UTF-8',

        'Authorization': 'Bearer {}'.format(API_KEY),
    }

    url = API_URL.format(deployment_id=deployment_id)

    # Prediction Explanations:
    # See the documentation for more information:
    # https://app2.datarobot.com/docs/predictions/api/dr-predapi.html#request-pred-explanations
    # Should you wish to include Prediction Explanations or Prediction Warnings in the result,
    # change the parameters below accordingly, and remove the comment from the params field below:
    params = {
        # If explanations are required, uncomment the line below
        # 'maxExplanations': 3,
        # 'thresholdHigh': 0.5,
        # 'thresholdLow': 0.15,
        # Uncomment this for Prediction Warnings, if enabled for your deployment.
        # 'predictionWarningEnabled': 'true',
    }

    # Make API request for predictions
    predictions_response = requests.post(
        url,
        data=data,
        headers=headers,
        # Prediction Explanations:
        # Uncomment this to include explanations in your prediction
        # params=params,
    )
    _raise_dataroboterror_for_status(predictions_response)
    # Return a Python dict following the schema in the documentation
    return predictions_response.json()


def _raise_dataroboterror_for_status(response):
    """Raise DataRobotPredictionError if the request fails along with the response returned"""
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError:
        err_msg = '{code} Error: {msg}'.format(
            code=response.status_code, msg=response.text)
        raise DataRobotPredictionError(err_msg)
import pandas as pd
# read data from Incorta
df = read('CreditRisk.CreditRiskPredPrepared')
pdf = df.toPandas()
from io import StringIO
csv_buffer = StringIO()
pdf.to_csv(csv_buffer)
data = csv_buffer.getvalue()
predictions = make_datarobot_deployment_predictions(data, DEPLOYMENT_ID)
rows = predictions["data"]
row_list = [(r['rowId'], r['prediction']) for r in rows]
# build a data frame with the rowId and prediction columns
pdf2 = pd.DataFrame(row_list, columns=['rowId', 'prediction'])
# concat prediction data frame and original data frame
result = pd.concat([pdf, pdf2], axis=1)
# create spark data frame
df_output = spark.createDataFrame(result)
incorta.show(df_output)
save(df_output)
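The sample above sends the whole dataset in a single request. The script defines MAX_PREDICTION_FILE_SIZE_BYTES because the prediction server enforces a 50 MB limit per request. For larger Incorta tables, one option is to split the Pandas DataFrame into chunks and offset the returned rowId values. The sketch below assumes the pdf, make_datarobot_deployment_predictions, and DEPLOYMENT_ID already defined above; the chunk size and index=False are illustrative choices, not part of the original sample.
# Minimal sketch (assumption): chunk large inputs so each CSV payload stays
# under the 50 MB server-side limit, then stitch the predictions back together.
from io import StringIO
import pandas as pd

CHUNK_ROWS = 10000  # illustrative chunk size; tune it for your column count

all_predictions = []
for start in range(0, len(pdf), CHUNK_ROWS):
    chunk = pdf.iloc[start:start + CHUNK_ROWS]
    buf = StringIO()
    chunk.to_csv(buf, index=False)
    resp = make_datarobot_deployment_predictions(buf.getvalue(), DEPLOYMENT_ID)
    # rowId restarts at 0 for every request, so offset it back to the full frame
    all_predictions.extend(
        (start + r['rowId'], r['prediction']) for r in resp['data']
    )

pdf_pred = pd.DataFrame(all_predictions, columns=['rowId', 'prediction'])
result_chunked = pd.concat([pdf.reset_index(drop=True), pdf_pred], axis=1)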
Links
Incorta with DataRobot Prediction API Overview
Incorta On-premises with DataRobot Real-time Prediction API
Incorta On-premises with DataRobot Batch Prediction API