Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
context, augments = self.contextHelper.identifyContext(workspace=workspace, project=project)
if augments:
resource += ("&" + "&".join(augments))
full_resource_url = "%s/%s" % (self.service_url, resource)
itemData = self.validateAttributeNames(entityName, itemData)
item = {entityName: self._greased(itemData)} # where _greased is a convenience
# method that will transform
# any ref lists for COLLECTION attributes
# into a list of one-key dicts {'_ref' : ref}
payload = json.dumps(item)
if self._log:
self._logDest.write('%s PUT %s\n%27.27s %s\n' % (timestamp(), resource, " ", payload))
self._logDest.flush()
response = self.session.put(full_resource_url, data=payload, headers=RALLY_REST_HEADERS)
response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)
if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
desc = str(response.errors[0])
problem = "%s %s" % (response.status_code, desc)
#print(problem)
if self._log:
self._logDest.write('%s %s\n' % (timestamp(), problem))
self._logDest.flush()
raise RallyRESTAPIError(problem)
item = response.content['CreateResult']['Object']
ref = str(item['_ref'])
item_oid = int(ref.split('/')[-1])
desc = "created %s OID: %s" % (entityName, item_oid)
if self._log:
self._logDest.write('%s %s %s\n' % (timestamp(), response.status_code, desc))
self._logDest.flush()
(timestamp(), response.status_code, response.content[:56]))
self._logDest.flush()
##
## if kwargs.get('debug', False):
## print(response.status_code, response.headers, response.content)
##
errorResponse = ErrorResponse(response.status_code, response.content)
response = RallyRESTResponse(self.session, context, resource, errorResponse, self.hydration, 0)
problem = "ERRORS: %s\nWARNINGS: %s\n" % ("\n".join(response.errors),
"\n".join(response.warnings))
raise RallyRESTAPIError(problem)
##
## print(response.content)
##
response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)
if response.errors:
status = False
desc = response.errors[0]
else:
status = True
desc = '%s deleted' % entityName
if self._log:
self._logDest.write('%s %s %s\n' % (timestamp(), response.status_code, desc))
self._logDest.flush()
return status
oid = target.ObjectID
itemData['ObjectID'] = oid
resource = '%s/%s?key=%s' % (entityName.lower(), oid, auth_token)
context, augments = self.contextHelper.identifyContext(workspace=workspace, project=project)
if augments:
resource += ("&" + "&".join(augments))
full_resource_url = "%s/%s" % (self.service_url, resource)
itemData = self.validateAttributeNames(entityName, itemData)
item = {entityName: self._greased(itemData)}
payload = json.dumps(item)
if self._log:
self._logDest.write('%s POST %s\n%27.27s %s\n' % (timestamp(), resource, " ", item))
self._logDest.flush()
response = self.session.post(full_resource_url, data=payload, headers=RALLY_REST_HEADERS)
response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)
if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
raise RallyRESTAPIError('Unable to update the %s' % entityName)
# now issue a request to get the entity item (mostly so we can get the FormattedID)
# and return it
item = self._itemQuery(entityName, oid, workspace=workspace, project=project)
return item
"LastPasswordUpdateDate", "Disabled",
"Subscription", "SubscriptionAdmin",
"Role", "UserPermissions", "TeamMemberships",
"UserProfile",
"TimeZone", # a UserProfile attribute
# and other UserProfile attributes
]
user_inclusion = "((Disabled = true) OR (Disabled = false))"
users_resource = 'users?fetch=%s&query=%s&pagesize=%s&start=1&workspace=%s' % \
(",".join(user_attrs), user_inclusion, MAX_PAGESIZE, workspace_ref)
full_resource_url = '%s/%s' % (self.service_url, users_resource)
response = self.session.get(full_resource_url, timeout=SERVICE_REQUEST_TIMEOUT)
if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
return []
response = RallyRESTResponse(self.session, context, users_resource, response, "full", 0)
users = [user for user in response]
# find the operator of this instance of Rally and short-circuit now if they *aren't* a SubscriptionAdmin
operator = [user for user in users if user.UserName == self.user]
if not operator or len(operator) == 0 or not operator[0].SubscriptionAdmin:
self.setWorkspace(saved_workspace_name)
return users
user_profile_resource = 'userprofile?fetch=true&query=&pagesize=%s&start=1&workspace=%s' % (MAX_PAGESIZE, workspace_ref)
response = self.session.get('%s/%s' % (self.service_url, user_profile_resource),
timeout=SERVICE_REQUEST_TIMEOUT)
if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
warning("Unable to retrieve UserProfile information for users")
profiles = []
else:
response = RallyRESTResponse(self.session, context, user_profile_resource, response, "full", 0)
if not rallyContext:
# raising an Exception is the only thing we can do, don't see any prospect of recovery...
raise RallyRESTAPIError('Unable to find Rally instance for context: %s' % context)
##
## print("_rallyCache.keys:")
## for key in _rallyCache.keys():
## print(" -->%s<--" % key)
## print("")
## print(" apparently no key to match: -->%s<--" % context)
## print(" context is a %s" % type(context))
##
rally = rallyContext.get('rally')
resp = rally._getResourceByOID(context, entity, oid, **kwargs)
if 'unwrap' not in kwargs or not kwargs.get('unwrap', False):
return resp
response = RallyRESTResponse(rally.session, context, "%s.x" % entity, resp, "full", 1)
return response
# 'Errors', 'Warnings', 'Results'
response = self.session.get(request_url, timeout=SERVICE_REQUEST_TIMEOUT)
except Exception as ex:
if response:
##
## print("Exception detected for session.get requests, response status code: %s" % response.status_code)
##
ret_code, content = response.status_code, response.content
else:
ret_code, content = PAGE_NOT_FOUND_CODE, str(ex.args[0])
if self._log:
self._logDest.write('%s %s\n' % (timestamp(), ret_code))
self._logDest.flush()
errorResponse = ErrorResponse(ret_code, content)
response = RallyRESTResponse(self.session, context, request_url, errorResponse, self.hydration, 0)
return response
##
## print("response.status_code is %s" % response.status_code)
##
if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
if self._log:
code, verbiage = response.status_code, response.content[:56]
self._logDest.write('%s %s %s ...\n' % (timestamp(), code, verbiage))
self._logDest.flush()
##
## print(response)
##
#if response.status_code == PAGE_NOT_FOUND_CODE:
# problem = "%s Service unavailable from %s, check for proper hostname" % \
# (response.status_code, self.service_url)
"""
Given an Rally target Artifact and a resource URI (sans the self.service_url prefix)
and a dict that serves as a "container" for the target item's _ref value,
obtain the security token we need to post to Rally, construct the
full url along with the query string containing the workspace ref and the security token.
POST to the resource supplying the update_item "container" and catch any
non success status code returned from the operation.
If the POST and parsing of the response was succesful, return the response instance.
"""
workspace_ref = self.contextHelper.currentWorkspaceRef()
auth_token = self.obtainSecurityToken()
full_resource_url = "%s/%s&workspace=%s&key=%s" % (self.service_url, resource, workspace_ref, auth_token)
payload = json.dumps(update_item)
response = self.session.post(full_resource_url, data=payload, headers=RALLY_REST_HEADERS)
context = self.contextHelper.currentContext()
response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)
if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
problem = 'Unable to update the DragAndDropRank value for the target_artifact %s, %s'
raise RallyRESTAPIError(problem % (target_artifact.FormattedID, response.errors[0]))
return response
and a items which is a list of hydrated Rally Entity instances
all of the same _type, construct a valid AC WSAPI collection url and
issue a POST request to that URL supplying the item refs in an appropriate
JSON structure as the payload.
"""
if not items: return None
auth_token = self.obtainSecurityToken()
target_type = target._type
item_type = items[0]._type
resource = "%s/%s/%ss/remove" % (target_type, target.oid, item_type)
collection_url = '%s/%s?key=%s' % (self.service_url, resource, auth_token)
payload = {"CollectionItems":[{'_ref' : "%s/%s" % (str(item._type), str(item.oid))}
for item in items]}
response = self.session.post(collection_url, data=json.dumps(payload), headers=RALLY_REST_HEADERS)
context = self.contextHelper.currentContext()
response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)
return response
if not items: return None
auth_token = self.obtainSecurityToken()
target_type = target._type
item_types = [item._type for item in items]
first_item_type = item_types[0]
outliers = [item_type for item_type in item_types if item_type != first_item_type]
if outliers:
raise RallyRESTAPIError("addCollectionItems: all items must be of the same type")
resource = "%s/%s/%ss/add" % (target_type, target.oid, first_item_type)
collection_url = '%s/%s?fetch=Name&key=%s' % (self.service_url, resource, auth_token)
payload = {"CollectionItems":[{'_ref' : "%s/%s" % (str(item._type), str(item.oid))}
for item in items]}
response = self.session.post(collection_url, data=json.dumps(payload), headers=RALLY_REST_HEADERS)
context = self.contextHelper.currentContext()
response = RallyRESTResponse(self.session, context, resource, response, "shell", 0)
added_items = [str(item[u'Name']) for item in response.data[u'Results']]
return added_items
users = [user for user in response]
# find the operator of this instance of Rally and short-circuit now if they *aren't* a SubscriptionAdmin
operator = [user for user in users if user.UserName == self.user]
if not operator or len(operator) == 0 or not operator[0].SubscriptionAdmin:
self.setWorkspace(saved_workspace_name)
return users
user_profile_resource = 'userprofile?fetch=true&query=&pagesize=%s&start=1&workspace=%s' % (MAX_PAGESIZE, workspace_ref)
response = self.session.get('%s/%s' % (self.service_url, user_profile_resource),
timeout=SERVICE_REQUEST_TIMEOUT)
if response.status_code != HTTP_REQUEST_SUCCESS_CODE:
warning("Unable to retrieve UserProfile information for users")
profiles = []
else:
response = RallyRESTResponse(self.session, context, user_profile_resource, response, "full", 0)
profiles = [profile for profile in response]
# do our own brute force "join" operation on User to UserProfile info
for user in users:
# get any matching user profiles (aka mups), there really should only be 1 matching...
mups = [prof for prof in profiles
if hasattr(user, 'UserProfile') and prof._ref == user.UserProfile._ref]
if not mups:
up = user.UserProfile if hasattr(user, 'UserProfile') else "Unknown"
problem = "unable to find a matching UserProfile record for User: %s UserProfile: %s"
#warning("%s" % (problem % (user.UserName, up)))
continue
else:
if len(mups) > 1:
anomaly = "Found %d UserProfile items associated with username: %s"
warning("%s" % (anomaly % (len(mups), user.UserName)))