Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_client_error_call_SSWS():
    """Verify an SSWS call made with a corrupted token yields an API error.

    A ``Client`` is configured with ``API_TOKEN`` suffixed by
    ``"wrong token"``, a GET request for ``GET_USERS_CALL`` is created and
    fired, and the raw response is run through
    ``HTTPClient.check_response_for_error``. The check must produce no
    parsed body and an ``OktaAPIError`` whose message starts with
    ``"Okta HTTP"``.
    """
    bad_config = {
        "orgUrl": ORG_URL,
        "token": API_TOKEN + "wrong token",
    }
    ssws_client = Client(bad_config)

    # Build the request; the creation error is not inspected by this test.
    builder = ssws_client.get_request_executor()
    request, _ = await builder.create_request("GET", GET_USERS_CALL, {}, {})

    # Send it as-is; the transport-level error is overwritten below.
    sender = ssws_client.get_request_executor()
    request, details, body, _ = await sender.fire_request(request)

    # The response-level check is what surfaces the API error.
    parsed_body, api_error = HTTPClient.check_response_for_error(
        request["url"], details, body
    )

    assert parsed_body is None
    assert isinstance(api_error, OktaAPIError)
    assert api_error.message.startswith("Okta HTTP")
async def test_client_timeout(monkeypatch):
    """Check that the mocked timeout response carries no request/response data.

    ``HTTPClient.send_request`` is replaced on the class with
    ``mocks.mock_timeout_response``, so the call below runs the mock rather
    than the real method.

    Args:
        monkeypatch: pytest fixture used to patch ``HTTPClient``.
    """
    http_client = HTTPClient({
        'requestTimeout': REQUEST_TIMEOUT,
        'headers': {},
    })

    # mock and shoot request
    monkeypatch.setattr(
        HTTPClient, 'send_request', mocks.mock_timeout_response
    )
    req, res_details, resp_body, error = await http_client.send_request({
        'method': 'INVALID',
        'url': ORG_URL,
        'headers': {},
        'data': {},
    })

    # Identity check: `x in [None]` compares with `==`, which a value with a
    # permissive `__eq__` could satisfy without actually being None.
    assert all(
        value is None for value in (req, res_details, resp_body)
    )
    # NOTE(review): `error` is unpacked but never asserted here -- consider
    # checking its type once the mock's return value is confirmed.
async def test_client_invalid_HTTP_method(monkeypatch):
    """Check the error returned by the mocked invalid-HTTP-method response.

    ``HTTPClient.send_request`` is replaced on the class with
    ``mocks.mock_invalid_HTTP_response``. The call must return ``None`` for
    the request, response details and body, together with an error that is
    an ``aiohttp.ClientError`` and exactly an ``aiohttp.ContentTypeError``.

    Args:
        monkeypatch: pytest fixture used to patch ``HTTPClient``.
    """
    http_client = HTTPClient({
        'requestTimeout': REQUEST_TIMEOUT,
        'headers': {},
    })

    monkeypatch.setattr(
        HTTPClient, 'send_request', mocks.mock_invalid_HTTP_response
    )
    req, res_details, resp_body, error = await http_client.send_request({
        'method': 'INVALID',
        'url': ORG_URL,
        'headers': {},
        'data': {},
    })

    # Identity check: `x in [None]` compares with `==`, which a value with a
    # permissive `__eq__` could satisfy without actually being None.
    assert all(
        value is None for value in (req, res_details, resp_body)
    )
    assert isinstance(error, aiohttp.ClientError)
    # Exact-type match preserved from the original assertion; classes are
    # compared by identity, so `is` rather than `==`.
    assert type(error) is aiohttp.ContentTypeError
async def test_max_retries_no_timeout(monkeypatch, mocker):
    """Verify the retry count and final error when every response is a 429.

    ``HTTPClient.send_request`` is patched with
    ``mocks.mock_GET_HTTP_Client_response_429`` and ``time.sleep`` with
    ``mocks.mock_pause_function``. A spy on ``send_request`` then counts how
    many times ``client.list_users`` triggers it.

    Args:
        monkeypatch: pytest fixture used to install the patches.
        mocker: pytest-mock fixture used to spy on ``send_request``.
    """
    client = Client(user_config=CLIENT_CONFIG)
    query_params = {"limit": "1"}

    # Install the stubs first, then wrap the patched method with a spy.
    monkeypatch.setattr(
        HTTPClient, 'send_request', mocks.mock_GET_HTTP_Client_response_429
    )
    monkeypatch.setattr(time, 'sleep', mocks.mock_pause_function)
    send_spy = mocker.spy(HTTPClient, 'send_request')

    _, response, failure = await client.list_users(query_params)

    # One initial attempt plus the executor's configured number of retries.
    send_spy.assert_called()
    observed_calls = send_spy.call_count
    assert observed_calls == client.get_request_executor()._max_retries + 1
    assert client.get_request_executor()._request_timeout == 0

    # The final outcome is still the 429, on both the error and response.
    assert isinstance(failure, HTTPError)
    assert failure is not None
    assert failure.status == HTTPStatus.TOO_MANY_REQUESTS
    assert response.get_status() == HTTPStatus.TOO_MANY_REQUESTS
async def test_no_x_reset_header(monkeypatch):
    """Verify ``list_users`` reports the missing-reset-header error.

    ``HTTPClient.send_request`` is patched with
    ``mocks.mock_GET_HTTP_Client_response_429_no_x_reset`` (presumably a 429
    response without the rate-limit reset data -- confirm against the mock)
    and ``time.sleep`` with ``mocks.mock_pause_function``. The call must
    return no users, no response, and an exception whose first argument is
    ``ERROR_MESSAGE_429_MISSING_DATE_X_RESET``.

    Args:
        monkeypatch: pytest fixture used to install the patches.
    """
    client = Client(user_config=CLIENT_CONFIG)

    monkeypatch.setattr(
        HTTPClient,
        'send_request',
        mocks.mock_GET_HTTP_Client_response_429_no_x_reset,
    )
    monkeypatch.setattr(time, 'sleep', mocks.mock_pause_function)

    result = await client.list_users()
    users, resp, error = result

    # Error side of the tuple.
    assert error is not None
    assert isinstance(error, Exception)
    assert error.args[0] == ERROR_MESSAGE_429_MISSING_DATE_X_RESET

    # Data side of the tuple must be empty.
    assert users is None
    assert resp is None
/publish
Args:
app_id {str}
csr_id {str}
{string}
Returns:
JsonWebKey
"""
http_method = "post".upper()
api_url = format_url(f"""
{self._base_url}
/api/v1/apps/{appId}/credentials/csrs/{csrId}
/lifecycle/publish
""")
body = HTTPClient.format_binary_data(string)
headers = {
"Accept": "application/json",
"Content-Type": "application/pkix-cert"
}
request, error = await self._request_executor.create_request(
http_method, api_url, body, headers
)
if error:
return (None, None, error)
response, error = await self._request_executor\
.execute(request, JsonWebKey)
if error:
'User-Agent': UserAgent(config["client"].get("userAgent", None))
.get_user_agent_string(),
'Accept': "application/json"
}
# SSWS header
if config["client"]["authorizationMode"] == "SSWS":
self._default_headers['Authorization'] = (
"SSWS "
f"{self._config['client']['token']}"
)
else:
# OAuth
self._oauth = OAuth(self, self._config)
self._http_client = HTTPClient({
'requestTimeout': self._request_timeout,
'headers': self._default_headers
})
'Accept': "application/json",
'Content-Type': 'application/x-www-form-urlencoded'
}, oauth=True)
# TODO Make max 1 retry
# Shoot request
if err:
return (None, err)
_, res_details, res_json, err = \
await self._request_executor.fire_request(oauth_req)
# Return HTTP Client error if raised
if err:
return (None, err)
# Check response body for error message
parsed_response, err = HTTPClient.check_response_for_error(
url, res_details, res_json)
# Return specific error if found in response
if err:
return (None, err)
# Otherwise set token and return it
self._access_token = parsed_response["access_token"]
return (self._access_token, None)
/publish
Args:
app_id {str}
csr_id {str}
{string}
Returns:
JsonWebKey
"""
http_method = "post".upper()
api_url = format_url(f"""
{self._base_url}
/api/v1/apps/{appId}/credentials/csrs/{csrId}
/lifecycle/publish
""")
body = HTTPClient.format_binary_data(string)
headers = {
"Accept": "application/json",
"Content-Type": "application/x-x509-ca-cert"
}
request, error = await self._request_executor.create_request(
http_method, api_url, body, headers
)
if error:
return (None, None, error)
response, error = await self._request_executor\
.execute(request, JsonWebKey)
if error:
ials for the IdP.
Args:
idp_id {str}
csr_id {str}
{string}
Returns:
JsonWebKey
"""
http_method = "post".upper()
api_url = format_url(f"""
{self._base_url}
/api/v1/idps/{idpId}/credentials/csrs/{csrId}
/lifecycle/publish
""")
body = HTTPClient.format_binary_data(string)
headers = {
"Accept": "application/json",
"Content-Type": "application/x-pem-file"
}
request, error = await self._request_executor.create_request(
http_method, api_url, body, headers
)
if error:
return (None, None, error)
response, error = await self._request_executor\
.execute(request, JsonWebKey)
if error: