SeaCogs/issuecards/api/github.py
cswimr fdbc4c0810
Some checks failed
Actions / Lint Code (Ruff & Pylint) (pull_request) Failing after 55s
Actions / Build Documentation (MkDocs) (pull_request) Failing after 1m1s
feat(issuecards): init
2025-03-28 09:56:44 -05:00

204 lines
7.9 KiB
Python

import aiohttp
from ..logger import logger
from ..models import Issue, Repository
def _populate_headers(repository: Repository) -> dict[str, str]:
    """
    Build the HTTP headers sent with a GitHub API request.

    The `User-Agent` and `X-GitHub-Api-Version` headers are always present.
    A bearer `Authorization` header is added only when the repository's provider has a token;
    otherwise a debug message is logged and the headers are returned without authentication.

    Args:
        repository (Repository): The repository whose user agent and provider token are used.

    Returns:
        dict[str, str]: The headers to pass along with the request.
    """
    request_headers: dict[str, str] = {
        "User-Agent": repository.user_agent,
        "X-GitHub-Api-Version": "2022-11-28",
    }

    token = repository.provider.token
    if token is None:
        # No credentials configured: fall back to an anonymous request.
        logger.debug(
            "GitHub API key is not set for provider '%s'. Using unauthenticated request.",
            repository.provider.url,
        )
        return request_headers

    request_headers["Authorization"] = f"Bearer {token}"
    return request_headers
async def fetch_github_issue(repository: Repository, issue_number: int) -> Issue:
    """
    Retrieve an issue or pull request from a GitHub instance.

    When the issues endpoint answers with a 404, the number is assumed not to belong to an issue
    or pull request, and the lookup is retried as a discussion in the same repository.

    Args:
        repository (Repository): The repository to fetch the issue from.
        issue_number (int): The issue number.

    Returns:
        Issue: An Issue object containing the issue details.

    Raises:
        aiohttp.ClientResponseError: If the GitHub API answers with a status other than 200 or 404
            (a 404 is handed to the discussion lookup, which can raise on its own).
        ValueError: If the response from the GitHub API is empty.
    """
    endpoint = f"{repository.provider.api_url}/repos/{repository.owner}/{repository.name}/issues/{issue_number}"

    async with (
        aiohttp.ClientSession() as session,
        session.get(endpoint, headers=_populate_headers(repository)) as response,
    ):
        status = response.status
        if status == 404:
            # Not an issue or pull request; the number may belong to a discussion instead.
            return await _fetch_github_discussion(repository, issue_number)
        if status != 200:
            raise aiohttp.ClientResponseError(
                request_info=response.request_info,
                history=response.history,
                status=status,
                message=f"Failed to fetch issue '{issue_number}' from '{repository.url}'.",
            )

        payload = await response.json()
        if payload is None:
            raise ValueError("Received empty response from the GitHub API.")

        pr_info = payload.get("pull_request")
        milestone = payload.get("milestone")
        is_closed = payload.get("state", "open") == "closed"

        if pr_info is None:
            # Plain issue: open, closed, or closed without being acted upon.
            if not is_closed:
                issue_type = "issue"
            elif payload.get("state_reason", "completed") in ("not_planned", "duplicate"):
                issue_type = "issue_not_planned"
            else:
                issue_type = "issue_closed"
        elif pr_info.get("merged_at") is not None:
            # Pull request precedence: merged, then draft, then closed, then open.
            issue_type = "pull_request_merged"
        elif payload.get("draft", False) is True:
            issue_type = "pull_request_draft"
        elif is_closed:
            issue_type = "pull_request_closed"
        else:
            issue_type = "pull_request"

        return Issue(
            repository=repository,
            number=payload["number"],
            author=payload["user"]["login"],
            author_avatar=payload["user"]["avatar_url"],
            author_url=payload["user"]["html_url"],
            link=payload["html_url"],
            title=payload["title"],
            body=payload["body"],
            type=issue_type,
            labels=[entry["name"] for entry in payload.get("labels", [])],
            draft=payload.get("draft", False),
            creation_date=payload["created_at"],
            milestone=None if milestone is None else milestone.get("title"),
            milestone_url=None if milestone is None else milestone.get("html_url"),
            merge_date=None if pr_info is None else pr_info.get("merged_at"),
            response=response,
        )
async def _fetch_github_discussion(repository: Repository, discussion_number: int) -> Issue:
url = f"{repository.provider.api_url}/repos/{repository.owner}/{repository.name}/discussions/{discussion_number}"
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=_populate_headers(repository)) as response:
match response.status:
case 200:
pass
case _:
raise aiohttp.ClientResponseError(
request_info=response.request_info,
history=response.history,
status=response.status,
message=f"Failed to fetch discussion {discussion_number} from {repository.owner}/{repository.name}",
)
data = await response.json()
if data is None:
raise ValueError("Received empty response from GitHub API.")
if data.get("state", "open") == "closed":
state_reason = data.get("state_reason", "completed")
if state_reason == "duplicate":
discussion_type = "discussion_duplicate"
elif state_reason == "outdated":
discussion_type = "discussion_outdated"
else:
discussion_type = "discussion_closed"
else:
answer = data.get("answer_chosen_by")
if answer is not None:
discussion_type = "discussion_answered"
else:
discussion_type = "discussion"
return Issue(
repository=repository,
number=discussion_number,
author=data["user"]["login"],
author_avatar=data["user"]["avatar_url"],
author_url=data["user"]["html_url"],
link=data["html_url"],
title=data["title"],
body=data["body"],
type=discussion_type,
labels=[label["name"] for label in data.get("labels", [])],
draft=False,
creation_date=data["created_at"],
response=response,
)
# async def fetch_github_issue(repository: Repository, issue_number: int) -> Issue:
# """
# Fetch an issue from a GitHub instance.
# If the issue cannot be fetched because it isn't an issue or pull request, this function will try to find a discussion within the same repository with the given number.
# Args:
# repository (Repository): The repository to fetch the issue from.
# issue_number (int): The issue number.
# Returns:
# Issue: An Issue object containing the issue details.
# """
# query = gql(
# f"""
# {{
# repository(owner: "{repository.owner}", name: "{repository.name}") {{
# issueOrPullRequest(number: {issue_number}) {{
# ... on Issue {{
# title
# body
# author {{
# login
# avatarUrl
# url
# }}
# url
# issueState: state
# labels(first: 10) {{
# nodes {{
# name
# }}
# }}
# createdAt
# milestone {{
# title
# url
# }}
# }}
# ... on PullRequest {{
# title
# body
# author {{
# login
# avatarUrl
# url
# }}
# url
# pullRequestState: state
# labels(first: 10) {{
# nodes {{
# name
# }}
# }}
# createdAt
# milestone {{
# title
# url
# }}
# mergedAt
# }}
# }}
# }}
# }}
# """,
# )
# transport = AIOHTTPTransport(url=repository.provider.graphql_url, headers=_populate_headers(repository))
# async with Client(transport=transport, fetch_schema_from_transport=True) as client:
# response = await client.execute(query)
# return response