Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def create_repo_if_not_exists(
self, repo_name: str,
*,
http_session: ClientSession,
):
"""Ensure that the repo exists under the org."""
github_api = await self._get_github_client(http_session)
with contextlib.suppress(gidgethub.InvalidField):
await github_api.post(
f'/orgs/{self.github_org_name}/repos',
data={'name': repo_name},
)
logger.info(
'Repo %s has been created',
f'https://github.com'
f'/{self.github_org_name}'
if status_code >= 500:
exc_type = GitHubBroken
elif status_code >= 400:
exc_type = BadRequest
if status_code == 403:
rate_limit = RateLimit.from_http(headers)
if rate_limit and not rate_limit.remaining:
raise RateLimitExceeded(rate_limit, message)
elif status_code == 422:
errors = data.get("errors", None)
if errors:
fields = ", ".join(repr(e["field"]) for e in errors)
message = f"{message} for {fields}"
else:
message = data["message"]
raise InvalidField(errors, message)
elif status_code >= 300:
exc_type = RedirectionException
else:
exc_type = HTTPException
status_code_enum = http.HTTPStatus(status_code)
args: Union[Tuple[http.HTTPStatus, str], Tuple[http.HTTPStatus]]
if message:
args = status_code_enum, message
else:
args = status_code_enum,
raise exc_type(*args)