cancel
Showing results for 
Search instead for 
Did you mean: 

How do I call a load plan rest API via a python script?

amit_kothari
Employee
Employee

I have a need to execute and monitor a load plan via rest APIs using a python script, how do I do that?

1 REPLY 1

amit_kothari
Employee
Employee

Here is a sample python script to execute a load plan and monitor the run via rest apis. Refer to  this article too.

Enable Access for an Individual User

  1. As an Incorta Super User or a user with the SuperRole, sign in to the Incorta.
  2. In the Navigation bar, select Security.
  3. In the Action bar, select Users.
  4. Select the checkbox next to the user you want to enable public API access.
  5. Select Edit (pen icon) in the upper right corner of the Security page.
  6. In the Edit User drawer, select the Security tab.
  7. In the Developer Tool section, toggle Enable Public API.

Create a Personal Access Security Token

  1. Select 'Create a Personal Access Token'
  2. Give the token a name and an expiration date
  3. Copy the token and save it. This token can not be viewed at a later time. Instead, a new one will need to be created. 

amit_kothari_0-1770935813865.png

 

To check for the API payload, on your browser  login to incorta first and then go to <incorta host>/incorta/api

#execute incorta load plans via rest api - https://docs.incorta.com/cloud/references-public-api-v2
#Use python 3

import requests
import time

#update this as per your cluster
prod_connection_dict = {
'tenant': "default",
'incorta_domain': "xx.cloud4.incorta.com",
'incorta_token': "personal access token string"
}

process_start_timestamp = time.time()
connection_dict = prod_connection_dict
incorta_headers = {"Authorization": f"Bearer {connection_dict['incorta_token']}"}
body = {"loadPlanName": "Test_Load_Plan"}

try:
api_url = f"https://{connection_dict['incorta_domain']}/incorta/api/v2/{connection_dict['tenant']}/load-plan/execute"
#print(api_url)
response = requests.post(api_url, headers=incorta_headers, json=body)
data = response.json()
#print(data)
api_url = f"https://{connection_dict['incorta_domain']}/incorta/api/v2/{connection_dict['tenant']}/load-plan/status/{data['loadPlanExecutionId']}?level=1"
response = requests.get(api_url, headers=incorta_headers)
data = response.json()
jobStatus = data["status"]["jobStatus"]
while jobStatus not in ('Success', 'Finished With Errors'😞
print("Load plan is still running")
time.sleep(30)
response = requests.get(api_url, headers=incorta_headers)
data = response.json()
jobStatus = data["status"]["jobStatus"]
print(jobStatus)
print("Finished Load Plan")
print(data["status"]["jobStatus"])
except:
print("An exception occurred")