Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def delete_registry_bucket(bucket):
"""
Removes a bucket from the NiFi Registry
Args:
bucket (Bucket): the Bucket object to remove
Returns:
(Bucket): The updated Bucket object
"""
try:
return nipyapi.registry.BucketsApi().delete_bucket(
bucket_id=bucket.identifier
)
except (nipyapi.registry.rest.ApiException, AttributeError) as e:
raise ValueError(e)
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# In the python 3, the response.data is bytes.
# we need to decode it to string.
if PY3:
r.data = r.data.decode('utf8')
# log response body
logger.debug("response body: %s", r.data)
if not 200 <= r.status <= 299:
raise ApiException(http_resp=r)
return r
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided arguments.
Please check that your arguments match declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# In the python 3, the response.data is bytes.
# we need to decode it to string.
if PY3:
r.data = r.data.decode('utf8')
# log response body
logger.debug("response body: %s", r.data)
if not 200 <= r.status <= 299:
raise ApiException(http_resp=r)
return r
headers=headers)
# Pass a `string` parameter directly in the body to support
# other content types than Json when `body` argument is provided
# in serialized form
elif isinstance(body, str):
request_body = body
r = self.pool_manager.request(method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided arguments.
Please check that your arguments match declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# In the python 3, the response.data is bytes.
# we need to decode it to string.
identity=identity
)
else:
# must be nifi
user_obj = nipyapi.nifi.UserEntity(
revision=nipyapi.nifi.RevisionDTO(
version=0
),
component=nipyapi.nifi.UserDTO(
identity=identity
)
)
try:
return getattr(nipyapi, service).TenantsApi().create_user(user_obj)
except (nipyapi.nifi.rest.ApiException,
nipyapi.registry.rest.ApiException) as e:
if 'already exists' in e.body and not strict:
return get_service_user(identity, service=service)
raise ValueError(e.body)
def __deserialize_date(self, string):
"""
Deserializes string to date.
:param string: str.
:return: date.
"""
try:
from dateutil.parser import parse
return parse(string).date()
except ImportError:
return string
except ValueError:
raise ApiException(
status=0,
reason="Failed to parse `{0}` into a date object".format(string)
)