Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_build_session_proxies_kwarg_ignore_no_proxy(self):
transport = _TransportHTTP("server", proxy="https://kwargproxy",
no_proxy=True)
session = transport._build_session()
assert 'http' not in session.proxies
assert session.proxies['https'] == "https://kwargproxy"
def test_build_certificate_no_pem(self):
transport = _TransportHTTP("", certificate_key_pem="path")
with pytest.raises(ValueError) as err:
transport._build_auth_certificate(None)
assert str(err.value) == \
"For certificate auth, the path to the certificate pem file " \
"must be specified with certificate_pem"
def test_build_certificate_no_key_pem(self):
transport = _TransportHTTP("")
with pytest.raises(ValueError) as err:
transport._build_auth_certificate(None)
assert str(err.value) == \
"For certificate auth, the path to the certificate key pem file " \
"must be specified with certificate_key_pem"
def test_send_timeout_kwargs(self, monkeypatch):
response = requests.Response()
response.status_code = 200
response._content = b"content"
response.headers['content-type'] = "application/soap+xml;charset=UTF-8"
send_mock = MagicMock()
send_mock.return_value = response
monkeypatch.setattr(requests.Session, "send", send_mock)
transport = _TransportHTTP("server", ssl=True, connection_timeout=20, read_timeout=25)
session = transport._build_session()
transport.session = session
request = requests.Request('POST', transport.endpoint, data=b"data")
prep_request = session.prepare_request(request)
actual = transport._send_request(prep_request, "server")
assert actual == b"content"
assert send_mock.call_count == 1
assert send_mock.call_args[0] == (prep_request,)
assert send_mock.call_args[1]['timeout'] == (20, 25)
def test_build_session_cert_no_validate_override_env(self):
transport = _TransportHTTP("", cert_validation=False)
os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT'
try:
session = transport._build_session()
finally:
del os.environ['REQUESTS_CA_BUNDLE']
assert session.verify is False
def test_build_ntlm_with_kwargs(self):
transport = _TransportHTTP("", auth="ntlm", username="user",
ssl=False, password="pass",
negotiate_delegate=True,
negotiate_hostname_override="host",
negotiate_send_cbt=False,
negotiate_service="HTTP",
cert_validation=False)
session = transport._build_session()
assert isinstance(transport.encryption, WinRMEncryption)
assert transport.encryption.protocol == WinRMEncryption.SPNEGO
assert isinstance(session.auth, HTTPNegotiateAuth)
assert session.auth.auth_provider == "ntlm"
assert session.auth.delegate is True
assert session.auth.hostname_override == "host"
assert session.auth.password == "pass"
assert session.auth.send_cbt is False
assert session.auth.service == 'HTTP'
def test_not_supported_auth(self):
with pytest.raises(ValueError) as err:
_TransportHTTP("", "", auth="fake")
assert str(err.value) == \
"The specified auth 'fake' is not supported, please select one " \
"of 'basic, certificate, credssp, kerberos, negotiate, ntlm'"
def test_build_session_proxies_env_no_proxy_override(self):
transport = _TransportHTTP("server", no_proxy=True)
os.environ['https_proxy'] = "https://envproxy"
try:
session = transport._build_session()
finally:
del os.environ['https_proxy']
assert 'http' not in session.proxies
assert 'https' not in session.proxies
def test_build_session_proxies_env_kwarg_override(self):
transport = _TransportHTTP("server", proxy="https://kwargproxy")
os.environ['https_proxy'] = "https://envproxy"
try:
session = transport._build_session()
finally:
del os.environ['https_proxy']
assert 'http' not in session.proxies
assert session.proxies['https'] == "https://kwargproxy"
def test_build_credssp_no_kwargs(self):
credssp = pytest.importorskip("requests_credssp")
transport = _TransportHTTP("", username="user", password="pass",
auth="credssp")
session = transport._build_session()
assert isinstance(transport.encryption, WinRMEncryption)
assert transport.encryption.protocol == WinRMEncryption.CREDSSP
assert isinstance(session.auth, credssp.HttpCredSSPAuth)
assert session.auth.auth_mechanism == 'auto'
assert session.auth.disable_tlsv1_2 is False
assert session.auth.minimum_version == 2
assert session.auth.password == 'pass'
assert session.auth.username == 'user'