
Overview

This post shows how to call the DataRobot real-time prediction API from an Incorta Materialized View PySpark program.
The goal is to get predictions by calling a DataRobot endpoint hosted in the DataRobot cloud environment.

Assumptions

  • The machine learning model has been developed using the DataRobot cloud environment. Incorta data has been shared for training and testing the model.
  • Predictions must be produced in the production system in real time, or as part of an integrated data pipeline within Incorta.

Solution

In an Incorta Notebook:

1. The combination of the deployment ID and API key represents the endpoint hosted in DataRobot.
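
As a minimal sketch, this is how the endpoint URL and authorization header are formed. The key and ID values below are placeholders; use the values from your own DataRobot deployment (the full sample code at the end follows the same pattern):

API_URL = 'https://app2.datarobot.com/api/v2/deployments/{deployment_id}/predictions/'
API_KEY = '<your DataRobot API key>'
DEPLOYMENT_ID = '<your deployment ID>'

url = API_URL.format(deployment_id=DEPLOYMENT_ID)          # endpoint for this specific deployment
headers = {'Authorization': 'Bearer {}'.format(API_KEY)}   # the API key authorizes the request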

2. The DataRobot real-time API expects the data to be passed as a string in CSV format.

The Incorta data must first be converted from a Spark DataFrame to a pandas DataFrame.

We then use the pandas to_csv() method to serialize the data as a CSV string, as sketched below.
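
A minimal sketch, assuming the read() helper that Incorta provides in Materialized View notebooks and the CreditRisk.CreditRiskPredPrepared view used in the full sample below:

from io import StringIO

# Read the Incorta data as a Spark DataFrame, then convert it to pandas.
df = read('CreditRisk.CreditRiskPredPrepared')
pdf = df.toPandas()

# Serialize the pandas DataFrame into a CSV string for the DataRobot API.
csv_buffer = StringIO()
pdf.to_csv(csv_buffer, index=False)
data = csv_buffer.getvalue()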

 

The DataRobot API returns predictions in the same order as the feature rows sent for prediction. The returned data is available as a plain Python list of rows.
The predictions can then be merged back with the original data:
 
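A minimal sketch, reusing the names from the full sample code below (pdf is the original pandas DataFrame and predictions is the parsed JSON response):

import pandas as pd

rows = predictions["data"]                                   # one entry per input row, in the same order
row_list = [(r['rowId'], r['prediction']) for r in rows]
pdf2 = pd.DataFrame(row_list, columns=['rowId', 'prediction'])

# Because the order is preserved, a positional concat pairs each
# prediction with its original feature row.
result = pd.concat([pdf, pdf2], axis=1)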

Finally, the data has to be converted back to a Spark DataFrame so it can be saved as an Incorta materialized view.
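
A minimal sketch (spark and save() are provided by the Incorta notebook environment; the complete notebook code follows in the next section):

# Convert the merged pandas result back to a Spark DataFrame and
# save it as the output of the Incorta materialized view.
df_output = spark.createDataFrame(result)
save(df_output)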

Sample Code(s)

 

"""
Usage:
    python datarobot-predict.py <input-file.csv>
 
This example uses the requests library which you can install with:
    pip install requests
We highly recommend that you update SSL certificates with:
    pip install -U urllib3[secure] certifi
"""
import requests
 
API_URL = 'https://app2.datarobot.com/api/v2/deployments/{deployment_id}/predictions/'
API_KEY = 'NjFhNzNhNzllZjc0ZTljYWEyMDBkZjgxOnBCamZ6dzcvUHVWaEprejhja3VZOXdxaEU0Nm5wSVQ5bDRCeG5CanFjRnM9'
 
DEPLOYMENT_ID = '61ca66aebea608e72b70e008'
 
# Don't change this. It is enforced server-side too.
MAX_PREDICTION_FILE_SIZE_BYTES = 52428800  # 50 MB
 
 
class DataRobotPredictionError(Exception):
    """Raised if there are issues getting predictions from DataRobot"""
 
 
def make_datarobot_deployment_predictions(data, deployment_id):
    """
    Make predictions on data provided using DataRobot deployment_id provided.
    See docs for details:
         https://app2.datarobot.com/docs/predictions/api/dr-predapi.html
 
    Parameters
    ----------
    data : str
        If using CSV as input:
        Feature1,Feature2
        numeric_value,string
 
        Or if using JSON as input:
        [{"Feature1":numeric_value,"Feature2":"string"}]
 
    deployment_id : str
        The ID of the deployment to make predictions with.
 
    Returns
    -------
    Response schema:
        https://app2.datarobot.com/docs/predictions/api/dr-predapi.html#response-schema
 
    Raises
    ------
    DataRobotPredictionError if there are issues getting predictions from DataRobot
    """
    # Set HTTP headers. The charset should match the contents of the file.
    headers = {
        # As default, we expect CSV as input data.
        # Should you wish to supply JSON instead,
        # comment out the line below and use the line after that instead:
        'Content-Type': 'text/plain; charset=UTF-8',
        # 'Content-Type': 'application/json; charset=UTF-8',
 
        'Authorization': 'Bearer {}'.format(API_KEY),
    }
 
    url = API_URL.format(deployment_id=deployment_id)
 
    # Prediction Explanations:
    # See the documentation for more information:
    # https://app2.datarobot.com/docs/predictions/api/dr-predapi.html#request-pred-explanations
    # Should you wish to include Prediction Explanations or Prediction Warnings in the result,
    # Change the parameters below accordingly, and remove the comment from the params field below:
 
    params = {
        # If explanations are required, uncomment the line below
        # 'maxExplanations': 3,
        # 'thresholdHigh': 0.5,
        # 'thresholdLow': 0.15,
        # Uncomment this for Prediction Warnings, if enabled for your deployment.
        # 'predictionWarningEnabled': 'true',
    }
    # Make API request for predictions
    predictions_response = requests.post(
        url,
        data=data,
        headers=headers,
        # Prediction Explanations:
        # Uncomment this to include explanations in your prediction
        # params=params,
    )
    _raise_dataroboterror_for_status(predictions_response)
    # Return a Python dict following the schema in the documentation
    return predictions_response.json()
 
 
def _raise_dataroboterror_for_status(response):
    """Raise DataRobotPredictionError if the request fails along with the response returned"""
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError:
        err_msg = '{code} Error: {msg}'.format(
            code=response.status_code, msg=response.text)
        raise DataRobotPredictionError(err_msg)

import pandas as pd
# read data from Incorta
df = read('CreditRisk.CreditRiskPredPrepared')
pdf = df.toPandas()
from io import StringIO
csv_buffer = StringIO()
pdf.to_csv(csv_buffer, index=False)  # index=False avoids sending the pandas index as an extra column
data = csv_buffer.getvalue()
predictions = make_datarobot_deployment_predictions(data, DEPLOYMENT_ID)
rows = predictions["data"]
row_list = [(r['rowId'], r['prediction']) for r in rows]
# build a pandas data frame with the rowId and prediction columns
pdf2 = pd.DataFrame(row_list, columns = ['rowId', 'prediction'])
# concat prediction data frame and original data frame
result = pd.concat([pdf, pdf2], axis=1)
# create spark data frame
df_output = spark.createDataFrame(result)
incorta.show(df_output)
save(df_output)

Links

Incorta with DataRobot Prediction API Overview

Incorta On-premises with DataRobot Real-time Prediction API

Incorta On-premises with DataRobot Batch Prediction API

Incorta Cloud with DataRobot GCP Real-time Prediction API

Incorta Cloud with DataRobot GCP Batch Prediction API

 
