SeaCogs/issuecards/issuecards.py

309 lines
14 KiB
Python
Raw Normal View History

2025-03-28 09:56:44 -05:00
import re
from io import BytesIO
import numpy as np
from cairosvg import svg2png
from discord import Embed, Emoji, Message
from PIL import Image
from pyocticons import octicon
from redbot.core.bot import Red, commands
from redbot.core.utils.chat_formatting import bold, humanize_list, inline
from redbot.core.utils.views import SimpleMenu
from typing_extensions import override
from .api import fetch_issue
from .config import config, register_config
from .constants import TYPES, convert_name
from .logger import logger
from .models import Provider, Repository
class IssueCards(commands.Cog):
"""Post embeds for Git hosting provider issues when posting issue links or messages with an issue identifier in them (<repo>#<issue number>)"""
__author__ = ["[cswimr](https://c.csw.im/cswimr)"]
__git__ = "https://c.csw.im/cswimr/SeaCogs"
__version__ = "1.0.0"
__documentation__ = "https://seacogs.csw.im/issuecards/"
def __init__(self, bot: Red) -> None:
super().__init__()
self.bot: Red = bot
self.application_emojis: list[Emoji] = []
register_config(config)
@override
def format_help_for_context(self, ctx: commands.Context) -> str:
pre_processed = super().format_help_for_context(ctx) or ""
n = "\n" if "\n\n" not in pre_processed else ""
text = [
f"{pre_processed}{n}",
f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})",
f"{bold('Author:')} {humanize_list(self.__author__)}",
f"{bold('Documentation:')} {self.__documentation__}",
]
return "\n".join(text)
@override
async def cog_load(self) -> None:
await self._create_emojis()
@commands.Cog.listener("on_message_without_command")
async def issue_idenitifer_listener(self, message: Message) -> None:
"""Listen for messages and check if they contain issue identifiers."""
if message.author.bot or not message.guild:
return
identifier_pattern = r"(?:(?<=\s)|(?<=^))(?P<prefix>\w+)(?P<gitlab_issue_type>#|!)(?P<issue_number>\d+)(?=$|\s)"
url_pattern = r"(?P<base_url>https?:\/\/[^\/]+)\/(?P<author>[^\/]+)\/(?P<repository_name>[^\/]+)(?:\/-)?\/(?P<type>issues|discussions|pull|pulls|merge_requests)\/(?P<number>\d+)"
matches = []
matches.extend(
{
"match_type": "identifier",
"prefix": match.group("prefix"),
"gitlab_issue_type": match.group("gitlab_issue_type"),
"issue_number": match.group("issue_number"),
}
for match in re.finditer(identifier_pattern, message.content)
)
matches.extend(
{
"match_type": "url",
"base_url": match.group("base_url"),
"author": match.group("author"),
"repository_name": match.group("repository_name"),
"type": match.group("type"),
"number": match.group("number"),
}
for match in re.finditer(url_pattern, message.content)
)
2025-03-28 09:56:44 -05:00
if not matches:
return
logger.debug("Found %s issue identifiers in message '%s'. Fetching issue from matches: %s", len(matches), message.id, matches)
providers = await Provider.fetch_all(guild=message.guild)
async with message.channel.typing():
first = True
for match in matches:
if match["match_type"] == "identifier":
prefix = match["prefix"]
gitlab_issue_type_group = match["gitlab_issue_type"]
issue_number = match["issue_number"]
if gitlab_issue_type_group == "!":
gitlab_issue_type = "merge_request"
else:
gitlab_issue_type = "issue"
try:
repository = await Repository.from_prefix(cog=self, guild=message.guild, prefix=prefix)
issue = await fetch_issue(repository, issue_number, gitlab_issue_type)
except ValueError:
continue
if first is True:
first = False
await issue.send(message, reply=True)
else:
await issue.send(message)
elif match["match_type"] == "url":
base_url = match["base_url"]
author = match["author"]
repository_name = match["repository_name"]
issue_type = match["type"]
number = match["number"]
provider: Provider | None = None
for p in providers:
if p.url == base_url:
provider = p
if p.guild is not None:
break
if provider is None:
continue
if issue_type == "merge_requests":
gitlab_issue_type = "merge_request"
else:
gitlab_issue_type = "issue"
try:
repository = Repository(cog=self, guild=message.guild, owner=author, name=repository_name, provider=provider)
issue = await fetch_issue(repository, number, gitlab_issue_type)
except ValueError:
continue
if first is True:
first = False
await issue.send(message, reply=True)
else:
await issue.send(message)
async def _create_emojis(self) -> None:
"""Create the application emojis used for the cog."""
application_emojis: list[Emoji] = await self.bot.fetch_application_emojis()
all_application_emojis: list[Emoji] = application_emojis.copy()
for t in TYPES:
if convert_name(t["name"]) in (emoji.name for emoji in application_emojis):
logger.trace("Emoji '%s' already exists, skipping", t["name"])
continue
try:
icon = octicon(t["octicon"], 256)
if not icon:
raise ValueError("Failed to create octicon emoji for '%s'. Does this icon name exist within octicons?" % t["octicon"])
rendered_emoji = BytesIO()
svg2png(icon, write_to=rendered_emoji)
image = Image.open(rendered_emoji)
image = image.convert("RGBA")
data = np.array(image)
red, green, blue, _alpha = data.T
black_areas = (red == 0) & (blue == 0) & (green == 0)
data[..., :-1][black_areas.T] = t["color"].to_rgb()
image = Image.fromarray(data)
with BytesIO() as emoji:
image.save(emoji, "PNG")
emoji.seek(0)
all_application_emojis.append(await self.bot.create_application_emoji(name=convert_name(t["name"]), image=emoji.getvalue()))
except Exception as e:
logger.exception("Failed to create emoji for type '%s'", t["name"], exc_info=e)
self.application_emojis = all_application_emojis
@commands.group(name="issuecards")
async def issuecards(self, ctx: commands.Context) -> None:
"""Manage the IssueCards cog configuration."""
pass
@issuecards.group(name="providers")
async def issuecards_providers(self, ctx: commands.Context) -> None:
"""Manage IssueCards providers."""
pass
@issuecards_providers.command(name="list")
@commands.bot_has_permissions(embed_links=True)
async def issuecards_providers_list(self, ctx: commands.Context) -> None:
"""List the IssueCards providers."""
providers = await Provider.fetch_all(guild=ctx.guild)
if not providers:
await ctx.send("No providers found.")
return
embed_color = await self.bot.get_embed_color(ctx.channel)
pages: list[Embed] = []
for i in range(0, len(providers), 24):
embed = Embed(
title="IssueCards Providers",
description="List of registered providers. Non-global providers take priority over global providers.",
color=embed_color,
)
embed.set_footer(text=f"Page {i // 24 + 1}/{len(providers) // 24 + 1}")
for provider in providers[i : i + 24]:
# fmt: off
field_value = (
f"{bold('URL:')} {provider.url}\n"
f"{bold('Service:')} {provider.service}\n"
f"{bold('Token:')} {'Set' if provider.token else 'Not Set'}\n"
f"{bold('Global:')} {'No' if provider.guild else 'Yes'}"
)
# fmt: on
embed.add_field(name=provider.id, value=field_value, inline=True)
pages.append(embed)
await SimpleMenu(pages).start(ctx, ephemeral=True) # pyright: ignore[reportArgumentType]
@issuecards.group(name="repositories", aliases=["repository", "repo", "repos"])
async def issuecards_repositories(self, ctx: commands.Context) -> None:
"""Manage IssueCards repositories."""
pass
@issuecards_repositories.command(name="add")
async def issuecards_repositories_add(self, ctx: commands.Context, owner: str, name: str, provider_id: str, prefix: str | None = None) -> None:
"""Add a repository to the IssueCards configuration.
**Arguments**
* `owner`: The owner of the repository.
* `name`: The name of the repository.
* `provider_id`: The ID of the provider that contains this repository.
Use `[p]issuecards providers list` to get a list of valid providers.
* `prefix`: The prefix used for determining which repository to retrieve an issue from when using the `<prefix>#<issue-num>` syntax.
Defaults to `None`, disabling the prefix functionality.
"""
prefix = prefix.lower() if prefix else None
try:
provider = await Provider.from_config(provider_id, ctx.guild)
except ValueError:
await ctx.send(f"Invalid provider ID. Please use {inline(f'{ctx.prefix}issuecards providers list')} for a list of valid providers.")
return
try:
await Repository.from_config(cog=self, guild=ctx.guild, owner=owner, repository=name, provider_id=provider.id)
await ctx.send(f"Repository `{owner}/{name}` already exists in the configuration.")
return
except ValueError:
pass
repository = Repository(cog=self, guild=ctx.guild, owner=owner, name=name, provider=provider, prefix=prefix)
await repository.to_config()
await ctx.send(f"Repository `{owner}/{name}` {f'with prefix `{prefix}` ' if prefix is not None else ' '}has been added to the configuration.")
@issuecards_repositories.command(name="remove")
async def issuecards_repositories_remove(self, ctx: commands.Context, owner: str, name: str, provider_id: str) -> None:
"""Remove a repository from the IssueCards configuration.
**Arguments**
* `owner`: The owner of the repository.
* `name`: The name of the repository.
* `provider_id`: The ID of the provider that contains this repository.
Use `[p]issuecards providers list` to get a list of valid providers.
"""
try:
provider = await Provider.from_config(provider_id, ctx.guild)
except ValueError:
await ctx.send(f"Invalid provider ID. Please use {inline(f'{ctx.prefix}issuecards providers list')} for a list of valid providers.")
return
repository = await Repository.from_config(cog=self, guild=ctx.guild, owner=owner, repository=name, provider_id=provider.id)
if not repository:
await ctx.send(f"Repository `{owner}/{name}` does not exist in the configuration.")
return
await repository.remove_config()
await ctx.send(f"Repository `{owner}/{name}` has been removed from the configuration.")
@issuecards_repositories.command(name="list")
@commands.bot_has_permissions(embed_links=True)
async def issuecards_repositories_list(self, ctx: commands.Context) -> None:
"""List the IssueCards repositories."""
repositories = await Repository.fetch_all(cog=self, guild=ctx.guild)
if not repositories:
await ctx.send("No repositories found.")
return
embed_color = await self.bot.get_embed_color(ctx.channel)
pages: list[Embed] = []
for i in range(0, len(repositories), 25):
embed = Embed(
title="IssueCards Repositories",
description="List of registered repositories.",
color=embed_color,
)
embed.set_footer(text=f"Page {i // 25 + 1}/{len(repositories) // 25 + 1}")
for repository in repositories[i : i + 25]:
# fmt: off
field_value = (
f"{bold('URL:')} {repository.url}\n"
f"{bold('Provider:')} {repository.provider.id}\n"
f"{bold('Prefix:')} {repository.prefix or 'None'}\n"
f"{bold('Global:')} {'No' if repository.guild else 'Yes'}"
)
# fmt: on
embed.add_field(name=f"{repository.owner}/{repository.name}", value=field_value, inline=True)
pages.append(embed)
await SimpleMenu(pages).start(ctx, ephemeral=True) # pyright: ignore[reportArgumentType]