Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_send_analysis_by_file_sends_analysis_and_waits_specific_time_until_compilation(self):
# Arrange
with responses.RequestsMock() as mock:
mock.add('POST',
url=self.full_url + '/analyze',
status=201,
json={'result_url': 'a/sd/asd'})
mock.add('GET',
url=self.full_url + '/analyses/asd',
status=200,
json={'result': 'report'})
analysis = Analysis(file_path='a')
wait = 1
with patch(self.patch_prop, mock_open(read_data='data')):
# Act
start = datetime.datetime.utcnow()
analysis.send(wait=wait)
duration = (datetime.datetime.utcnow() - start).total_seconds()
def token_update_callback(update_data):
client_args[update_data["team_id"]] = update_data
sc = SlackClient(
client_id="12345",
client_secret="12345",
refresh_token="refresh_token",
token_update_callback=token_update_callback,
)
# The client starts out with an empty token
assert sc.token is None
# Mock both the main API request and the token refresh request
with responses.RequestsMock() as rsps:
rsps.add(
responses.POST,
"https://slack.com/api/auth.test",
status=200,
json={"ok": True},
)
rsps.add(
responses.POST,
"https://slack.com/api/oauth.access",
status=200,
json={
"ok": True,
"access_token": access_token,
"token_type": "app",
"expires_in": 3600,
c.redirect_uris = ["https://example.com/authz"]
client_info = {
"client_id": "clientid",
"redirect_uris": ["https://example.com/authz"],
}
registration_token = (
"eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6IC"
"JlYjc1N2M3Yy00MWRlLTRmZDYtOTkwNy1hNGFiMDY1ZjEzMmEifQ.eyJqdGkiOiI2ZWY0MDZi"
"MC02YzA3LTQ0NzctOWU1YS1hY2FiZjNiMWNiMjgiLCJleHAiOjAsIm5iZiI6MCwiaWF0Ijox"
"NTczNzMxNjg5LCJpc3MiOiJodHRwczovL29wZW5pZC1wcm92aWRlci5leGFtcGxlLmNvbS9h"
"dXRoL3JlYWxtcy9tYXN0ZXIiLCJhdWQiOiJodHRwczovL29wZW5pZC1wcm92aWRlci5leGFt"
"cGxlLmNvbS9hdXRoL3JlYWxtcy9tYXN0ZXIiLCJ0eXAiOiJJbml0aWFsQWNjZXNzVG9rZW4i"
"fQ.0XTlit_JcxPZeIy8A4BzrHn1NvegVP7ws8KI0ySFex8"
)
with responses.RequestsMock() as rsps:
rsps.add(
rsps.POST,
"https://provider.example.com/registration/",
json=client_info,
)
c.register(
"https://provider.example.com/registration/",
registration_token=registration_token,
)
header = rsps.calls[0].request.headers["Authorization"]
assert header == "Bearer " + registration_token
def test_rtm_reconnect(server, rtm_start_fixture):
with responses.RequestsMock() as rsps:
rsps.add(
responses.POST,
"https://slack.com/api/rtm.connect",
status=200,
json=rtm_start_fixture
)
server.rtm_connect(auto_reconnect=True, reconnect=True, use_rtm_start=False)
for call in rsps.calls:
assert call.request.url in [
"https://slack.com/api/rtm.connect"
]
def test_delete(core_client, mgmt_server_base_uri, name, uid, params):
endpoint = mgmt_server_base_uri + 'delete-package'
with responses.RequestsMock() as rsps:
resp_body = {'foo': 'bar', 'message': 'OK'}
rsps.add(responses.POST, endpoint,
json=resp_body, status=200,
content_type='application/json')
c = cpauto.PolicyPackage(core_client)
r = c.delete(name=name, uid=uid, params=params)
assert r.status_code == 200
assert r.json() == resp_body
def token_update_callback(update_data):
client_args[update_data["team_id"]] = update_data
sc = SlackClient(
client_id="12345",
client_secret="12345",
refresh_token="refresh_token",
token_update_callback=token_update_callback,
)
# Set the token TTL to some time in the past
sc.access_token_expires_at = 0
# Mock both the main API request and the token refresh request
with responses.RequestsMock() as rsps:
rsps.add(
responses.POST,
"https://slack.com/api/auth.test",
status=200,
json={"ok": True},
)
rsps.add(
responses.POST,
"https://slack.com/api/oauth.access",
status=200,
json={
"ok": True,
"access_token": "xoxa-2-abcdef",
"token_type": "app",
"expires_in": 3600,
# client0
self.provider.cdb["client0"][
"backchannel_logout_uri"
] = "https://example.com/bc_logout"
self.provider.cdb["client0"]["client_id"] = "client0"
self.provider.cdb["number5"][
"frontchannel_logout_uri"
] = "https://example.com/fc_logout"
self.provider.cdb["number5"]["client_id"] = CLIENT_ID
# Get a session ID, anyone will do.
# I know the session backend DB is a DictSessionBackend so I can use that
_sid = list(self.provider.sdb._db.storage.keys())[0]
# This only does back channel logout
with responses.RequestsMock() as rsps:
rsps.add(rsps.POST, "https://example.com/bc_logout", status=200)
res = self.provider.do_verified_logout(_sid, "client0", alla=False)
assert set(res.keys()) == {"cookie"}
def test_wikidata_cache(cache_test_normal, basket_ball_wiki_es, monkeypatch):
"""
We test the cache for the Wikidata ES
"""
client = TestClient(app)
with responses.RequestsMock(assert_all_requests_are_fired=False) as rsps:
"""
We mock all wikipedia requests since
the information are expected to be in
the Wiki ES
"""
rsps.add("GET", re.compile(r"^https://.*\.wikipedia.org/"), status=200)
response = client.get(url=f"http://localhost/v1/pois/osm:way:7777777?lang=fr")
assert response.status_code == 200
resp = response.json()
"""
We should have a "wikipedia" block in the
answer
"""
assert any(b["type"] == "wikipedia" for b in resp["blocks"][2].get("blocks"))
from tests.testLogger import LoggingCaptor
from log import setup_custom_logger
logger = setup_custom_logger("root")
logging_captor = LoggingCaptor()
logger.addHandler(logging_captor)
with responses.RequestsMock() as rsps:
url_re = re.compile(r'.*')
rsps.add(responses.GET, url_re,
body='', status=200,
content_type='application/json')
search.search_movie()
assert any("The API key seems to be incorrect" in s for s in logging_captor.messages)
with responses.RequestsMock() as rsps:
logging_captor.messages = []
url_re = re.compile(r'.*')
rsps.add(responses.GET, url_re,
body='', status=404,
content_type='application/json')
search.search_movie()
assert any("Unable to connect with" in s for s in logging_captor.messages)
return_value = None
def run():
responses.mock.start()
StackInABox.reset_services()
for service in self.services.values():
StackInABox.register_service(service)
responses_registration(self.uri)
return_value = fn(*args_finalized, **kwargs)
StackInABox.reset_services()
responses.mock.stop()
responses.mock.reset()
with responses.RequestsMock():
run()
return return_value