Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_empty_registry_produces_no_credentials():
    """Authenticating with an empty registry must leave the vault empty."""
    vault = Vault()
    await authenticate(registry=OperatorRegistry(), vault=vault)

    # The vault is falsy, and an attempt to iterate over it raises LoginError.
    assert not vault
    with pytest.raises(LoginError):
        async for _key, _info in vault:
            pass
def test_direct_auth_fails_on_errors_in_client(login_mocks, kubernetes):
    """login() raises LoginError when both client config loaders fail."""
    config_error = kubernetes.config.ConfigException
    login_mocks.client_in_cluster.side_effect = config_error
    login_mocks.client_from_file.side_effect = config_error

    with pytest.raises(LoginError):
        login()

    # The pykube in-cluster login was called and its from-file variant was not;
    # both of the client loaders were called.
    assert login_mocks.pykube_in_cluster.called
    assert not login_mocks.pykube_from_file.called
    assert login_mocks.client_in_cluster.called
    assert login_mocks.client_from_file.called
async def test_yielding_after_creation(mocker):
    """Iterating a freshly created vault waits on its readiness flag, then fails."""
    vault = Vault()

    # Replace both waiters of the readiness toggle with mocks.
    for waiter_name in ('wait_for_on', 'wait_for_off'):
        mocker.patch.object(vault._ready, waiter_name)

    with pytest.raises(LoginError):
        async for _key, _info in vault:
            pass

    wait_for_on = vault._ready.wait_for_on
    assert wait_for_on.called
    assert wait_for_on.awaited
def test_direct_auth_fails_on_errors_in_pykube(login_mocks, any_kubernetes):
    """login() raises LoginError when both pykube login attempts fail."""
    pykube_in_cluster = login_mocks.pykube_in_cluster
    pykube_from_file = login_mocks.pykube_from_file
    pykube_in_cluster.side_effect = FileNotFoundError
    pykube_from_file.side_effect = FileNotFoundError

    with pytest.raises(LoginError):
        login()

    # Both pykube variants were called before the login failed.
    assert login_mocks.pykube_in_cluster.called
    assert login_mocks.pykube_from_file.called
elif info.ca_data:
ca_path = tempfiles[base64.b64decode(info.ca_data)]
else:
ca_path = None
if info.certificate_path and info.certificate_data:
raise credentials.LoginError("Both certificate path & data are set. Need only one.")
elif info.certificate_path:
certificate_path = info.certificate_path
elif info.certificate_data:
certificate_path = tempfiles[base64.b64decode(info.certificate_data)]
else:
certificate_path = None
if info.private_key_path and info.private_key_data:
raise credentials.LoginError("Both private key path & data are set. Need only one.")
elif info.private_key_path:
private_key_path = info.private_key_path
elif info.private_key_data:
private_key_path = tempfiles[base64.b64decode(info.private_key_data)]
else:
private_key_path = None
# The SSL part (both client certificate auth and CA verification).
context: ssl.SSLContext
if certificate_path and private_key_path:
context = ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH,
cafile=ca_path)
context.load_cert_chain(
certfile=certificate_path,
keyfile=private_key_path)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
    """
    Call ``fn`` with a session from the vault, moving on to the next one on 401.

    Sessions are taken one by one from the vault. The result of the first
    call that succeeds is returned. On an HTTP 401 response, the credentials
    used for that session are invalidated in the vault and the next session
    is tried. Any other response error is re-raised as is.

    Raises ``credentials.LoginError`` when the vault yields no more sessions.
    """
    vault: credentials.Vault = vault_var.get()
    sessions = vault.extended(APISession.from_connection_info, 'sessions')
    async for key, _info, session in sessions:
        try:
            result = await fn(*args, **kwargs, session=session)
        except aiohttp.ClientResponseError as e:
            # Only "401 Unauthorized" leads to a retry with other credentials.
            if e.status != 401:
                raise
            await vault.invalidate(key, exc=e)
        else:
            return result

    # The loop is only left by a return or a raise, so reaching this point
    # means that the vault has nothing more to offer.
    raise credentials.LoginError("Ran out of connection credentials.")
return cast(_F, wrapper)
def select(self) -> Tuple[VaultKey, VaultItem]:
    """
    Pick the next item (not the info!) to be tried.

    Only the items with the highest ``info.priority`` among the current
    ones are considered; one of them is picked at random.

    Raises ``LoginError`` if there are no current items at all.

    .. warning::
        This method is not async/await-safe: if the data change on the go,
        it can lead to improper items returned.
    """
    if not self._current:
        raise LoginError("No valid credentials are available.")

    # First find the highest priority, then keep only the items that have it
    # (in the same order as they are stored).
    top_priority = max(item.info.priority for item in self._current.values())
    candidates: List[Tuple[VaultKey, VaultItem]] = [
        (key, item)
        for key, item in self._current.items()
        if item.info.priority == top_priority
    ]
    return random.choice(candidates)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
    """
    Re-yield the items of ``fn``, moving on to the next session on 401.

    Sessions are taken one by one from the vault, and ``fn`` is iterated with
    each of them. Once ``fn`` is exhausted without an error, the generator
    finishes. On an HTTP 401 response, the credentials used for that session
    are invalidated in the vault and ``fn`` is started anew with the next
    session. Any other response error is re-raised as is.

    Raises ``credentials.LoginError`` when the vault yields no more sessions.
    """
    vault: credentials.Vault = vault_var.get()
    sessions = vault.extended(APISession.from_connection_info, 'sessions')
    async for key, _info, session in sessions:
        try:
            async for item in fn(*args, **kwargs, session=session):
                yield item
        except aiohttp.ClientResponseError as e:
            # Only "401 Unauthorized" leads to a retry with other credentials.
            if e.status != 401:
                raise
            await vault.invalidate(key, exc=e)
        else:
            break  # out of credentials cycle (instead of `return`)
    else:
        # No session has made it to the `break` above.
        raise credentials.LoginError("Ran out of connection credentials.")
return cast(_F, wrapper)