This article describes how to use the DataRobot Batch Prediction API from Incorta Cloud. In this scenario, DataRobot is granted access to Google Cloud Storage (GCS): it reads the input data containing the scoring features from a GCS bucket and writes the prediction results back to the same bucket.
On the Incorta side, a materialized view (MV) writes the data stored in Incorta to the GCS bucket. Once the prediction results are available in GCS, the MV program reads them, converts them to a Spark DataFrame, and saves the DataFrame as the MV output.
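Granting GCS access to DataRobot means giving the service account behind the stored GCP credential permission on the bucket. You can do this from the Google Cloud console, or with the google-cloud-storage client as in the minimal sketch below; the service account email and the Storage Object Admin role are illustrative assumptions, so adjust them to your environment.

from google.cloud import storage

# Grant the DataRobot service account read/write access on the bucket.
# NOTE: the service account email and role below are placeholders for illustration.
storage_client = storage.Client()
bucket = storage_client.bucket('ic-ml2022-yyyyy-bucket')
policy = bucket.get_iam_policy(requested_policy_version=3)
policy.bindings.append({
    'role': 'roles/storage.objectAdmin',
    'members': {'serviceAccount:datarobot-scoring@your-project.iam.gserviceaccount.com'},
})
bucket.set_iam_policy(policy)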
There are two ways to create a GCP credential:
You can create it in DataRobot: https://app2.datarobot.com/docs/admin/for-users/stored-creds.html
Or you can create it from an Incorta Notebook with the DataRobot Python client (a sketch of this is shown after the credential lookup in the code sample below).
In Incorta Notebook:
Add Property:
Code Sample:
import datarobot as dr
from google.oauth2 import service_account
from google.cloud import storage
import os
import json
import pandas as pd
import io
# DataRobot connection details -- replace the token, endpoint, and deployment ID with your own
DATAROBOT_API_TOKEN = 'nBCamZ6dzcvUHVWaEprejhja3VZOXdxaEU0Nm5wSVQ5bDRCeG5CanFjRnM9NjFhNzNhNzllZjc0ZTljYWEyMDBkZjgxO'
DATAROBOT_ENDPOINT = 'https://app2.datarobot.com/api/v2'
DEPLOYMENT_ID = 'e72b70e00ebea6861ca66a08'

# Connect to DataRobot
dr.Client(
    endpoint=DATAROBOT_ENDPOINT,
    token=DATAROBOT_API_TOKEN,
    user_agent_suffix='IntegrationSnippet-ApiClient'
)
# Use this code to look up the ID of the credential object created.
credential_id = None
DR_CREDENTIAL_NAME = 'incorta_GCP_ml2022'
for cred in dr.Credential.list():
    if cred.name == DR_CREDENTIAL_NAME:
        credential_id = cred.credential_id
print(credential_id)
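# If the credential has not been created yet, it can also be created here in the
# notebook with the DataRobot Python client. This is a sketch under assumptions:
# the dr.Credential.create_gcp call and the service account key path are
# illustrative -- check your datarobot client version and supply your own key.
if credential_id is None:
    with open('/path/to/service_account_key.json') as key_file:  # placeholder path
        gcp_key = json.load(key_file)
    credential = dr.Credential.create_gcp(name=DR_CREDENTIAL_NAME, gcp_key=gcp_key)
    credential_id = credential.credential_id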
GCP_BUCKET_NAME = 'ic-ml2022-yyyyy-bucket'
GCP_INPUT_SCORING_FILE = 'CreditRiskPredPrepared_GCS.csv'
GCP_OUTPUT_RESULTS_FILE = 'CreditRiskPredPrediction_GCS_Output.csv'
# Read the data from Incorta and convert it to a pandas DataFrame
df = read('CreditRisk.CreditRiskPredPrepared')
pdf = df.toPandas()

storage_client = storage.Client()
# give the bucket name
bucket = storage_client.get_bucket(GCP_BUCKET_NAME)
# give the path and file name to save the file in GCS
blob = bucket.blob(GCP_INPUT_SCORING_FILE)
# write the scoring input to GCS as a CSV file
with blob.open("w") as f:
    f.write(pdf.to_csv(index=False))
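# Optional sanity check (not part of the original flow): confirm the scoring
# input file actually landed in the bucket before submitting the prediction job.
assert blob.exists(), "Scoring input file was not written to GCS"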
# Set up the batch prediction job
# Input: Google Cloud Storage
# Output: Google Cloud Storage
job = dr.BatchPredictionJob.score(
    deployment=DEPLOYMENT_ID,
    intake_settings={
        'type': 'gcp',
        'url': "gs://{}/{}".format(GCP_BUCKET_NAME, GCP_INPUT_SCORING_FILE),
        'credential_id': credential_id
    },
    output_settings={
        'type': 'gcp',
        'url': "gs://{}/{}".format(GCP_BUCKET_NAME, GCP_OUTPUT_RESULTS_FILE),
        'credential_id': credential_id
    }
)

# Wait for the job to finish, then check its status
job.wait_for_completion()
job.get_status()
# Read the prediction results back from GCS
blob_output = bucket.blob(GCP_OUTPUT_RESULTS_FILE)
data_output = blob_output.download_as_string()
pdf_data = pd.read_csv(io.BytesIO(data_output))

# select the prediction column from the prediction data frame
pdf2 = pd.DataFrame(pdf_data, columns=['RiskStatusPrediction_PREDICTION'])
# concatenate the prediction data frame with the original data frame
result = pd.concat([pdf, pdf2], axis=1)

# create a Spark data frame, preview the first rows, and save it as the MV output
df_output = spark.createDataFrame(result)
incorta.head(df_output, 10)
save(df_output)