Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_global_account_limit(self, account, rse_expression):
"""
List the account limit for the specific RSE expression.
:param account: The account name.
:param rse_expression: The rse expression.
"""
path = '/'.join([self.ACCOUNTS_BASEURL, account, 'limits', 'global', quote_plus(rse_expression)])
url = build_url(choice(self.list_hosts), path=path)
res = self._send_request(url, type='GET')
if res.status_code == codes.ok:
return next(self._load_json_data(res))
exc_cls, exc_msg = self._get_exception(headers=res.headers, status_code=res.status_code, data=res.content)
raise exc_cls(exc_msg)
:returns: True if success.
:raises RSEProtocolNotSupported: if no matching protocol entry could be found.
:raises RSENotFound: if the RSE doesn't exist.
:raises KeyNotFound: if invalid data was provided for update.
:raises AccessDenied: if not authorized.
"""
path = [self.RSE_BASEURL, rse, 'protocols', scheme]
if hostname:
path.append(hostname)
if port:
path.append(str(port))
path = '/'.join(path)
url = build_url(choice(self.list_hosts), path=path)
r = self._send_request(url, type='PUT', data=dumps(data))
if r.status_code == codes.ok:
return True
else:
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)
:param rse: the RSE name.
:param files: The list of files. This is a list of DIDs like :
[{'scope': , 'name': , 'state': }, {'scope': , 'name': , 'state': }, ...],
where a state value can be either of:
'A' (available)
'S' (suspicious)
'U' (unavailable)
'R' (recovered)
'B' (bad)
'L' (lost)
'D' (deleted)
:return: True if replica states have been updated successfully, otherwise an exception is raised.
"""
url = build_url(choice(self.list_hosts), path=self.REPLICAS_BASEURL)
data = {'rse': rse, 'files': files}
r = self._send_request(url, type='PUT', data=render_json(**data))
if r.status_code == codes.ok:
return True
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)
def list_account_attributes(self, account):
"""
List the attributes for an account.
:param account: The account name.
"""
path = '/'.join([self.ACCOUNTS_BASEURL, account, 'attr/'])
url = build_url(choice(self.list_hosts), path=path)
res = self._send_request(url, type='GET')
if res.status_code == codes.ok:
return self._load_json_data(res)
else:
exc_cls, exc_msg = self._get_exception(headers=res.headers, status_code=res.status_code, data=res.content)
raise exc_cls(exc_msg)
def export_data(self):
"""
Export data.
:returns: A dict containing data
"""
path = '/'.join([self.EXPORT_BASEURL])
url = build_url(choice(self.list_hosts), path=path)
r = self._send_request(url, type='GET')
if r.status_code == codes.ok:
return parse_response(r.text)
else:
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)
:type filter: Dict
:param replication_rules: Replication rules to be set : Dictionary with keys copies, rse_expression, weight, rse_expression
:type replication_rules: Dict
:param comments: Comments for the subscription
:type comments: String
:param lifetime: Subscription's lifetime (days); False if subscription has no lifetime
:type lifetime: Integer or False
:param retroactive: Flag to know if the subscription should be applied on previous data
:type retroactive: Boolean
:param dry_run: Just print the subscriptions actions without actually executing them (Useful if retroactive flag is set)
:type dry_run: Boolean
:param priority: The priority of the subscription (3 by default)
:type priority: Integer
"""
path = self.SUB_BASEURL + '/' + account + '/' + name
url = build_url(choice(self.list_hosts), path=path)
if retroactive:
raise NotImplementedError('Retroactive mode is not implemented')
if filter and not isinstance(filter, dict):
raise TypeError('filter should be a dict')
if replication_rules and not isinstance(replication_rules, list):
raise TypeError('replication_rules should be a list')
data = dumps({'options': {'filter': filter, 'replication_rules': replication_rules, 'comments': comments,
'lifetime': lifetime, 'retroactive': retroactive, 'dry_run': dry_run, 'priority': priority}})
result = self._send_request(url, type='POST', data=data)
if result.status_code == codes.created: # pylint: disable=no-member
return result.text
else:
exc_cls, exc_msg = self._get_exception(headers=result.headers, status_code=result.status_code, data=result.content)
raise exc_cls(exc_msg)
write: integer representing the priority of this procotol for write operations (default = -1)
delete: integer representing the priority of this procotol for delete operations (default = -1)
extended_attributes: miscellaneous protocol specific information e.g. spacetoken for SRM (default = None)
:return: True if protocol was created successfully else False.
:raises Duplicate: if protocol with same hostname, port and protocol identifier
already exists for the given RSE.
:raises RSENotFound: if the RSE doesn't exist.
:raises KeyNotFound: if params is missing manadtory attributes to create the
protocol.
:raises AccessDenied: if not authorized.
"""
scheme = params['scheme']
path = '/'.join([self.RSE_BASEURL, rse, 'protocols', scheme])
url = build_url(choice(self.list_hosts), path=path)
r = self._send_request(url, type='POST', data=dumps(params))
if r.status_code == codes.created:
return True
else:
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)
def list_exceptions(self, exception_id=None, states=None):
"""
List exceptions to Lifetime Model.
:param id: The id of the exception
:param states: The states to filter
"""
path = self.LIFETIME_BASEURL + '/'
params = {}
if exception_id:
params['exception_id'] = exception_id
if states:
params['states'] = exception_id
url = build_url(choice(self.list_hosts), path=path, params=params)
result = self._send_request(url)
if result.status_code == codes.ok:
lifetime_exceptions = self._load_json_data(result)
return lifetime_exceptions
else:
exc_cls, exc_msg = self._get_exception(headers=result.headers, status_code=result.status_code)
raise exc_cls(exc_msg)
def list_archive_content(self, scope, name):
"""
List archive contents.
:param scope: The scope name.
:param name: The data identifier name.
"""
path = '/'.join([self.ARCHIVES_BASEURL, quote_plus(scope), quote_plus(name), 'files'])
url = build_url(choice(self.list_hosts), path=path)
r = self._send_request(url, type='GET')
if r.status_code == codes.ok:
return self._load_json_data(r)
exc_cls, exc_msg = self._get_exception(headers=r.headers, status_code=r.status_code, data=r.content)
raise exc_cls(exc_msg)
def get_rse_limits(self, rse):
"""
Get RSE limits.
:param rse: The RSE name.
:returns: True if successful, otherwise false.
"""
path = [self.RSE_BASEURL, rse, 'limits']
path = '/'.join(path)
url = build_url(choice(self.list_hosts), path=path)
r = self._send_request(url, type='GET')
if r.status_code == codes.ok:
return next(self._load_json_data(r))
exc_cls, exc_msg = self._get_exception(headers=r.headers,
status_code=r.status_code,
data=r.content)
raise exc_cls(exc_msg)