Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@rate_limit(limit_amount=50, limit_period=60)
def _reqs(self, tag):
    """Fetch the pull requests for ``tag``.

    Returns a list of ``(tag, pull_request)`` tuples, one per item yielded
    by ``self.ghc.pull_requests.list(tag)``.
    """
    pull_requests = self.ghc.pull_requests.list(tag)
    return [(tag, pull_request) for pull_request in pull_requests]
def _rate_limit(*args, **kw):
    """Call the wrapped ``func``, sleeping first when the call budget is nearly used.

    The window start and call counter live on ``rate_limit`` itself, so they
    are shared by every function wrapped this way. All positional and
    keyword arguments are forwarded to ``func`` and its result is returned.
    """
    # Open a fresh window once the current one has outlived its period.
    if time.time() - rate_limit.start > self.limit_period:
        rate_limit.start = time.time()
        rate_limit.counter = 0

    rate_limit.counter = rate_limit.counter + 1

    # One call short of the limit: wait out the rest of the window (plus a
    # one-second margin) so the next call starts a new window.
    if rate_limit.counter == self.limit_amount - 1:
        elapsed = time.time() - rate_limit.start
        pause = self.limit_period - elapsed + 1
        log.warning("Expected to exceed API rate limit. Sleeping {0}s",
                    pause)
        time.sleep(pause)

    return func(*args, **kw)
@rate_limit(limit_amount=50, limit_period=60)
def _add_comments(self, tag, issue):
    """Pair an issue with annotations built from its comments.

    Returns ``(tag, issue, annotations)``. ``annotations`` is empty when
    ``issue.comments`` is 0; otherwise it maps ``annotation_<timestamp>`` to
    a short "user: body" string for each comment fetched through
    ``self.ghc.issues.comments``.
    """
    if issue.comments == 0:
        return tag, issue, {}

    annotations = {}
    for comment in self.ghc.issues.comments(tag, issue.number):
        # Key on the comment's update time as whole seconds; a later comment
        # with the same second replaces the earlier entry.
        stamp = time.mktime(comment.updated_at.timetuple())
        # Truncate the user to 6 characters and the body to 29.
        summary = "%s: %s" % (comment.user[:6], comment.body[:29])
        annotations['annotation_%i' % int(stamp)] = summary
    return tag, issue, annotations
@rate_limit(limit_amount=50, limit_period=60)
def _issues(self, tag):
    """Fetch the issues for ``tag``.

    Returns a list of ``(tag, issue)`` tuples, one per item yielded by
    ``self.ghc.issues.list(tag)``.
    """
    issues = self.ghc.issues.list(tag)
    return [(tag, issue) for issue in issues]
def __init__(self, limit_amount, limit_period):
    """Record the limits and reset the shared window state.

    ``limit_amount`` and ``limit_period`` are stored on the instance. The
    window start time and call counter are stored on ``rate_limit`` itself,
    so creating an instance resets them.
    """
    self.limit_amount, self.limit_period = limit_amount, limit_period
    # Shared state: set on rate_limit, not on self.
    rate_limit.start, rate_limit.counter = time.time(), 0