
Overview

This article describes how to use the DataRobot Batch Prediction API from Incorta Cloud. In this scenario, DataRobot is granted access to Google Cloud Storage (GCS): it reads the input data containing the features from a GCS bucket and writes the prediction results back to the bucket.

On the Incorta side, an Incorta materialized view (MV) writes the data stored in Incorta to the GCS bucket. The MV program then reads the prediction results from GCS, converts them to a Spark DataFrame, and saves the DataFrame as the MV output.

Solution

There are two ways to create a GCP Credential.

You can create it in DataRobot; see the stored credentials documentation: https://app2.datarobot.com/docs/admin/for-users/stored-creds.html

[Screenshot: creating a stored credential in DataRobot]

Or you can create a GCP Credential in an Incorta Notebook:

[Screenshot: creating a GCP Credential in an Incorta Notebook]
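If you take the notebook route, the DataRobot Python client can register the credential for you. The following is a minimal sketch, assuming the client's Credential.create_gcp helper and a service-account key file readable from the notebook; the endpoint, token, and key-file path are placeholders, and the credential name matches the one looked up in the code sample below.

import json
import datarobot as dr

# Connect to DataRobot (substitute your own endpoint and API token).
dr.Client(
    endpoint='https://app2.datarobot.com/api/v2',
    token='YOUR_DATAROBOT_API_TOKEN'
)

# Load the GCP service-account key (placeholder path).
with open('/tmp/service-account-key.json') as f:
    gcp_key = json.load(f)

# Register the key as a reusable DataRobot credential.
credential = dr.Credential.create_gcp(
    name='incorta_GCP_ml2022',
    gcp_key=gcp_key
)
print(credential.credential_id)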

In the Incorta Notebook:

[Screenshots: Incorta Notebook setup steps]

Add Property:

[Screenshot: Add Property dialog]
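The exact property shown above is not reproduced here. In an Incorta MV, Add Property attaches a key/value Spark setting to the view; as a hypothetical illustration (the actual property in the screenshot may differ), granting Spark direct access to gs:// paths would use the GCS connector's key-file setting:

# Hypothetical example only; the property in the screenshot may differ.
# Key:   spark.hadoop.google.cloud.auth.service.account.json.keyfile
# Value: /path/to/service-account-key.json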

Code Sample:

import datarobot as dr
from google.cloud import storage
import pandas as pd
import io

# Replace these with your own API token, endpoint, and deployment ID.
DATAROBOT_API_TOKEN = 'nBCamZ6dzcvUHVWaEprejhja3VZOXdxaEU0Nm5wSVQ5bDRCeG5CanFjRnM9NjFhNzNhNzllZjc0ZTljYWEyMDBkZjgxO'
DATAROBOT_ENDPOINT = 'https://app2.datarobot.com/api/v2'
DEPLOYMENT_ID = 'e72b70e00ebea6861ca66a08'

dr.Client(
    endpoint=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    user_agent_suffix='IntegrationSnippet-ApiClient'
)
# Use this code to look up the ID of the credential object created.
credential_id = None
DR_CREDENTIAL_NAME = 'incorta_GCP_ml2022'
for cred in dr.Credential.list():
    if cred.name == DR_CREDENTIAL_NAME:
        credential_id = cred.credential_id

print(credential_id)
# GCS bucket and file names for the scoring input and prediction output.
GCP_BUCKET_NAME = 'ic-ml2022-yyyyy-bucket'
GCP_INPUT_SCORING_FILE = 'CreditRiskPredPrepared_GCS.csv'
GCP_OUTPUT_RESULTS_FILE = 'CreditRiskPredPrediction_GCS_Output.csv'

# Read the prepared data from Incorta and convert it to pandas.
df = read('CreditRisk.CreditRiskPredPrepared')
pdf = df.toPandas()

storage_client = storage.Client()
# Get the GCS bucket by name.
bucket = storage_client.get_bucket(GCP_BUCKET_NAME)
# Write the scoring input to GCS as a CSV file.
blob = bucket.blob(GCP_INPUT_SCORING_FILE)
with blob.open("w") as f:
    f.write(pdf.to_csv(index=False))
# Set up batch prediction job
# Input: Google Cloud Storage
# Output: Google Cloud Storage
job = dr.BatchPredictionJob.score(
    deployment=DEPLOYMENT_ID,
    intake_settings={
        'type': 'gcp',
        'url': "gs://{}/{}".format(GCP_BUCKET_NAME,GCP_INPUT_SCORING_FILE),
        "credential_id": credential_id
    },
    output_settings={
        'type': 'gcp',
        'url': "gs://{}/{}".format(GCP_BUCKET_NAME,GCP_OUTPUT_RESULTS_FILE),
        "credential_id": credential_id
    }
)
# Block until the job finishes, then check its final status.
job.wait_for_completion()
job.get_status()
# Download the prediction results from GCS into a pandas DataFrame.
blob_output = bucket.blob(GCP_OUTPUT_RESULTS_FILE)
data_output = blob_output.download_as_string()
pdf_data = pd.read_csv(io.BytesIO(data_output))
# Select the prediction column from the results.
pdf2 = pdf_data[['RiskStatusPrediction_PREDICTION']]
# Concatenate the predictions with the original data.
result = pd.concat([pdf, pdf2], axis=1)
# Convert to a Spark DataFrame and save it as the MV output.
df_output = spark.createDataFrame(result)
incorta.head(df_output, 10)
save(df_output)
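Once save(df_output) completes, the prediction results are materialized as the MV's table in Incorta, where they can be queried and joined like any other Incorta table.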
