Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# any ref lists for COLLECTION attributes
# into a list of one-key dicts {'_ref' : ref}
payload = json.dumps(item)
if self._log:
self._logDest.write('%s PUT %s\n%27.27s %s\n' % (timestamp(), resource, " ", payload))
self._logDest.flush()
response = self.session.put(full_resource_url, data=payload, headers=RALLY_REST_HEADERS)
response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)
if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
desc = str(response.errors[0])
problem = "%s %s" % (response.status_code, desc)
#print(problem)
if self._log:
self._logDest.write('%s %s\n' % (timestamp(), problem))
self._logDest.flush()
raise RallyRESTAPIError(problem)
item = response.content['CreateResult']['Object']
ref = str(item['_ref'])
item_oid = int(ref.split('/')[-1])
desc = "created %s OID: %s" % (entityName, item_oid)
if self._log:
self._logDest.write('%s %s %s\n' % (timestamp(), response.status_code, desc))
self._logDest.flush()
# now issue a request to get the entity item (mostly so we can get the FormattedID)
# and return it
item = self._itemQuery(entityName, item_oid, workspace=workspace, project=project)
return item
obtain the security token we need to post to Rally, construct the
full url along with the query string containing the workspace ref and the security token.
POST to the resource supplying the update_item "container" and catch any
non success status code returned from the operation.
If the POST and the parsing of the response were successful, return the response instance.
"""
workspace_ref = self.contextHelper.currentWorkspaceRef()
auth_token = self.obtainSecurityToken()
full_resource_url = "%s/%s&workspace=%s&key=%s" % (self.service_url, resource, workspace_ref, auth_token)
payload = json.dumps(update_item)
response = self.session.post(full_resource_url, data=payload, headers=RALLY_REST_HEADERS)
context = self.contextHelper.currentContext()
response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)
if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
problem = 'Unable to update the DragAndDropRank value for the target_artifact %s, %s'
raise RallyRESTAPIError(problem % (target_artifact.FormattedID, response.errors[0]))
return response
def _ensureRankItemSanity(self, target_artifact, reference_artifact=None):
"""
Ranking can only be done for an item that is an Artifact subclass.
If a reference_artifact is supplied, it too must be an Artifact subclass instance.
"""
class_ancestors = [cls.__name__ for cls in target_artifact.__class__.mro()]
target_is_artifact = 'Artifact' in class_ancestors
if not target_is_artifact:
problem = "Unable to change DragAndDropRank for target %s, not an Artifact"
raise RallyRESTAPIError(problem % (target_artifact.__class__.__name__))
if reference_artifact:
class_ancestors = [cls.__name__ for cls in reference_artifact.__class__.mro()]
reference_is_artifact = 'Artifact' in class_ancestors
if not reference_is_artifact:
problem = "Unable to change DragAndDropRank for target %s, reference item is not an Artifact"
raise RallyRESTAPIError(problem % (target_artifact.__class__.__name__))
return target_artifact.__class__.__name__.lower()
## print("Rally._itemQuery('%s', %s, workspace=%s, project=%s)" % (entityName, oid, workspace, project))
##
resource = '%s/%s' % (entityName, oid)
context, augments = self.contextHelper.identifyContext(workspace=workspace, project=project)
if augments:
resource += ("?" + "&".join(augments))
if self._log:
self._logDest.write('%s GET %s\n' % (timestamp(), resource))
self._logDest.flush()
response = self._getResourceByOID(context, entityName, oid)
if self._log:
self._logDest.write('%s %s %s\n' % (timestamp(), response.status_code, resource))
self._logDest.flush()
if not response or response.status_code != HTTP_REQUEST_SUCCESS_CODE:
problem = "Unreferenceable %s OID: %s" % (entityName, oid)
raise RallyRESTAPIError('%s %s' % (response.status_code, problem))
response = RallyRESTResponse(self.session, context, '%s.x' % entityName, response, "full", 1)
item = response.next()
return item # return back an instance representing the item
def getCollection(context, collection_url, **kwargs):
    """
    Module-level wrapper that delegates to the getCollection method of a
    cached Rally instance.

    The entry in _rallyCache is looked up first with context itself as the
    key.  If that yields nothing usable, the cached keys are scanned for ones
    whose identity() equals context.identity() and the last such key is used.
    The 'rally' value of the entry found is then asked for the collection.

    Returns the result of rally.getCollection(collection_url, **kwargs), a
    RallyRESTResponse instance that has status_code, headers and content
    attributes.
    Raises RallyRESTAPIError when no cache entry can be matched to context;
    there is no prospect of recovery in that situation.
    """
    cache_entry = _rallyCache.get(context, None)
    if not cache_entry:
        # fall back to matching on identity() over a snapshot of the cached keys
        candidates = [key for key in list(_rallyCache)
                      if key.identity() == context.identity()]
        if not candidates:
            raise RallyRESTAPIError('Unable to find Rally instance for context: %s' % context)
        cache_entry = _rallyCache[candidates[-1]]

    rally_instance = cache_entry.get('rally')
    return rally_instance.getCollection(collection_url, **kwargs)
def post(self, entityName, itemData, workspace='current', project='current', **kwargs):
    """
    Given a Rally entityName, a dict with data that the entity should be updated with,
    issue the REST call and return a representation of updated target entity item.

    entityName  name of the entity type; normalized via _officialRallyEntityName
    itemData    dict of attribute values.  It must hold a truthy 'ObjectID' or a
                'FormattedID'.  When only a FormattedID is present, the ObjectID is
                looked up with a query and stored back into this dict under 'ObjectID'.
    workspace   workspace name, or 'current' to use the Name of self.getWorkspace()
    project     project name, or 'current' to use the Name of self.getProject()
    kwargs      accepted for call compatibility; not used in this method

    Returns the item produced by _itemQuery for the updated entity.
    Raises RallyRESTAPIError when the entity is a RecycleBinEntry, when neither
    identifying field is supplied, when the FormattedID cannot be located, or when
    the POST does not result in HTTP_REQUEST_SUCCESS_CODE.
    """
    auth_token = self.obtainSecurityToken()

    # see if we need to transform workspace / project values of 'current' to actual
    if workspace == 'current':
        workspace = self.getWorkspace().Name  # just need the Name here
    if project == 'current':
        project = self.getProject().Name  # just need the Name here

    entityName = self._officialRallyEntityName(entityName)
    if entityName.lower() == 'recyclebinentry':
        raise RallyRESTAPIError("update operation unsupported for RecycleBinEntry")

    oid = itemData.get('ObjectID', None)
    if not oid:
        # no ObjectID supplied, resolve it from the FormattedID with a query
        formattedID = itemData.get('FormattedID', None)
        if not formattedID:
            raise RallyRESTAPIError('An identifying field (ObjectID or FormattedID) must be specified')
        fmtIdQuery = 'FormattedID = "%s"' % formattedID
        response = self.get(entityName, fetch="ObjectID", query=fmtIdQuery,
                            workspace=workspace, project=project)
        if response.status_code != HTTP_REQUEST_SUCCESS_CODE or response.resultCount == 0:
            raise RallyRESTAPIError('Target %s %s could not be located' % (entityName, formattedID))
        target = response.next()
        oid = target.ObjectID
        itemData['ObjectID'] = oid

    # NOTE(review): resource carries the security token in its query string and
    # resource is what gets written to the log below
    resource = '%s/%s?key=%s' % (entityName.lower(), oid, auth_token)
    context, augments = self.contextHelper.identifyContext(workspace=workspace, project=project)
    if augments:
        resource += ("&" + "&".join(augments))
    full_resource_url = "%s/%s" % (self.service_url, resource)

    # validate exactly once; the result replaces itemData for the payload
    itemData = self.validateAttributeNames(entityName, itemData)
    item = {entityName: self._greased(itemData)}
    payload = json.dumps(item)
    if self._log:
        self._logDest.write('%s POST %s\n%27.27s %s\n' % (timestamp(), resource, " ", item))
        self._logDest.flush()

    response = self.session.post(full_resource_url, data=payload, headers=RALLY_REST_HEADERS)
    response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)
    if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
        raise RallyRESTAPIError('Unable to update the %s' % entityName)

    # now issue a request to get the entity item (mostly so we can get the FormattedID)
    # and return it
    item = self._itemQuery(entityName, oid, workspace=workspace, project=project)
    return item
def addCollectionItems(self, target, items):
    """
    Given a target which is a hydrated RallyEntity instance having a valid _type
    and a list of hydrated Rally Entity instances (items)
    all of the same _type, construct a valid AC WSAPI collection url and
    issue a POST request to that URL supplying the item refs in an appropriate
    JSON structure as the payload.

    Returns None when items is empty; otherwise a list with the Name (as str)
    of each entry in the 'Results' of the response data.
    Raises RallyRESTAPIError when the items are not all of the same _type or
    when the POST does not result in HTTP_REQUEST_SUCCESS_CODE.
    """
    if not items:
        return None

    auth_token = self.obtainSecurityToken()
    target_type = target._type
    item_types = [item._type for item in items]
    first_item_type = item_types[0]
    if any(item_type != first_item_type for item_type in item_types):
        raise RallyRESTAPIError("addCollectionItems: all items must be of the same type")

    # the collection name in the URL is formed by appending 's' to the item type
    resource = "%s/%s/%ss/add" % (target_type, target.oid, first_item_type)
    collection_url = '%s/%s?fetch=Name&key=%s' % (self.service_url, resource, auth_token)
    payload = {"CollectionItems": [{'_ref': "%s/%s" % (str(item._type), str(item.oid))}
                                   for item in items]}
    response = self.session.post(collection_url, data=json.dumps(payload), headers=RALLY_REST_HEADERS)
    context = self.contextHelper.currentContext()
    response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)

    # fail with an API error rather than an unrelated lookup error on the response data
    if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
        errors = response.errors
        detail = errors[0] if errors else "no error details available"
        problem = "Unable to add %s items to the %s %s collection, %s %s"
        raise RallyRESTAPIError(problem % (first_item_type, target_type, target.oid,
                                           response.status_code, detail))

    added_items = [str(item['Name']) for item in response.data['Results']]
    return added_items