Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _execute_workflow(self, action, parameters=None, execute_async=True,
                      expected_status=None, expected_result=None):
    """Launch ``action`` through the st2 client and optionally wait for it.

    The execution is created with ``self.st2client.executions.create`` and
    is immediately checked for an id, a matching action ref and a status
    contained in ``LIVEACTION_LAUNCHED_STATUSES``.

    :param action: Reference of the action to run.
    :param parameters: Parameters for the execution; ``None`` becomes ``{}``.
    :param execute_async: When true (the default) return right after the
        launch checks without waiting for completion.
    :param expected_status: Status the finished execution must have. Defaults
        to ``action_constants.LIVEACTION_STATUS_SUCCEEDED``.
    :param expected_result: When given, the finished execution's result must
        equal this value. ``None`` skips the comparison.
    :returns: The execution object: the freshly launched one when
        ``execute_async`` is true, otherwise the one returned by
        ``self._wait_for_completion``.
    """
    ex = models.LiveAction(action=action, parameters=(parameters or {}))
    ex = self.st2client.executions.create(ex)

    self.assertIsNotNone(ex.id)
    self.assertEqual(ex.action['ref'], action)
    self.assertIn(ex.status, LIVEACTION_LAUNCHED_STATUSES)

    if execute_async:
        return ex

    if expected_status is None:
        expected_status = action_constants.LIVEACTION_STATUS_SUCCEEDED

    # Guard against a typo in the caller's expectation before waiting.
    self.assertIn(expected_status, action_constants.LIVEACTION_STATUSES)

    ex = self._wait_for_completion(ex)
    self.assertEqual(ex.status, expected_status)

    # Previously ``expected_result`` was accepted but silently ignored.
    # NOTE(review): assumes the execution resource exposes ``result`` --
    # confirm against the st2client model in use.
    if expected_result is not None:
        self.assertEqual(ex.result, expected_result)

    # Return the completed execution so the blocking path is as useful to
    # callers as the async one (it used to fall through and return None).
    return ex
def _execute_workflow(
    self,
    action,
    parameters=None,
    execute_async=True,
    expected_status=None,
    expected_result=None,
):
    """Create an execution for ``action`` and, if requested, wait for it.

    After ``self.st2client.executions.create`` returns, the execution must
    have an id, reference the requested action and be in one of
    ``LIVEACTION_LAUNCHED_STATUSES``.

    :param action: Reference of the action to run.
    :param parameters: Execution parameters; ``None`` is sent as ``{}``.
    :param execute_async: If true (default), return as soon as the launch
        checks pass.
    :param expected_status: Required final status when waiting. Defaults to
        ``action_constants.LIVEACTION_STATUS_SUCCEEDED``.
    :param expected_result: Optional value the final result must equal;
        ``None`` disables the check.
    :returns: The execution object, launched (async) or completed (blocking).
    """
    ex = models.LiveAction(action=action, parameters=(parameters or {}))
    ex = self.st2client.executions.create(ex)

    self.assertIsNotNone(ex.id)
    self.assertEqual(ex.action['ref'], action)
    self.assertIn(ex.status, LIVEACTION_LAUNCHED_STATUSES)

    if execute_async:
        return ex

    if expected_status is None:
        expected_status = action_constants.LIVEACTION_STATUS_SUCCEEDED

    # Reject an expectation that is not a known status before blocking.
    self.assertIn(expected_status, action_constants.LIVEACTION_STATUSES)

    ex = self._wait_for_completion(ex)
    self.assertEqual(ex.status, expected_status)

    # ``expected_result`` used to be dead: honour it when the caller sets it.
    # NOTE(review): relies on the execution object having a ``result``
    # attribute -- verify against the st2client resource model.
    if expected_result is not None:
        self.assertEqual(ex.result, expected_result)

    # The blocking path previously returned None implicitly; hand back the
    # finished execution so both paths return the same kind of object.
    return ex
def _execute_workflow(self, action, parameters=None):
    """Launch ``action`` via the st2 client and return the new execution.

    :param action: Reference of the action to run.
    :param parameters: Execution parameters; a falsy value is sent as ``{}``.
    :returns: The object returned by ``self.st2client.executions.create``,
        after asserting it has an id, points at ``action`` and is in one of
        ``LIVEACTION_LAUNCHED_STATUSES``.
    """
    payload = parameters if parameters else {}
    request = models.LiveAction(action=action, parameters=payload)
    launched = self.st2client.executions.create(request)

    # Sanity checks on what the API handed back.
    self.assertIsNotNone(launched.id)
    self.assertEqual(launched.action['ref'], action)
    self.assertIn(launched.status, LIVEACTION_LAUNCHED_STATUSES)

    return launched
def runAction(action_ref, params):
    """Run an action on the st2 host and block until it reaches an end state.

    A client is built from the action/reactor/datastore endpoints on
    ``ST2HOST``, a ``LiveAction`` is created for ``action_ref`` with the
    parameters produced by ``param_parser(params)``, and the execution is
    re-fetched every 2 seconds until its status is in ``END_STATES``.
    There is no timeout on the wait.

    :param action_ref: Reference of the action to execute.
    :param params: Raw parameters, passed through ``param_parser``.
    :returns: The execution object whose status is in ``END_STATES``.
    """
    st2_endpoints = {
        'action': "http://%s:9101" % ST2HOST,
        'reactor': "http://%s:9102" % ST2HOST,
        'datastore': "http://%s:9103" % ST2HOST
    }
    manager = Client(st2_endpoints).managers['LiveAction']

    request = models.LiveAction()
    request.action = action_ref
    request.parameters = param_parser(params)

    current = manager.create(request)
    while True:
        if current.status in END_STATES:
            return current
        # Not finished yet: pause, then fetch the latest state by id.
        time.sleep(2)
        current = manager.get_by_id(current.id)