How do I call a load plan REST API via a Python script?

amit_kothari
Employee

I need to execute and monitor a load plan via REST APIs using a Python script. How do I do that?

1 REPLY

amit_kothari
Employee

Here is a sample Python script that executes a load plan and monitors the run via REST APIs. Refer to this article too.

To inspect the API payloads, first log in to Incorta in your browser and then go to <incorta host>/incorta/api

#execute incorta load plans via rest api - https://docs.incorta.com/cloud/references-public-api-v2
#Use python 3

import requests
import time

#update this as per your cluster
prod_connection_dict = {
    'tenant': "default",
    'incorta_domain': "xx.cloud4.incorta.com",
    'incorta_token': "personal access token string"
}

process_start_timestamp = time.time()
connection_dict = prod_connection_dict
incorta_headers = {"Authorization": f"Bearer {connection_dict['incorta_token']}"}
body = {"loadPlanName": "Test_Load_Plan"}

try:
    # Start the load plan
    api_url = f"https://{connection_dict['incorta_domain']}/incorta/api/v2/{connection_dict['tenant']}/load-plan/execute"
    #print(api_url)
    response = requests.post(api_url, headers=incorta_headers, json=body)
    data = response.json()
    #print(data)
    # Poll the status of this execution until it reaches a final state
    api_url = f"https://{connection_dict['incorta_domain']}/incorta/api/v2/{connection_dict['tenant']}/load-plan/status/{data['loadPlanExecutionId']}?level=1"
    response = requests.get(api_url, headers=incorta_headers)
    data = response.json()
    jobStatus = data["status"]["jobStatus"]
    while jobStatus not in ('Success', 'Finished With Errors'):
        print("Load plan is still running")
        time.sleep(30)
        response = requests.get(api_url, headers=incorta_headers)
        data = response.json()
        jobStatus = data["status"]["jobStatus"]
        print(jobStatus)
    print("Finished Load Plan")
    print(data["status"]["jobStatus"])
except Exception as exc:
    # Report the actual error instead of hiding it
    print(f"An exception occurred: {exc}")
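
Note that the loop in the script keeps polling forever if the load plan never reaches one of the two final statuses. Below is a minimal sketch of a polling helper with a timeout. The function name wait_for_load_plan, its parameters, and the TimeoutError behaviour are my own illustration and not part of the Incorta API; the two final statuses are simply the ones used in the script, and your cluster may report others.

```python
import time

# Final statuses taken from the script above; other values may exist.
TERMINAL_STATUSES = ("Success", "Finished With Errors")


def wait_for_load_plan(fetch_status, poll_seconds=30, timeout_seconds=3600,
                       sleep=time.sleep):
    """Call fetch_status() repeatedly until it returns a final status.

    fetch_status is any zero-argument callable returning the jobStatus
    string (hypothetical helper, see the usage note below). Raises
    TimeoutError once timeout_seconds of waiting have been used up.
    """
    waited = 0
    while True:
        job_status = fetch_status()
        if job_status in TERMINAL_STATUSES:
            return job_status
        if waited >= timeout_seconds:
            raise TimeoutError(
                f"Load plan still '{job_status}' after {waited} seconds"
            )
        print("Load plan is still running")
        sleep(poll_seconds)
        waited += poll_seconds
```

With the variables from the script, fetch_status could be something like lambda: requests.get(api_url, headers=incorta_headers).json()["status"]["jobStatus"], where api_url is the status URL. Passing the status lookup in as a callable keeps the waiting logic separate from the HTTP call, so you can try it out without hitting a cluster.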