Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Given a project_name in BaseProject // NextLevelProject // TargetProjectName form,
determine the existence/accessibility of each successive path from the BaseProject
on towards the full path ending with TargetProjectName.
If found return a pyral entity for the TargetProject which will include the ObjectID (oid)
after setting an attribute for FullProjectPath with the value of project_name.
"""
proj_path_elements = project_name.split(PROJECT_PATH_ELEMENT_SEPARATOR)
base_path_element = proj_path_elements[0]
result = self.agent.get('Project', fetch="Name,ObjectID,Parent",
query='Name = "%s"' % base_path_element,
workspace=self._currentWorkspace, project=base_path_element,
projectScopeDown=False)
if not result or (result.errors or result.resultCount != 1):
problem = "No such accessible base Project found in the Workspace '%s'" % project_name
raise RallyRESTAPIError(problem)
base_project = result.next()
parent = base_project
project_path = [base_project.Name]
for proj_path_element in proj_path_elements[1:]:
project_path.append(proj_path_element)
criteria = ['Name = "%s"' % proj_path_element , 'Parent = %s' % parent._ref]
result = self.agent.get('Project', fetch="Name,ObjectID,Parent", query=criteria, workspace=self._currentWorkspace, project=parent.ref)
if not result or result.errors or result.resultCount != 1:
problem = "No such accessible Project found: '%s'" % PROJECT_PATH_ELEMENT_SEPARATOR.join(project_path)
raise RallyRESTAPIError(problem)
path_el = result.next()
parent = path_el
if PROJECT_PATH_ELEMENT_SEPARATOR.join(project_path) != project_name:
raise RallyRESTAPIError()
return path_el
problem = "No accessible Projects found in the Workspace '%s'" % self._defaultWorkspace
raise RallyRESTAPIError(problem)
try:
projects = [proj for proj in result]
except:
problem = "Unable to obtain Project Name values for projects in the '%s' Workspace"
raise RallyRESTAPIError(problem % self._defaultWorkspace)
# does the project_name contain a ' // ' path element separator token?
# if so, then we have to sidebar process this
if project_name and PROJECT_PATH_ELEMENT_SEPARATOR in project_name:
target_project = self._findMultiElementPathToProject(project_name)
if not target_project:
problem = "No such accessible multi-element-path Project: %s found in the Workspace '%s'"
raise RallyRESTAPIError(problem % (project_name, self._currentWorkspace))
# have to set:
# self._defaultProject, self._currentProject
# self._workspace_ref, self._project_ref
# self.defaultContext, self.operatingContext
else:
match_for_default_project = [project for project in projects if project.Name == self._defaultProject]
match_for_named_project = [project for project in projects if project.Name == project_name]
if project_name:
if not match_for_named_project:
problem = "The current Workspace '%s' does not contain an accessible Project with the name of '%s'"
raise RallyRESTAPIError(problem % (self._currentWorkspace, project_name))
else:
project = match_for_named_project[0]
proj_ref = project._ref
return self.context, augments
project = kwargs['project']
wks = workspace or self._currentWorkspace or self._defaultWorkspace
if project in self._projects[wks]:
prj_ref = self._project_ref[wks][project]
elif PROJECT_PATH_ELEMENT_SEPARATOR in project: # ' // '
proj_path_leaf = self._findMultiElementPathToProject(project)
prj_ref = proj_path_leaf.ref
project = proj_path_leaf.Name
elif re.search('project/\d+$', project):
prj_ref = project
else:
problem = 'Project specified: "%s" (in workspace: "%s") not accessible with current credentials' % \
(project, workspace)
raise RallyRESTAPIError(problem)
augments.append("project=%s" % prj_ref)
self.context.project = project
if 'projectScopeUp' in kwargs:
projectScopeUp = kwargs['projectScopeUp']
if projectScopeUp in [1, True, 'true', 'True']:
augments.append("projectScopeUp=true")
elif projectScopeUp in [0, False, 'false', 'False']:
augments.append("projectScopeUp=false")
else:
augments.append("projectScopeUp=false")
else:
augments.append("projectScopeUp=false")
if 'projectScopeDown' in kwargs:
#!/usr/bin/env python
import sys, os
import types
import urllib
import py
try:
from urllib import unquote
except:
from urllib.parse import unquote
import pyral
from pyral import Rally
# Module-level shorthand so the error type can be referenced without the pyral.context prefix.
RallyRESTAPIError = pyral.context.RallyRESTAPIError
##################################################################################################
from rally_targets import AGICEN, AGICEN_USER, AGICEN_PSWD
from rally_targets import YETI_USER, YETI_PSWD
from rally_targets import DEFAULT_WORKSPACE, DEFAULT_PROJECT, NON_DEFAULT_PROJECT
from rally_targets import BOONDOCKS_WORKSPACE, BOONDOCKS_PROJECT
from rally_targets import PROJECT_SCOPING_TREE
##################################################################################################
def test_update_defect_in_other_project():
"""
Using a known valid Rally server and known valid access credentials,
obtain a Rally instance for the DEFAULT_WORKSPACE and DEFAULT_PROJECT.
Update the State value of a Defect identified by a FormattedID that
#!/usr/local/bin/python2.7
import sys, os
import types
import py
import pyral
from pyral import Rally
# Module-level shorthand so the error type can be referenced without the pyral.context prefix.
RallyRESTAPIError = pyral.context.RallyRESTAPIError
##################################################################################################
from rally_targets import AGICEN, AGICEN_USER, AGICEN_PSWD
from rally_targets import DEFAULT_WORKSPACE, DEFAULT_PROJECT
# Fallback text written by conjureUpAttachmentFile when the caller supplies no content.
# NOTE(review): "quck" looks like a typo for "quick"; left untouched because this is
# runtime text that other code may compare against -- confirm before changing.
EXAMPLE_ATTACHMENT_CONTENT = "The quck brown fox eluded the lumbering sloth\n"
##################################################################################################
def conjureUpAttachmentFile(filename, content=None, mimetype="text/plain"):
    """
        Create (or overwrite) a text file to be used as attachment material.

        filename  path of the file to write; it is opened in 'w' mode, so any
                  existing content is replaced
        content   text to place in the file; a falsy value (None or an empty
                  string) selects EXAMPLE_ATTACHMENT_CONTENT instead
        mimetype  accepted for the caller's convenience, but not consulted
                  when the file is written
    """
    if content:
        text_to_write = content
    else:
        text_to_write = EXAMPLE_ATTACHMENT_CONTENT

    with open(filename, 'w') as attachment_file:
        attachment_file.write(text_to_write)
if response.warnings:
sys.stderr.write("\n".join(str(response.warnings)) + "\n")
sys.stderr.flush()
problem = "404 Target host: '%s' is either not reachable or doesn't support the Rally WSAPI" % self.server
else: # might be a 401 No Authentication or 401 The username or password you entered is incorrect.
##
## print(response.status_code)
## print(response.headers)
## print(response.errors)
##
if 'The username or password you entered is incorrect.' in str(response.errors[0]):
problem = "Invalid credentials"
else:
error_blurb = response.errors[0][:80] if response.errors else ""
problem = "%s %s" % (response.status_code, error_blurb)
raise RallyRESTAPIError(problem)
##
## print(" RallyContextHelper.check -> _getUserInfo got the User info request response...")
## print("response resource: %s" % response.resource)
## print("response status code: %s" % response.status_code)
## print("response headers: %s" % response.headers)
## print("response errors: %s" % response.errors)
## print("response warnings: %s" % response.warnings)
## print("response resultCount: %s" % response.resultCount)
## sys.stdout.flush()
##
return response
basic_user_fields = "ObjectID,UserName,DisplayName,FirstName,LastName,Disabled,UserProfile"
try:
timer_start = time.time()
if self.user:
response = self.agent.get('User', fetch=basic_user_fields, query=user_name_query, _disableAugments=True)
else:
response = self.agent.get('User', fetch=basic_user_fields, _disableAugments=True)
timer_stop = time.time()
except Exception as ex:
##
print("-----")
print(str(ex))
##
if str(ex.args[0]).startswith('404 Service unavailable'):
# TODO: discern whether we should mention server or target_host as the culprit
raise RallyRESTAPIError("hostname: '%s' non-existent or unreachable" % self.server)
else:
raise
elapsed = timer_stop - timer_start
if response.status_code != 200:
##
## print("context check response:\n%s\n" % response)
## print("request attempt elapsed time: %6.2f" % elapsed)
##
if response.status_code == 401:
raise RallyRESTAPIError("Invalid credentials")
if response.status_code == 404:
##
## print("response.errors: {0}".format(response.errors[0]))
##
if elapsed >= float(INITIAL_REQUEST_TIME_LIMIT):
print("-----")
print(str(ex))
##
if str(ex.args[0]).startswith('404 Service unavailable'):
# TODO: discern whether we should mention server or target_host as the culprit
raise RallyRESTAPIError("hostname: '%s' non-existent or unreachable" % self.server)
else:
raise
elapsed = timer_stop - timer_start
if response.status_code != 200:
##
## print("context check response:\n%s\n" % response)
## print("request attempt elapsed time: %6.2f" % elapsed)
##
if response.status_code == 401:
raise RallyRESTAPIError("Invalid credentials")
if response.status_code == 404:
##
## print("response.errors: {0}".format(response.errors[0]))
##
if elapsed >= float(INITIAL_REQUEST_TIME_LIMIT):
problem = "Request timed out on attempt to reach %s" % self.server
elif response.errors and 'certificate verify failed' in str(response.errors[0]):
problem = "SSL certificate verification failed"
elif response.errors and 'ProxyError' in str(response.errors[0]):
mo = re.search(r'ProxyError\((.+)\)$', response.errors[0])
problem = mo.groups()[0][:-1]
problem = re.sub(r'NewConnectionError.+>:', '', problem)[:-3]
elif response.errors and 'Max retries exceeded with url' in str(response.errors[0]):
problem = "Target Rally host: '%s' non-existent or unreachable" % self.server
elif response.errors and 'NoneType' in str(response.errors[0]):
workspace=self._currentWorkspace, project=base_path_element,
projectScopeDown=False)
if not result or (result.errors or result.resultCount != 1):
problem = "No such accessible base Project found in the Workspace '%s'" % project_name
raise RallyRESTAPIError(problem)
base_project = result.next()
parent = base_project
project_path = [base_project.Name]
for proj_path_element in proj_path_elements[1:]:
project_path.append(proj_path_element)
criteria = ['Name = "%s"' % proj_path_element , 'Parent = %s' % parent._ref]
result = self.agent.get('Project', fetch="Name,ObjectID,Parent", query=criteria, workspace=self._currentWorkspace, project=parent.ref)
if not result or result.errors or result.resultCount != 1:
problem = "No such accessible Project found: '%s'" % PROJECT_PATH_ELEMENT_SEPARATOR.join(project_path)
raise RallyRESTAPIError(problem)
path_el = result.next()
parent = path_el
if PROJECT_PATH_ELEMENT_SEPARATOR.join(project_path) != project_name:
raise RallyRESTAPIError()
return path_el
self._getDefaults(user_response)
if workspace:
workspaces = self._getSubscriptionWorkspaces(subscription, workspace=workspace, limit=10)
if not workspaces:
problem = "Specified workspace of '%s' either does not exist or the user does not have permission to access that workspace"
raise RallyRESTAPIError(problem % workspace)
if len(workspaces) > 1:
problem = "Multiple workspaces (%d) found with the same name of '%s'. " +\
"You must specify a workspace with a unique name."
raise RallyRESTAPIError(problem % (len(workspaces), workspace))
self._currentWorkspace = workspaces[0].Name
if not workspace and not self._defaultWorkspace:
problem = "No Workspace was specified and there is no DefaultWorkspace setting for the user"
raise RallyRESTAPIError(problem)
if not workspace and self._defaultWorkspace:
workspaces = self._getSubscriptionWorkspaces(subscription, workspace=self._defaultWorkspace, limit=10)
if not self.isolated_workspace:
self._getSubscriptionWorkspaces(subscription, limit=0)
##
## print("ContextHelper _currentWorkspace: %s" % self._currentWorkspace)
## print("ContextHelper _defaultProject: %s" % self._defaultProject)
##
self._getWorkspacesAndProjects(workspace=self._currentWorkspace, project=self._defaultProject)
self._setOperatingContext(project)
schema_info = self.agent.getSchemaInfo(self._currentWorkspace)
processSchemaInfo(self.getWorkspace(), schema_info)