Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(self):
    """Check that GitHubClient exposes its constructor arguments as attributes.

    Two clients are built: one with only owner, repository and token, and
    one with every remaining argument passed positionally.
    """
    # Mock the rate-limit endpoint.
    # NOTE(review): presumably the client queries it while being
    # constructed -- confirm against GitHubClient.
    rate_limit = read_file('data/github/rate_limit')
    httpretty.register_uri(httpretty.GET,
                           GITHUB_RATE_LIMIT,
                           body=rate_limit,
                           status=200,
                           forcing_headers={
                               'X-RateLimit-Remaining': '20',
                               'X-RateLimit-Reset': '15'
                           })

    # Client built with the required arguments only: the optional
    # settings must fall back to the class-level defaults.
    client = GitHubClient('zhquan_example', 'repo', 'aaa')
    self.assertEqual(client.owner, 'zhquan_example')
    self.assertEqual(client.repository, 'repo')
    self.assertEqual(client.max_retries, GitHubClient.MAX_RETRIES)
    self.assertEqual(client.sleep_time, GitHubClient.DEFAULT_SLEEP_TIME)
    self.assertEqual(client.base_url, 'https://api.github.com')

    # Client built with every argument given positionally; each value
    # passed below is checked against the attribute it should populate.
    client = GitHubClient('zhquan_example', 'repo', 'aaa', None, False, 3, 20, 2, None, False)
    self.assertEqual(client.owner, 'zhquan_example')
    self.assertEqual(client.repository, 'repo')
    self.assertEqual(client.token, 'aaa')
    self.assertEqual(client.base_url, GITHUB_API_URL)
    self.assertFalse(client.sleep_for_rate)
    self.assertEqual(client.min_rate_to_sleep, 3)
    self.assertEqual(client.sleep_time, 20)
    # The three trailing positional values (2, None, False) were passed
    # but never verified.
    self.assertEqual(client.max_retries, 2)
    self.assertIsNone(client.archive)
    self.assertFalse(client.from_archive)
body=rate_limit,
status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
httpretty.register_uri(httpretty.GET,
GITHUB_USER_URL,
body=login, status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
client = GitHubClient("zhquan_example", "repo", "aaa", None)
response = client.user("zhquan_example")
self.assertEqual(response, login)
body=rate_limit,
status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
httpretty.register_uri(httpretty.GET,
GITHUB_ORGS_URL,
body=orgs, status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
client = GitHubClient("zhquan_example", "repo", "aaa", None)
response = client.user_orgs("zhquan_example")
self.assertEqual(response, orgs)
body=rate_limit,
status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
httpretty.register_uri(httpretty.GET,
GITHUB_PULL_REQUEST_1_COMMITS,
body=pull_request_commits,
status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
client = GitHubClient("zhquan_example", "repo", "aaa")
pull_commits_raw = [rev for rev in client.pull_commits(1)]
self.assertEqual(pull_commits_raw[0], pull_request_commits)
body=rate_limit,
status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
httpretty.register_uri(httpretty.GET,
GITHUB_ISSUES_URL,
body=issues, status=200,
forcing_headers={
'X-RateLimit-Remaining': '5',
'X-RateLimit-Reset': '5'
})
client = GitHubClient("zhquan_example", "repo", "aaa", None)
raw_issues = [issues for issues in client.issues()]
self.assertEqual(raw_issues[0], issues)
# Check requests
expected = {
'per_page': ['100'],
'state': ['all'],
'direction': ['asc'],
'sort': ['updated']
}
self.assertDictEqual(httpretty.last_request().querystring, expected)
self.assertEqual(httpretty.last_request().headers["Authorization"], "token aaa")
status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
client = GitHubClient('zhquan_example', 'repo', 'aaa')
self.assertEqual(client.owner, 'zhquan_example')
self.assertEqual(client.repository, 'repo')
self.assertEqual(client.max_retries, GitHubClient.MAX_RETRIES)
self.assertEqual(client.sleep_time, GitHubClient.DEFAULT_SLEEP_TIME)
self.assertEqual(client.max_retries, GitHubClient.MAX_RETRIES)
self.assertEqual(client.base_url, 'https://api.github.com')
client = GitHubClient('zhquan_example', 'repo', 'aaa', None, False, 3, 20, 2, None, False)
self.assertEqual(client.owner, 'zhquan_example')
self.assertEqual(client.repository, 'repo')
self.assertEqual(client.token, 'aaa')
self.assertEqual(client.base_url, GITHUB_API_URL)
self.assertFalse(client.sleep_for_rate)
self.assertEqual(client.min_rate_to_sleep, 3)
self.assertEqual(client.sleep_time, 20)
self.assertEqual(client.max_retries, 2)
self.assertIsNone(client.archive)
self.assertFalse(client.from_archive)
client = GitHubClient('zhquan_example', 'repo', 'aaa', min_rate_to_sleep=RateLimitHandler.MAX_RATE_LIMIT + 1)
self.assertEqual(client.min_rate_to_sleep, RateLimitHandler.MAX_RATE_LIMIT)
client = GitHubClient('zhquan_example', 'repo', 'aaa', min_rate_to_sleep=RateLimitHandler.MAX_RATE_LIMIT - 1)
self.assertEqual(client.min_rate_to_sleep, RateLimitHandler.MAX_RATE_LIMIT - 1)
status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
httpretty.register_uri(httpretty.GET,
GITHUB_ISSUES_URL,
body=abuse_rate_limit, status=429,
forcing_headers={
'X-RateLimit-Remaining': '5',
'X-RateLimit-Reset': '5',
'Retry-After': str(retry_after_value)
})
GitHubClient.MAX_RETRIES_ON_READ = 2
client = GitHubClient("zhquan_example", "repo", "aaa", None)
before = int(time.time())
expected = before + (retry_after_value * client.max_retries_on_status)
with self.assertRaises(requests.exceptions.HTTPError):
_ = [issues for issues in client.issues()]
after = int(time.time())
self.assertTrue(expected <= after)
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
httpretty.register_uri(httpretty.GET,
GITHUB_ISSUES_URL,
body=issue,
status=200,
forcing_headers={
'X-RateLimit-Remaining': '0',
'X-RateLimit-Reset': '0',
'Link': '<' + GITHUB_ISSUES_URL + '/?&page=2>; rel="next", <' +
GITHUB_ISSUES_URL + '/?&page=3>; rel="last"'
})
client = GitHubClient("zhquan_example", "repo", "aaa", sleep_for_rate=False)
with self.assertRaises(RateLimitError):
_ = [issues for issues in client.issues()]
# Check requests
expected = {
'per_page': ['100'],
'state': ['all'],
'direction': ['asc'],
'sort': ['updated']
}
self.assertDictEqual(httpretty.last_request().querystring, expected)
self.assertEqual(httpretty.last_request().headers["Authorization"], "token aaa")
httpretty.register_uri(httpretty.GET,
GITHUB_RATE_LIMIT,
body=rate_limit,
status=200,
forcing_headers={
'X-RateLimit-Remaining': '20',
'X-RateLimit-Reset': '15'
})
client = GitHubClient('zhquan_example', 'repo', 'aaa')
self.assertEqual(client.owner, 'zhquan_example')
self.assertEqual(client.repository, 'repo')
self.assertEqual(client.max_retries, GitHubClient.MAX_RETRIES)
self.assertEqual(client.sleep_time, GitHubClient.DEFAULT_SLEEP_TIME)
self.assertEqual(client.max_retries, GitHubClient.MAX_RETRIES)
self.assertEqual(client.base_url, 'https://api.github.com')
client = GitHubClient('zhquan_example', 'repo', 'aaa', None, False, 3, 20, 2, None, False)
self.assertEqual(client.owner, 'zhquan_example')
self.assertEqual(client.repository, 'repo')
self.assertEqual(client.token, 'aaa')
self.assertEqual(client.base_url, GITHUB_API_URL)
self.assertFalse(client.sleep_for_rate)
self.assertEqual(client.min_rate_to_sleep, 3)
self.assertEqual(client.sleep_time, 20)
self.assertEqual(client.max_retries, 2)
self.assertIsNone(client.archive)
self.assertFalse(client.from_archive)
client = GitHubClient('zhquan_example', 'repo', 'aaa', min_rate_to_sleep=RateLimitHandler.MAX_RATE_LIMIT + 1)
self.assertEqual(client.min_rate_to_sleep, RateLimitHandler.MAX_RATE_LIMIT)
def test_calculate_time_to_reset(self):
    """Check that the time to reset is 0 when the reset time is not ahead.

    The mocked rate-limit response advertises a reset timestamp equal to
    the current time truncated to whole seconds, and the client is then
    expected to report zero time left to wait.
    """
    rate_limit_body = read_file('data/github/rate_limit')

    # Reset moment: "now", with the sub-second part dropped.
    reset_timestamp = int(datetime_utcnow().replace(microsecond=0).timestamp())
    mocked_headers = {
        'X-RateLimit-Remaining': '20',
        'X-RateLimit-Reset': reset_timestamp
    }
    httpretty.register_uri(httpretty.GET,
                           GITHUB_RATE_LIMIT,
                           body=rate_limit_body,
                           status=200,
                           forcing_headers=mocked_headers)

    client = GitHubClient("zhquan_example", "repo", "aaa", sleep_for_rate=True)
    self.assertEqual(client.calculate_time_to_reset(), 0)