04-11-2024 01:35 PM
I have a need to execute and monitor a load plan via rest APIs using a python script, how do I do that?
Solved! Go to Solution.
04-11-2024 01:36 PM - edited 03-19-2025 01:44 PM
Here is a sample python script to execute a load plan and monitor the run via rest apis. Refer to this article too.
To check for the API payload, on your browser login to incorta first and then go to <incorta host>/incorta/api
#execute incorta load plans via rest api - https://docs.incorta.com/cloud/references-public-api-v2
#Use python 3
import requests
import time
#update this as per your cluster
prod_connection_dict = {
'tenant': "default",
'incorta_domain': "xx.cloud4.incorta.com",
'incorta_token': "personal access token string"
}
process_start_timestamp = time.time()
connection_dict = prod_connection_dict
incorta_headers = {"Authorization": f"Bearer {connection_dict['incorta_token']}"}
body = {"loadPlanName": "Test_Load_Plan"}
try:
api_url = f"https://{connection_dict['incorta_domain']}/incorta/api/v2/{connection_dict['tenant']}/load-plan/execute"
#print(api_url)
response = requests.post(api_url, headers=incorta_headers, json=body)
data = response.json()
#print(data)
api_url = f"https://{connection_dict['incorta_domain']}/incorta/api/v2/{connection_dict['tenant']}/load-plan/status/{data['loadPlanExecutionId']}?level=1"
response = requests.get(api_url, headers=incorta_headers)
data = response.json()
jobStatus = data["status"]["jobStatus"]
while jobStatus not in ('Success', 'Finished With Errors'😞
print("Load plan is still running")
time.sleep(30)
response = requests.get(api_url, headers=incorta_headers)
data = response.json()
jobStatus = data["status"]["jobStatus"]
print(jobStatus)
print("Finished Load Plan")
print(data["status"]["jobStatus"])
except:
print("An exception occurred")