Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_fail_missing_resource(target):
    """Creating an SSHDriver named "ssh" on ``target`` must raise NoResourceFoundError."""
    expected_failure = pytest.raises(NoResourceFoundError)
    with expected_failure:
        SSHDriver(target, "ssh")
result = self.handler()
if result is not None:
self.log.warning("unexpected return value from handler: %s",
repr(result))
self.log.info("completed handler")
if self.initial_resource:
self.log.info("waiting until %s is unavailable",
self.initial_resource.display_name)
while True:
self.target.update_resources()
if not self.initial_resource.avail:
break
sleep(0.25)
except NoResourceFoundError as e:
if e.filter and len(e.filter) > 1:
self.log.warning("resources %s not found, restarting",
e.filter)
elif e.filter:
self.log.warning("resource %s not found, restarting",
next(iter(e.filter)))
else:
self.log.warning("resource not found, restarting")
except Exception: # pylint: disable=broad-except
self.log.exception("handler failed")
return False
if self.args.once:
self.log.info("stopping handler (--once)")
return False
if timeout is None:
timeout = Timeout(max(resource.get_managed_parent().timeout for resource in waiting))
else:
timeout = Timeout(timeout)
while waiting and not timeout.expired:
waiting = set(r for r in waiting if r.avail != avail)
for r in waiting:
r.poll()
if not any(r for r in waiting if r.avail == avail):
# sleep if no progress
sleep(0.5)
if waiting:
raise NoResourceFoundError(
"Not all resources are {}: {}".format(
"available" if avail else "unavailable", waiting),
filter=waiting
)
self.update_resources()
for res in self.resources:
if not isinstance(res, cls):
continue
if name and res.name != name:
other_names.append(res.name)
continue
found.append(res)
if not found:
name_msg = " named '{}'".format(name) if name else ""
if other_names:
raise NoResourceFoundError(
"no {cls} resource{name} found in {target}, matching resources with other names: {other_names}".format( # pylint: disable=line-too-long
cls=cls, name=name_msg, target=self, other_names=other_names)
)
raise NoResourceFoundError(
"no {cls} resource{name} found in {target}".format(
cls=cls, name=name_msg, target=self)
)
elif len(found) > 1:
raise NoResourceFoundError(
"multiple resources matching {cls} found in {target}".format(cls=cls, target=self)
)
if wait_avail:
self.await_resources(found)
return found[0]
def await_resources(self, resources, timeout=None, avail=True):
"""
Poll the given resources and wait until they are (un-)available.

Resources that are already in the requested state are not waited for.
Resources in the wrong state that have no managed parent are reported
as static and cause an immediate error instead of being polled.

Args:
resources (List): the resources to poll
timeout (float): optional timeout; when None, the largest ``timeout``
of the managed parents of the waiting resources is used
avail (bool): optionally wait until the resources are unavailable with avail=False

Raises:
NoResourceFoundError: if a resource without a managed parent is not
in the requested state
"""
# refresh the resource state before deciding what has to be waited for
self.update_resources()
# only resources whose availability differs from the requested state matter
waiting = set(r for r in resources if r.avail != avail)
# resources without a managed parent are treated as static: fail right away
static = set(r for r in waiting if r.get_managed_parent() is None)
if static:
raise NoResourceFoundError("Static resources are not {}: {}".format(
"available" if avail else "unavailable", static))
# everything is already in the requested state
if not waiting:
return
if timeout is None:
# default to the longest timeout among the managed parents involved
timeout = Timeout(max(resource.get_managed_parent().timeout for resource in waiting))
else:
timeout = Timeout(timeout)
while waiting and not timeout.expired:
# drop resources that have reached the requested state
waiting = set(r for r in waiting if r.avail != avail)
for r in waiting:
r.poll()
# check whether any polled resource reached the requested state in this round
if not any(r for r in waiting if r.avail == avail):
# sleep if no progress
def _get_ip(self, place):
    """
    Return the IP address to use for the target of the given place.

    An EthernetPort resource is tried first.  If the target has none, the
    address of its NetworkService resource is returned instead.  For an
    EthernetPort, the 'macs' entries in ``resource.extra`` are inspected
    and the IPs of the entry with the highest 'timestamp' are used.

    Args:
        place: the place, passed through to ``self._get_target``

    Returns:
        The address, or None if no MAC entry has any IP or if the newest
        entry has more than one IP (a message is printed in that case).
    """
    target = self._get_target(place)
    # kept as a function-local import, exactly as in the original code
    from ..resource import EthernetPort, NetworkService
    try:
        resource = target.get_resource(EthernetPort)
    except NoResourceFoundError:
        resource = target.get_resource(NetworkService)
        return resource.address

    # tolerate a missing 'macs' entry instead of failing on None.values()
    macs = resource.extra.get('macs') or {}
    matches = [
        (details['timestamp'], details['ips'])
        for details in macs.values()
        if details.get('ips')
    ]
    if not matches:
        # no entry has an IP: nothing to return (previously an IndexError)
        return None
    # tuples compare by timestamp first, so the maximum is the newest entry
    newest = max(matches)[1]
    # check the newest entry itself, not the leftover loop variable
    if len(newest) > 1:
        print("multiple IPs found: {}".format(newest))
        return None
    return newest[0]