Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_partitionmanager_create(self, input_props, exp_prop_names,
exp_exc):
"""Test PartitionManager.create()."""
partition_mgr = self.cpc.partitions
if exp_exc is not None:
with pytest.raises(exp_exc.__class__) as exc_info:
# Execute the code to be tested
partition = partition_mgr.create(properties=input_props)
exc = exc_info.value
if isinstance(exp_exc, HTTPError):
assert exc.http_status == exp_exc.http_status
assert exc.reason == exp_exc.reason
else:
# Execute the code to be tested.
# Note: the Partition object returned by Partition.create() has
# the input properties plus 'object-uri'.
partition = partition_mgr.create(properties=input_props)
# Check the resource for consistency within itself
assert isinstance(partition, Partition)
partition_name = partition.name
exp_partition_name = partition.properties['name']
assert partition_name == exp_partition_name
partition_uri = partition.uri
('starting', HTTPError({'http-status': 409, 'reason': 1})),
('active', None),
('stopping', HTTPError({'http-status': 409, 'reason': 1})),
('degraded', None),
('reservation-error', None),
('paused', None),
]
)
def test_hba_reassign_port(self, initial_partition_status, exp_exc):
"""Test Hba.reassign_port()."""
# Add a faked HBA to be tested.
# Its port points to a faked URI.
faked_hba = self.add_hba1()
# Add a faked FCP with one port that the HBA will be reassigned to
faked_adapter = self.faked_cpc.adapters.add({
from zhmcclient_mock import FakedSession
from tests.common.utils import assert_resources
# Object IDs and names of our faked CPCs:
CPC1_NAME = 'cpc 1' # z13s in DPM mode
CPC1_OID = 'cpc1-oid'
CPC1_MAX_CRYPTO_DOMAINS = 40 # Crypto Express5S on a z13s
CPC2_NAME = 'cpc 2' # z13s in classic mode
CPC2_OID = 'cpc2-oid'
CPC3_NAME = 'cpc 3' # zEC12
CPC3_OID = 'cpc3-oid'
CPC4_NAME = 'cpc 4' # z14-ZR1 in DPM mode
CPC4_OID = 'cpc4-oid'
HTTPError_404_1 = HTTPError({'http-status': 404, 'reason': 1})
HTTPError_409_5 = HTTPError({'http-status': 409, 'reason': 5})
HTTPError_409_4 = HTTPError({'http-status': 409, 'reason': 4})
HTTPError_400_7 = HTTPError({'http-status': 400, 'reason': 7})
# Names of our faked crypto adapters:
CRYPTO1_NAME = 'crypto 1'
CRYPTO2_NAME = 'crypto 2'
CPC1_UNUSED_CRYPTO_DOMAINS = list(range(4, CPC1_MAX_CRYPTO_DOMAINS))
GET_FREE_CRYPTO_DOMAINS_ENVIRONMENTS = {
'env0-example': {
'desc': "The example from the description of method "
"Cpc.get_free_crypto_domains()",
'cpc_name': CPC1_NAME,
'adapter_names': [
'not-operating', HTTPError({'http-status': 500, 'reason': 263})),
('operating', dict(force=False),
'not-operating', HTTPError({'http-status': 500, 'reason': 263})),
('operating', dict(force=True),
'not-operating', None),
('exceptions', dict(force=False),
'not-operating', None),
('exceptions', dict(force=True),
'not-operating', None),
('not-activated', dict(),
'exceptions', StatusTimeout(None, None, None, None)),
('not-activated', dict(allow_status_exceptions=False),
'exceptions', StatusTimeout(None, None, None, None)),
('not-activated', dict(allow_status_exceptions=True),
'exceptions', None),
job = Job(session, self.job_uri, op_method, op_uri)
query_job_status_result = {
'status': 'complete',
'job-status-code': 500,
'job-reason-code': 42,
'job-results': {
# Content is not documented for the error case.
# Some failures result in an 'message' field.
'message': 'bla message',
},
}
m.get(self.job_uri, json=query_job_status_result)
m.delete(self.job_uri, status_code=204)
with pytest.raises(HTTPError) as exc_info:
job_status, op_result = job.check_for_completion()
exc = exc_info.value
assert exc.http_status == 500
assert exc.reason == 42
assert exc.message == 'bla message'
def test_httperror_initial_attrs(self, arg_names, args):
"""Test initial attributes of HTTPError."""
body = args[0]
posargs, kwargs = func_args(args, arg_names)
# Execute the code to be tested
exc = HTTPError(*posargs, **kwargs)
assert isinstance(exc, Error)
assert len(exc.args) == 1
assert exc.args[0] == body.get('message', None)
assert exc.http_status == body.get('http-status', None)
assert exc.reason == body.get('reason', None)
assert exc.message == body.get('message', None)
assert exc.request_method == body.get('request-method', None)
assert exc.request_uri == body.get('request-uri', None)
assert exc.request_query_parms == body.get('request-query-parms', None)
assert exc.request_headers == body.get('request-headers', None)
assert exc.request_authenticated_as == \
body.get('request-authenticated-as', None)
assert exc.request_body == body.get('request-body', None)
assert exc.request_body_as_string == \
body.get('request-body-as-string', None)
from tests.common.utils import assert_resources
# Object IDs and names of our faked CPCs:
CPC1_NAME = 'cpc 1' # z13s in DPM mode
CPC1_OID = 'cpc1-oid'
CPC1_MAX_CRYPTO_DOMAINS = 40 # Crypto Express5S on a z13s
CPC2_NAME = 'cpc 2' # z13s in classic mode
CPC2_OID = 'cpc2-oid'
CPC3_NAME = 'cpc 3' # zEC12
CPC3_OID = 'cpc3-oid'
CPC4_NAME = 'cpc 4' # z14-ZR1 in DPM mode
CPC4_OID = 'cpc4-oid'
HTTPError_404_1 = HTTPError({'http-status': 404, 'reason': 1})
HTTPError_409_5 = HTTPError({'http-status': 409, 'reason': 5})
HTTPError_409_4 = HTTPError({'http-status': 409, 'reason': 4})
HTTPError_400_7 = HTTPError({'http-status': 400, 'reason': 7})
# Names of our faked crypto adapters:
CRYPTO1_NAME = 'crypto 1'
CRYPTO2_NAME = 'crypto 2'
CPC1_UNUSED_CRYPTO_DOMAINS = list(range(4, CPC1_MAX_CRYPTO_DOMAINS))
GET_FREE_CRYPTO_DOMAINS_ENVIRONMENTS = {
'env0-example': {
'desc': "The example from the description of method "
"Cpc.get_free_crypto_domains()",
'cpc_name': CPC1_NAME,
'adapter_names': [
CRYPTO1_NAME,
get_status_mock.return_value = act_exp_status
if exp_excs:
with pytest.raises(Exception) as exc_info:
# Execute the code to be tested
lpar.activate(**input_kwargs)
exc = exc_info.value
exp_exc_classes = [e.__class__ for e in exp_excs]
assert isinstance(exc, tuple(exp_exc_classes))
if isinstance(exc, HTTPError):
exp_httperror = [e for e in exp_excs
if isinstance(e, HTTPError)][0]
assert exc.http_status == exp_httperror.http_status
assert exc.reason == exp_httperror.reason
else:
# Execute the code to be tested.
ret = lpar.activate(**input_kwargs)
assert ret is None
lpar.pull_full_properties()
status = lpar.get_property('status')
assert status == act_exp_status
(sg.name, sg.get_property('type'), sg.get_property('shared'),
sg.get_property('fulfillment-state'), part_names_str))
try:
volumes = sg.storage_volumes.list()
except zhmcclient.HTTPError as exc:
print("Error listing storage volumes of storage group %s:\n"
"HTTPError: %s" % (sg.name, exc))
volumes = []
print(" Storage Volumes: %s" % len(volumes))
if sg.get_property('type') == 'fcp':
try:
vsrs = sg.virtual_storage_resources.list()
except zhmcclient.HTTPError as exc:
print("Error listing virtual storage resources of storage group %s:\n"
"HTTPError: %s" % (sg.name, exc))
vsrs = []
for vsr in vsrs:
port = vsr.adapter_port
adapter = port.manager.parent
print(" Virtual Storage Resource: %s (devno: %s, "
"adapter.port: %s.%s, attached to partition: %s)" %
(vsr.name, vsr.get_property('device-number'),
adapter.name, port.name, vsr.attached_partition.name))
else:
print(" No Virtual Storage Resources")
session.logoff()
if timestats:
does not make sense to first log on.
Because this is a faked HMC, this does not perform a real logon,
but it is still used to update the state in the faked HMC.
Raises:
:exc:`~zhmcclient.HTTPError`
:exc:`~zhmcclient.ParseError` (not implemented)
:exc:`~zhmcclient.AuthError` (not implemented)
:exc:`~zhmcclient.ConnectionError`
"""
try:
self._urihandler.delete(self._hmc, uri, logon_required)
except HTTPError as exc:
raise zhmcclient.HTTPError(exc.response())
except ConnectionError as exc:
raise zhmcclient.ConnectionError(exc.message, None)