import re from io import BytesIO import numpy as np from cairosvg import svg2png from discord import Embed, Emoji, Message from PIL import Image from pyocticons import octicon from redbot.core.bot import Red, commands from redbot.core.utils.chat_formatting import bold, humanize_list, inline from redbot.core.utils.views import SimpleMenu from typing_extensions import override from .api import fetch_issue from .config import config, register_config from .constants import TYPES, convert_name from .logger import logger from .models import Provider, Repository class IssueCards(commands.Cog): """Post embeds for Git hosting provider issues when posting issue links or messages with an issue identifier in them (#)""" __author__ = ["[cswimr](https://c.csw.im/cswimr)"] __git__ = "https://c.csw.im/cswimr/SeaCogs" __version__ = "1.0.0" __documentation__ = "https://seacogs.csw.im/issuecards/" def __init__(self, bot: Red) -> None: super().__init__() self.bot: Red = bot self.application_emojis: list[Emoji] = [] register_config(config) @override def format_help_for_context(self, ctx: commands.Context) -> str: pre_processed = super().format_help_for_context(ctx) or "" n = "\n" if "\n\n" not in pre_processed else "" text = [ f"{pre_processed}{n}", f"{bold('Cog Version:')} [{self.__version__}]({self.__git__})", f"{bold('Author:')} {humanize_list(self.__author__)}", f"{bold('Documentation:')} {self.__documentation__}", ] return "\n".join(text) @override async def cog_load(self) -> None: await self._create_emojis() @commands.Cog.listener("on_message_without_command") async def issue_idenitifer_listener(self, message: Message) -> None: """Listen for messages and check if they contain issue identifiers.""" if message.author.bot or not message.guild: return identifier_pattern = r"(?:(?<=\s)|(?<=^))(?P\w+)(?P#|!)(?P\d+)(?=$|\s)" url_pattern = r"(?Phttps?:\/\/[^\/]+)\/(?P[^\/]+)\/(?P[^\/]+)(?:\/-)?\/(?Pissues|discussions|pull|pulls|merge_requests)\/(?P\d+)" matches = [] for match in re.finditer(identifier_pattern, message.content): matches.append( { "match_type": "identifier", "prefix": match.group("prefix"), "gitlab_issue_type": match.group("gitlab_issue_type"), "issue_number": match.group("issue_number"), }, ) for match in re.finditer(url_pattern, message.content): matches.append( { "match_type": "url", "base_url": match.group("base_url"), "author": match.group("author"), "repository_name": match.group("repository_name"), "type": match.group("type"), "number": match.group("number"), }, ) if not matches: return logger.debug("Found %s issue identifiers in message '%s'. Fetching issue from matches: %s", len(matches), message.id, matches) providers = await Provider.fetch_all(guild=message.guild) async with message.channel.typing(): first = True for match in matches: if match["match_type"] == "identifier": prefix = match["prefix"] gitlab_issue_type_group = match["gitlab_issue_type"] issue_number = match["issue_number"] if gitlab_issue_type_group == "!": gitlab_issue_type = "merge_request" else: gitlab_issue_type = "issue" try: repository = await Repository.from_prefix(cog=self, guild=message.guild, prefix=prefix) issue = await fetch_issue(repository, issue_number, gitlab_issue_type) except ValueError: continue if first is True: first = False await issue.send(message, reply=True) else: await issue.send(message) elif match["match_type"] == "url": base_url = match["base_url"] author = match["author"] repository_name = match["repository_name"] issue_type = match["type"] number = match["number"] provider: Provider | None = None for p in providers: if p.url == base_url: provider = p if p.guild is not None: break if provider is None: continue if issue_type == "merge_requests": gitlab_issue_type = "merge_request" else: gitlab_issue_type = "issue" try: repository = Repository(cog=self, guild=message.guild, owner=author, name=repository_name, provider=provider) issue = await fetch_issue(repository, number, gitlab_issue_type) except ValueError: continue if first is True: first = False await issue.send(message, reply=True) else: await issue.send(message) async def _create_emojis(self) -> None: """Create the application emojis used for the cog.""" application_emojis: list[Emoji] = await self.bot.fetch_application_emojis() all_application_emojis: list[Emoji] = application_emojis.copy() for t in TYPES: if convert_name(t["name"]) in (emoji.name for emoji in application_emojis): logger.trace("Emoji '%s' already exists, skipping", t["name"]) continue try: icon = octicon(t["octicon"], 256) if not icon: raise ValueError("Failed to create octicon emoji for '%s'. Does this icon name exist within octicons?" % t["octicon"]) rendered_emoji = BytesIO() svg2png(icon, write_to=rendered_emoji) image = Image.open(rendered_emoji) image = image.convert("RGBA") data = np.array(image) red, green, blue, _alpha = data.T black_areas = (red == 0) & (blue == 0) & (green == 0) data[..., :-1][black_areas.T] = t["color"].to_rgb() image = Image.fromarray(data) with BytesIO() as emoji: image.save(emoji, "PNG") emoji.seek(0) all_application_emojis.append(await self.bot.create_application_emoji(name=convert_name(t["name"]), image=emoji.getvalue())) except Exception as e: logger.exception("Failed to create emoji for type '%s'", t["name"], exc_info=e) self.application_emojis = all_application_emojis @commands.group(name="issuecards") async def issuecards(self, ctx: commands.Context) -> None: """Manage the IssueCards cog configuration.""" pass @issuecards.group(name="providers") async def issuecards_providers(self, ctx: commands.Context) -> None: """Manage IssueCards providers.""" pass @issuecards_providers.command(name="list") @commands.bot_has_permissions(embed_links=True) async def issuecards_providers_list(self, ctx: commands.Context) -> None: """List the IssueCards providers.""" providers = await Provider.fetch_all(guild=ctx.guild) if not providers: await ctx.send("No providers found.") return embed_color = await self.bot.get_embed_color(ctx.channel) pages: list[Embed] = [] for i in range(0, len(providers), 24): embed = Embed( title="IssueCards Providers", description="List of registered providers. Non-global providers take priority over global providers.", color=embed_color, ) embed.set_footer(text=f"Page {i // 24 + 1}/{len(providers) // 24 + 1}") for provider in providers[i : i + 24]: # fmt: off field_value = ( f"{bold('URL:')} {provider.url}\n" f"{bold('Service:')} {provider.service}\n" f"{bold('Token:')} {'Set' if provider.token else 'Not Set'}\n" f"{bold('Global:')} {'No' if provider.guild else 'Yes'}" ) # fmt: on embed.add_field(name=provider.id, value=field_value, inline=True) pages.append(embed) await SimpleMenu(pages).start(ctx, ephemeral=True) # pyright: ignore[reportArgumentType] @issuecards.group(name="repositories", aliases=["repository", "repo", "repos"]) async def issuecards_repositories(self, ctx: commands.Context) -> None: """Manage IssueCards repositories.""" pass @issuecards_repositories.command(name="add") async def issuecards_repositories_add(self, ctx: commands.Context, owner: str, name: str, provider_id: str, prefix: str | None = None) -> None: """Add a repository to the IssueCards configuration. **Arguments** * `owner`: The owner of the repository. * `name`: The name of the repository. * `provider_id`: The ID of the provider that contains this repository. Use `[p]issuecards providers list` to get a list of valid providers. * `prefix`: The prefix used for determining which repository to retrieve an issue from when using the `#` syntax. Defaults to `None`, disabling the prefix functionality. """ prefix = prefix.lower() if prefix else None try: provider = await Provider.from_config(provider_id, ctx.guild) except ValueError: await ctx.send(f"Invalid provider ID. Please use {inline(f'{ctx.prefix}issuecards providers list')} for a list of valid providers.") return try: await Repository.from_config(cog=self, guild=ctx.guild, owner=owner, repository=name, provider_id=provider.id) await ctx.send(f"Repository `{owner}/{name}` already exists in the configuration.") return except ValueError: pass repository = Repository(cog=self, guild=ctx.guild, owner=owner, name=name, provider=provider, prefix=prefix) await repository.to_config() await ctx.send(f"Repository `{owner}/{name}` {f'with prefix `{prefix}` ' if prefix is not None else ' '}has been added to the configuration.") @issuecards_repositories.command(name="remove") async def issuecards_repositories_remove(self, ctx: commands.Context, owner: str, name: str, provider_id: str) -> None: """Remove a repository from the IssueCards configuration. **Arguments** * `owner`: The owner of the repository. * `name`: The name of the repository. * `provider_id`: The ID of the provider that contains this repository. Use `[p]issuecards providers list` to get a list of valid providers. """ try: provider = await Provider.from_config(provider_id, ctx.guild) except ValueError: await ctx.send(f"Invalid provider ID. Please use {inline(f'{ctx.prefix}issuecards providers list')} for a list of valid providers.") return repository = await Repository.from_config(cog=self, guild=ctx.guild, owner=owner, repository=name, provider_id=provider.id) if not repository: await ctx.send(f"Repository `{owner}/{name}` does not exist in the configuration.") return await repository.remove_config() await ctx.send(f"Repository `{owner}/{name}` has been removed from the configuration.") @issuecards_repositories.command(name="list") @commands.bot_has_permissions(embed_links=True) async def issuecards_repositories_list(self, ctx: commands.Context) -> None: """List the IssueCards repositories.""" repositories = await Repository.fetch_all(cog=self, guild=ctx.guild) if not repositories: await ctx.send("No repositories found.") return embed_color = await self.bot.get_embed_color(ctx.channel) pages: list[Embed] = [] for i in range(0, len(repositories), 25): embed = Embed( title="IssueCards Repositories", description="List of registered repositories.", color=embed_color, ) embed.set_footer(text=f"Page {i // 25 + 1}/{len(repositories) // 25 + 1}") for repository in repositories[i : i + 25]: # fmt: off field_value = ( f"{bold('URL:')} {repository.url}\n" f"{bold('Provider:')} {repository.provider.id}\n" f"{bold('Prefix:')} {repository.prefix or 'None'}\n" f"{bold('Global:')} {'No' if repository.guild else 'Yes'}" ) # fmt: on embed.add_field(name=f"{repository.owner}/{repository.name}", value=field_value, inline=True) pages.append(embed) await SimpleMenu(pages).start(ctx, ephemeral=True) # pyright: ignore[reportArgumentType]