feat(repo): make all cogs pylance-typechecking compliant
at `basic` level, does not include Aurora as it's being rewritten in the `aurora/v3` branch
This commit is contained in:
parent
ea0b7937f8
commit
2a5b924409
11 changed files with 184 additions and 139 deletions
15
.vscode/settings.json
vendored
15
.vscode/settings.json
vendored
|
@ -11,11 +11,22 @@
|
||||||
"[jsonc]": {
|
"[jsonc]": {
|
||||||
"editor.defaultFormatter": "vscode.json-language-features"
|
"editor.defaultFormatter": "vscode.json-language-features"
|
||||||
},
|
},
|
||||||
"editor.formatOnSave": true,
|
|
||||||
"files.exclude": {
|
"files.exclude": {
|
||||||
"**/.git": true,
|
"**/.git": true,
|
||||||
"**/__pycache__": true,
|
"**/__pycache__": true,
|
||||||
"**/.ruff_cache": true,
|
"**/.ruff_cache": true,
|
||||||
"**/.mypy_cache": true
|
"**/.mypy_cache": true
|
||||||
}
|
},
|
||||||
|
"python.analysis.diagnosticSeverityOverrides": {
|
||||||
|
"reportAttributeAccessIssue": false, // disabled because `commands.group.command` is listed as Any / Unknown for some reason
|
||||||
|
"reportCallIssue": "information"
|
||||||
|
},
|
||||||
|
"python.analysis.diagnosticMode": "workspace",
|
||||||
|
"python.analysis.supportDocstringTemplate": true,
|
||||||
|
"python.analysis.typeCheckingMode": "basic",
|
||||||
|
"python.analysis.typeEvaluation.enableReachabilityAnalysis": true,
|
||||||
|
"python.analysis.typeEvaluation.strictDictionaryInference": true,
|
||||||
|
"python.analysis.typeEvaluation.strictListInference": true,
|
||||||
|
"python.analysis.typeEvaluation.strictSetInference": true,
|
||||||
|
"editor.formatOnSave": true,
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ class AntiPolls(commands.Cog):
|
||||||
|
|
||||||
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
||||||
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
||||||
__version__ = "1.0.2"
|
__version__ = "1.0.3"
|
||||||
__documentation__ = "https://seacogs.coastalcommits.com/antipolls/"
|
__documentation__ = "https://seacogs.coastalcommits.com/antipolls/"
|
||||||
|
|
||||||
def __init__(self, bot: Red):
|
def __init__(self, bot: Red):
|
||||||
|
@ -82,7 +82,7 @@ class AntiPolls(commands.Cog):
|
||||||
return self.logger.trace("Deleted poll message %s", message.id)
|
return self.logger.trace("Deleted poll message %s", message.id)
|
||||||
return self.logger.verbose("Message %s is not a poll, ignoring", message.id)
|
return self.logger.verbose("Message %s is not a poll, ignoring", message.id)
|
||||||
|
|
||||||
@commands.group(name="antipolls", aliases=["ap"])
|
@commands.group(name="antipolls", aliases=["ap"]) # type: ignore
|
||||||
@commands.guild_only()
|
@commands.guild_only()
|
||||||
@commands.admin_or_permissions(manage_guild=True)
|
@commands.admin_or_permissions(manage_guild=True)
|
||||||
async def antipolls(self, ctx: commands.Context) -> None:
|
async def antipolls(self, ctx: commands.Context) -> None:
|
||||||
|
@ -95,6 +95,8 @@ class AntiPolls(commands.Cog):
|
||||||
@antipolls_roles.command(name="add")
|
@antipolls_roles.command(name="add")
|
||||||
async def antipolls_roles_add(self, ctx: commands.Context, *roles: discord.Role) -> None:
|
async def antipolls_roles_add(self, ctx: commands.Context, *roles: discord.Role) -> None:
|
||||||
"""Add roles to the whitelist."""
|
"""Add roles to the whitelist."""
|
||||||
|
assert ctx.guild is not None # using `assert` here and in the rest of this file to satisfy typecheckers
|
||||||
|
# this is safe because the commands are part of a guild-only command group
|
||||||
async with self.config.guild(ctx.guild).role_whitelist() as role_whitelist:
|
async with self.config.guild(ctx.guild).role_whitelist() as role_whitelist:
|
||||||
role_whitelist: list
|
role_whitelist: list
|
||||||
failed: list[discord.Role] = []
|
failed: list[discord.Role] = []
|
||||||
|
@ -110,6 +112,7 @@ class AntiPolls(commands.Cog):
|
||||||
@antipolls_roles.command(name="remove")
|
@antipolls_roles.command(name="remove")
|
||||||
async def antipolls_roles_remove(self, ctx: commands.Context, *roles: discord.Role) -> None:
|
async def antipolls_roles_remove(self, ctx: commands.Context, *roles: discord.Role) -> None:
|
||||||
"""Remove roles from the whitelist."""
|
"""Remove roles from the whitelist."""
|
||||||
|
assert ctx.guild is not None
|
||||||
async with self.config.guild(ctx.guild).role_whitelist() as role_whitelist:
|
async with self.config.guild(ctx.guild).role_whitelist() as role_whitelist:
|
||||||
role_whitelist: list
|
role_whitelist: list
|
||||||
failed: list[discord.Role] = []
|
failed: list[discord.Role] = []
|
||||||
|
@ -125,10 +128,11 @@ class AntiPolls(commands.Cog):
|
||||||
@antipolls_roles.command(name="list")
|
@antipolls_roles.command(name="list")
|
||||||
async def antipolls_roles_list(self, ctx: commands.Context) -> discord.Message:
|
async def antipolls_roles_list(self, ctx: commands.Context) -> discord.Message:
|
||||||
"""List roles in the whitelist."""
|
"""List roles in the whitelist."""
|
||||||
|
assert ctx.guild is not None
|
||||||
role_whitelist = await self.config.guild(ctx.guild).role_whitelist()
|
role_whitelist = await self.config.guild(ctx.guild).role_whitelist()
|
||||||
if not role_whitelist:
|
if not role_whitelist:
|
||||||
return await ctx.send("No roles in the whitelist.")
|
return await ctx.send("No roles in the whitelist.")
|
||||||
roles = [ctx.guild.get_role(role) for role in role_whitelist]
|
roles = [role for role in (ctx.guild.get_role(role) for role in role_whitelist) if role is not None]
|
||||||
return await ctx.send(humanize_list([role.mention for role in roles]))
|
return await ctx.send(humanize_list([role.mention for role in roles]))
|
||||||
|
|
||||||
@antipolls.group(name="channels")
|
@antipolls.group(name="channels")
|
||||||
|
@ -138,6 +142,7 @@ class AntiPolls(commands.Cog):
|
||||||
@antipolls_channels.command(name="add")
|
@antipolls_channels.command(name="add")
|
||||||
async def antipolls_channels_add(self, ctx: commands.Context, *channels: discord.TextChannel) -> None:
|
async def antipolls_channels_add(self, ctx: commands.Context, *channels: discord.TextChannel) -> None:
|
||||||
"""Add channels to the whitelist."""
|
"""Add channels to the whitelist."""
|
||||||
|
assert ctx.guild is not None
|
||||||
async with self.config.guild(ctx.guild).channel_whitelist() as channel_whitelist:
|
async with self.config.guild(ctx.guild).channel_whitelist() as channel_whitelist:
|
||||||
channel_whitelist: list
|
channel_whitelist: list
|
||||||
failed: list[discord.TextChannel] = []
|
failed: list[discord.TextChannel] = []
|
||||||
|
@ -153,6 +158,7 @@ class AntiPolls(commands.Cog):
|
||||||
@antipolls_channels.command(name="remove")
|
@antipolls_channels.command(name="remove")
|
||||||
async def antipolls_channels_remove(self, ctx: commands.Context, *channels: discord.TextChannel) -> None:
|
async def antipolls_channels_remove(self, ctx: commands.Context, *channels: discord.TextChannel) -> None:
|
||||||
"""Remove channels from the whitelist."""
|
"""Remove channels from the whitelist."""
|
||||||
|
assert ctx.guild is not None
|
||||||
async with self.config.guild(ctx.guild).channel_whitelist() as channel_whitelist:
|
async with self.config.guild(ctx.guild).channel_whitelist() as channel_whitelist:
|
||||||
channel_whitelist: list
|
channel_whitelist: list
|
||||||
failed: list[discord.TextChannel] = []
|
failed: list[discord.TextChannel] = []
|
||||||
|
@ -168,14 +174,19 @@ class AntiPolls(commands.Cog):
|
||||||
@antipolls_channels.command(name="list")
|
@antipolls_channels.command(name="list")
|
||||||
async def antipolls_channels_list(self, ctx: commands.Context) -> discord.Message:
|
async def antipolls_channels_list(self, ctx: commands.Context) -> discord.Message:
|
||||||
"""List channels in the whitelist."""
|
"""List channels in the whitelist."""
|
||||||
|
assert ctx.guild is not None
|
||||||
channel_whitelist = await self.config.guild(ctx.guild).channel_whitelist()
|
channel_whitelist = await self.config.guild(ctx.guild).channel_whitelist()
|
||||||
if not channel_whitelist:
|
if not channel_whitelist:
|
||||||
return await ctx.send("No channels in the whitelist.")
|
return await ctx.send("No channels in the whitelist.")
|
||||||
channels = [ctx.guild.get_channel(channel) for channel in channel_whitelist]
|
channels = [channel for channel in (ctx.guild.get_channel(channel) for channel in channel_whitelist) if channel is not None]
|
||||||
|
for c in channels:
|
||||||
|
if not c:
|
||||||
|
channels.remove(c)
|
||||||
return await ctx.send(humanize_list([channel.mention for channel in channels]))
|
return await ctx.send(humanize_list([channel.mention for channel in channels]))
|
||||||
|
|
||||||
@antipolls.command(name="managemessages")
|
@antipolls.command(name="managemessages")
|
||||||
async def antipolls_managemessages(self, ctx: commands.Context, enabled: bool) -> None:
|
async def antipolls_managemessages(self, ctx: commands.Context, enabled: bool) -> None:
|
||||||
"""Toggle Manage Messages permission check."""
|
"""Toggle Manage Messages permission check."""
|
||||||
|
assert ctx.guild is not None
|
||||||
await self.config.guild(ctx.guild).manage_messages.set(enabled)
|
await self.config.guild(ctx.guild).manage_messages.set(enabled)
|
||||||
await ctx.tick()
|
await ctx.tick()
|
||||||
|
|
|
@ -26,7 +26,7 @@ class Backup(commands.Cog):
|
||||||
|
|
||||||
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
||||||
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
||||||
__version__ = "1.1.2"
|
__version__ = "1.1.3"
|
||||||
__documentation__ = "https://seacogs.coastalcommits.com/backup/"
|
__documentation__ = "https://seacogs.coastalcommits.com/backup/"
|
||||||
|
|
||||||
def __init__(self, bot: Red):
|
def __init__(self, bot: Red):
|
||||||
|
@ -45,14 +45,15 @@ class Backup(commands.Cog):
|
||||||
]
|
]
|
||||||
return "\n".join(text)
|
return "\n".join(text)
|
||||||
|
|
||||||
@commands.group(autohelp=True)
|
@commands.group(autohelp=True) # type: ignore
|
||||||
@commands.is_owner()
|
@commands.is_owner()
|
||||||
async def backup(self, ctx: commands.Context):
|
async def backup(self, ctx: commands.Context) -> None:
|
||||||
"""Backup your installed cogs."""
|
"""Backup your installed cogs."""
|
||||||
|
pass
|
||||||
|
|
||||||
@backup.command(name="export")
|
@backup.command(name="export")
|
||||||
@commands.is_owner()
|
@commands.is_owner()
|
||||||
async def backup_export(self, ctx: commands.Context):
|
async def backup_export(self, ctx: commands.Context) -> None:
|
||||||
"""Export your installed repositories and cogs to a file."""
|
"""Export your installed repositories and cogs to a file."""
|
||||||
downloader = ctx.bot.get_cog("Downloader")
|
downloader = ctx.bot.get_cog("Downloader")
|
||||||
if downloader is None:
|
if downloader is None:
|
||||||
|
@ -91,13 +92,13 @@ class Backup(commands.Cog):
|
||||||
|
|
||||||
@backup.command(name="import")
|
@backup.command(name="import")
|
||||||
@commands.is_owner()
|
@commands.is_owner()
|
||||||
async def backup_import(self, ctx: commands.Context):
|
async def backup_import(self, ctx: commands.Context) -> None:
|
||||||
"""Import your installed repositories and cogs from an export file."""
|
"""Import your installed repositories and cogs from an export file."""
|
||||||
try:
|
try:
|
||||||
export = json.loads(await ctx.message.attachments[0].read())
|
export = json.loads(await ctx.message.attachments[0].read())
|
||||||
except (json.JSONDecodeError, IndexError):
|
except (json.JSONDecodeError, IndexError):
|
||||||
try:
|
try:
|
||||||
export = json.loads(await ctx.message.reference.resolved.attachments[0].read())
|
export = json.loads(await ctx.message.reference.resolved.attachments[0].read()) # type: ignore - this is fine to let error because it gets handled
|
||||||
except (json.JSONDecodeError, IndexError, AttributeError):
|
except (json.JSONDecodeError, IndexError, AttributeError):
|
||||||
await ctx.send(error("Please provide a valid JSON export file."))
|
await ctx.send(error("Please provide a valid JSON export file."))
|
||||||
return
|
return
|
||||||
|
|
|
@ -27,7 +27,7 @@ class Bible(commands.Cog):
|
||||||
|
|
||||||
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
||||||
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
||||||
__version__ = "1.1.3"
|
__version__ = "1.1.4"
|
||||||
__documentation__ = "https://seacogs.coastalcommits.com/pterodactyl/"
|
__documentation__ = "https://seacogs.coastalcommits.com/pterodactyl/"
|
||||||
|
|
||||||
def __init__(self, bot: Red):
|
def __init__(self, bot: Red):
|
||||||
|
@ -145,6 +145,7 @@ class Bible(commands.Cog):
|
||||||
if response.status == 503:
|
if response.status == 503:
|
||||||
raise bible.errors.ServiceUnavailableError
|
raise bible.errors.ServiceUnavailableError
|
||||||
|
|
||||||
|
assert self.bot.user is not None # bot will always be logged in
|
||||||
fums_url = "https://fums.api.bible/f3"
|
fums_url = "https://fums.api.bible/f3"
|
||||||
fums_params = {
|
fums_params = {
|
||||||
"t": data["meta"]["fumsToken"],
|
"t": data["meta"]["fumsToken"],
|
||||||
|
@ -246,9 +247,9 @@ class Bible(commands.Cog):
|
||||||
from_verse, to_verse = passage.replace(":", ".").split("-")
|
from_verse, to_verse = passage.replace(":", ".").split("-")
|
||||||
if "." not in to_verse:
|
if "." not in to_verse:
|
||||||
to_verse = f"{from_verse.split('.')[0]}.{to_verse}"
|
to_verse = f"{from_verse.split('.')[0]}.{to_verse}"
|
||||||
passage = await self._get_passage(ctx, bible_id, f"{book_id}.{from_verse}-{book_id}.{to_verse}", True)
|
retrieved_passage = await self._get_passage(ctx, bible_id, f"{book_id}.{from_verse}-{book_id}.{to_verse}", True)
|
||||||
else:
|
else:
|
||||||
passage = await self._get_passage(ctx, bible_id, f"{book_id}.{passage.replace(':', '.')}", False)
|
retrieved_passage = await self._get_passage(ctx, bible_id, f"{book_id}.{passage.replace(':', '.')}", False)
|
||||||
except (
|
except (
|
||||||
bible.errors.BibleAccessError,
|
bible.errors.BibleAccessError,
|
||||||
bible.errors.NotFoundError,
|
bible.errors.NotFoundError,
|
||||||
|
@ -259,21 +260,21 @@ class Bible(commands.Cog):
|
||||||
await ctx.send(e.message)
|
await ctx.send(e.message)
|
||||||
return
|
return
|
||||||
|
|
||||||
if len(passage["content"]) > 4096:
|
if len(retrieved_passage["content"]) > 4096:
|
||||||
await ctx.send("The passage is too long to send.")
|
await ctx.send("The passage is too long to send.")
|
||||||
return
|
return
|
||||||
|
|
||||||
if await ctx.embed_requested():
|
if await ctx.embed_requested():
|
||||||
icon = self.get_icon(await ctx.embed_color())
|
icon = self.get_icon(await ctx.embed_color())
|
||||||
embed = Embed(
|
embed = Embed(
|
||||||
title=f"{passage['reference']}",
|
title=f"{retrieved_passage['reference']}",
|
||||||
description=passage["content"].replace("¶ ", ""),
|
description=retrieved_passage["content"].replace("¶ ", ""),
|
||||||
color=await ctx.embed_color(),
|
color=await ctx.embed_color(),
|
||||||
)
|
)
|
||||||
embed.set_footer(text=f"{ctx.prefix}bible passage - Powered by API.Bible - {version.abbreviation_local} ({version.language_local}, {version.description_local})", icon_url="attachment://icon.png")
|
embed.set_footer(text=f"{ctx.prefix}bible passage - Powered by API.Bible - {version.abbreviation_local} ({version.language_local}, {version.description_local})", icon_url="attachment://icon.png")
|
||||||
await ctx.send(embed=embed, file=icon)
|
await ctx.send(embed=embed, file=icon)
|
||||||
else:
|
else:
|
||||||
await ctx.send(f"## {passage['reference']}\n{passage['content']}")
|
await ctx.send(f"## {retrieved_passage['reference']}\n{retrieved_passage['content']}")
|
||||||
|
|
||||||
@bible.command(name="random")
|
@bible.command(name="random")
|
||||||
async def bible_random(self, ctx: commands.Context):
|
async def bible_random(self, ctx: commands.Context):
|
||||||
|
|
|
@ -16,7 +16,7 @@ class EmojiInfo(commands.Cog):
|
||||||
|
|
||||||
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
||||||
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
||||||
__version__ = "1.0.2"
|
__version__ = "1.0.3"
|
||||||
__documentation__ = "https://seacogs.coastalcommits.com/emojiinfo/"
|
__documentation__ = "https://seacogs.coastalcommits.com/emojiinfo/"
|
||||||
|
|
||||||
def __init__(self, bot: Red) -> None:
|
def __init__(self, bot: Red) -> None:
|
||||||
|
@ -69,7 +69,7 @@ class EmojiInfo(commands.Cog):
|
||||||
else:
|
else:
|
||||||
emoji_id = ""
|
emoji_id = ""
|
||||||
markdown = f"`{emoji}`"
|
markdown = f"`{emoji}`"
|
||||||
name = f"{bold('Name:')} {emoji.aliases.pop(0)}\n"
|
name = f"{bold('Name:')} {emoji.aliases.pop(0) if emoji.aliases else emoji.name}\n"
|
||||||
aliases = f"{bold('Aliases:')} {', '.join(emoji.aliases)}\n" if emoji.aliases else ""
|
aliases = f"{bold('Aliases:')} {', '.join(emoji.aliases)}\n" if emoji.aliases else ""
|
||||||
group = f"{bold('Group:')} {emoji.group}\n"
|
group = f"{bold('Group:')} {emoji.group}\n"
|
||||||
|
|
||||||
|
@ -82,15 +82,13 @@ class EmojiInfo(commands.Cog):
|
||||||
await interaction.response.defer(ephemeral=ephemeral)
|
await interaction.response.defer(ephemeral=ephemeral)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
|
retrieved_emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
|
||||||
(
|
string, emoji_url = await self.get_emoji_info(retrieved_emoji)
|
||||||
string,
|
|
||||||
emoji_url,
|
|
||||||
) = await self.get_emoji_info(emoji)
|
|
||||||
self.logger.verbose(f"Emoji:\n{string}")
|
self.logger.verbose(f"Emoji:\n{string}")
|
||||||
except (IndexError, UnboundLocalError):
|
except (IndexError, UnboundLocalError):
|
||||||
return await interaction.followup.send("Please provide a valid emoji!")
|
return await interaction.followup.send("Please provide a valid emoji!")
|
||||||
|
|
||||||
|
assert isinstance(interaction.channel, discord.TextChannel)
|
||||||
if await self.bot.embed_requested(channel=interaction.channel):
|
if await self.bot.embed_requested(channel=interaction.channel):
|
||||||
embed = discord.Embed(title="Emoji Information", description=string, color=await self.fetch_primary_color(emoji_url) or await self.bot.get_embed_color(interaction.channel))
|
embed = discord.Embed(title="Emoji Information", description=string, color=await self.fetch_primary_color(emoji_url) or await self.bot.get_embed_color(interaction.channel))
|
||||||
embed.set_thumbnail(url=emoji_url)
|
embed.set_thumbnail(url=emoji_url)
|
||||||
|
@ -104,20 +102,18 @@ class EmojiInfo(commands.Cog):
|
||||||
async def emoji(self, ctx: commands.Context, *, emoji: str) -> None:
|
async def emoji(self, ctx: commands.Context, *, emoji: str) -> None:
|
||||||
"""Retrieve information about an emoji."""
|
"""Retrieve information about an emoji."""
|
||||||
try:
|
try:
|
||||||
emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
|
retrieved_emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
|
||||||
(
|
string, emoji_url = await self.get_emoji_info(retrieved_emoji)
|
||||||
string,
|
|
||||||
emoji_url,
|
|
||||||
) = await self.get_emoji_info(emoji)
|
|
||||||
self.logger.verbose(f"Emoji:\n{string}")
|
self.logger.verbose(f"Emoji:\n{string}")
|
||||||
except (IndexError, UnboundLocalError):
|
except (IndexError, UnboundLocalError):
|
||||||
return await ctx.send("Please provide a valid emoji!")
|
await ctx.send("Please provide a valid emoji!")
|
||||||
|
return
|
||||||
|
|
||||||
if await ctx.embed_requested():
|
if await ctx.embed_requested():
|
||||||
embed = discord.Embed(title="Emoji Information", description=string, color=await self.fetch_primary_color(emoji_url) or await ctx.embed_color)
|
embed = discord.Embed(title="Emoji Information", description=string, color=await self.fetch_primary_color(emoji_url) or await ctx.embed_color())
|
||||||
embed.set_thumbnail(url=emoji_url)
|
embed.set_thumbnail(url=emoji_url)
|
||||||
|
|
||||||
await ctx.send(embed=embed)
|
await ctx.send(embed=embed)
|
||||||
return None
|
return
|
||||||
await ctx.send(content=string)
|
await ctx.send(content=string)
|
||||||
return None
|
return
|
||||||
|
|
|
@ -77,7 +77,7 @@ class PartialEmoji(discord.PartialEmoji):
|
||||||
name = groups["name"]
|
name = groups["name"]
|
||||||
return cls(name=name, animated=animated, id=emoji_id)
|
return cls(name=name, animated=animated, id=emoji_id)
|
||||||
|
|
||||||
path: data_manager.Path = data_manager.bundled_data_path(coginstance) / "emojis.json"
|
path = data_manager.bundled_data_path(coginstance) / "emojis.json"
|
||||||
with open(path, "r", encoding="UTF-8") as file:
|
with open(path, "r", encoding="UTF-8") as file:
|
||||||
emojis: dict = json.load(file)
|
emojis: dict = json.load(file)
|
||||||
emoji_aliases = []
|
emoji_aliases = []
|
||||||
|
|
|
@ -2,18 +2,18 @@ import py_compile
|
||||||
from asyncio import run_coroutine_threadsafe
|
from asyncio import run_coroutine_threadsafe
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
from typing import TYPE_CHECKING, List, Sequence, Tuple
|
from typing import Generator, List, Sequence
|
||||||
|
|
||||||
|
import discord
|
||||||
from red_commons.logging import RedTraceLogger, getLogger
|
from red_commons.logging import RedTraceLogger, getLogger
|
||||||
from redbot.core import Config, checks, commands
|
from redbot.core import Config, checks, commands
|
||||||
from redbot.core.bot import Red
|
from redbot.core.bot import Red
|
||||||
from redbot.core.core_commands import CoreLogic
|
from redbot.core.core_commands import CoreLogic
|
||||||
from redbot.core.utils.chat_formatting import bold, box, humanize_list
|
from redbot.core.utils.chat_formatting import bold, box, humanize_list
|
||||||
|
from typing_extensions import override
|
||||||
from watchdog.events import FileSystemEvent, FileSystemMovedEvent, RegexMatchingEventHandler
|
from watchdog.events import FileSystemEvent, FileSystemMovedEvent, RegexMatchingEventHandler
|
||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
|
from watchdog.observers.api import BaseObserver
|
||||||
if TYPE_CHECKING:
|
|
||||||
from watchdog.observers import ObserverType
|
|
||||||
|
|
||||||
|
|
||||||
class HotReload(commands.Cog):
|
class HotReload(commands.Cog):
|
||||||
|
@ -21,24 +21,26 @@ class HotReload(commands.Cog):
|
||||||
|
|
||||||
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
||||||
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
||||||
__version__ = "1.4.0"
|
__version__ = "1.4.1"
|
||||||
__documentation__ = "https://seacogs.coastalcommits.com/hotreload/"
|
__documentation__ = "https://seacogs.coastalcommits.com/hotreload/"
|
||||||
|
|
||||||
def __init__(self, bot: Red) -> None:
|
def __init__(self, bot: Red) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.bot: Red = bot
|
self.bot: Red = bot
|
||||||
self.config = Config.get_conf(self, identifier=294518358420750336, force_registration=True)
|
self.config: Config = Config.get_conf(cog_instance=self, identifier=294518358420750336, force_registration=True)
|
||||||
self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.HotReload")
|
self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.HotReload")
|
||||||
self.observers: List[ObserverType] = []
|
self.observers: List[BaseObserver] = []
|
||||||
self.config.register_global(notify_channel=None, compile_before_reload=False)
|
self.config.register_global(notify_channel=None, compile_before_reload=False)
|
||||||
watchdog_loggers = [getLogger(name="watchdog.observers.inotify_buffer")]
|
watchdog_loggers = [getLogger(name="watchdog.observers.inotify_buffer")]
|
||||||
for watchdog_logger in watchdog_loggers:
|
for watchdog_logger in watchdog_loggers:
|
||||||
watchdog_logger.setLevel("INFO") # SHUT UP!!!!
|
watchdog_logger.setLevel("INFO") # SHUT UP!!!!
|
||||||
|
|
||||||
|
@override
|
||||||
async def cog_load(self) -> None:
|
async def cog_load(self) -> None:
|
||||||
"""Start the observer when the cog is loaded."""
|
"""Start the observer when the cog is loaded."""
|
||||||
self.bot.loop.create_task(self.start_observer())
|
_ = self.bot.loop.create_task(self.start_observer())
|
||||||
|
|
||||||
|
@override
|
||||||
async def cog_unload(self) -> None:
|
async def cog_unload(self) -> None:
|
||||||
"""Stop the observer when the cog is unloaded."""
|
"""Stop the observer when the cog is unloaded."""
|
||||||
for observer in self.observers:
|
for observer in self.observers:
|
||||||
|
@ -46,6 +48,7 @@ class HotReload(commands.Cog):
|
||||||
observer.join()
|
observer.join()
|
||||||
self.logger.info("Stopped observer. No longer watching for file changes.")
|
self.logger.info("Stopped observer. No longer watching for file changes.")
|
||||||
|
|
||||||
|
@override
|
||||||
def format_help_for_context(self, ctx: commands.Context) -> str:
|
def format_help_for_context(self, ctx: commands.Context) -> str:
|
||||||
pre_processed = super().format_help_for_context(ctx) or ""
|
pre_processed = super().format_help_for_context(ctx) or ""
|
||||||
n = "\n" if "\n\n" not in pre_processed else ""
|
n = "\n" if "\n\n" not in pre_processed else ""
|
||||||
|
@ -57,7 +60,7 @@ class HotReload(commands.Cog):
|
||||||
]
|
]
|
||||||
return "\n".join(text)
|
return "\n".join(text)
|
||||||
|
|
||||||
async def get_paths(self) -> Tuple[Path]:
|
async def get_paths(self) -> Generator[Path]:
|
||||||
"""Retrieve user defined paths."""
|
"""Retrieve user defined paths."""
|
||||||
cog_manager = self.bot._cog_mgr # noqa: SLF001 # We have to use this private method because there is no public API to get user defined paths
|
cog_manager = self.bot._cog_mgr # noqa: SLF001 # We have to use this private method because there is no public API to get user defined paths
|
||||||
cog_paths = await cog_manager.user_defined_paths()
|
cog_paths = await cog_manager.user_defined_paths()
|
||||||
|
@ -79,7 +82,7 @@ class HotReload(commands.Cog):
|
||||||
self.logger.warning("Path %s does not exist. Skipping.", path)
|
self.logger.warning("Path %s does not exist. Skipping.", path)
|
||||||
continue
|
continue
|
||||||
self.logger.debug("Adding observer schedule for path %s.", path)
|
self.logger.debug("Adding observer schedule for path %s.", path)
|
||||||
observer.schedule(event_handler=HotReloadHandler(cog=self, path=path), path=path, recursive=True)
|
observer.schedule(event_handler=HotReloadHandler(cog=self, path=path), path=str(path), recursive=True)
|
||||||
observer.start()
|
observer.start()
|
||||||
self.logger.info("Started observer. Watching for file changes.")
|
self.logger.info("Started observer. Watching for file changes.")
|
||||||
is_first = False
|
is_first = False
|
||||||
|
@ -91,24 +94,24 @@ class HotReload(commands.Cog):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@hotreload_group.command(name="notifychannel")
|
@hotreload_group.command(name="notifychannel")
|
||||||
async def hotreload_notifychannel(self, ctx: commands.Context, channel: commands.TextChannelConverter) -> None:
|
async def hotreload_notifychannel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
|
||||||
"""Set the channel to send notifications to."""
|
"""Set the channel to send notifications to."""
|
||||||
await self.config.notify_channel.set(channel.id)
|
await self.config.notify_channel.set(channel.id)
|
||||||
await ctx.send(f"Notifications will be sent to {channel.mention}.")
|
await ctx.send(f"Notifications will be sent to {channel.mention}.")
|
||||||
|
|
||||||
@hotreload_group.command(name="compile")
|
@hotreload_group.command(name="compile") # type: ignore
|
||||||
async def hotreload_compile(self, ctx: commands.Context, compile_before_reload: bool) -> None:
|
async def hotreload_compile(self, ctx: commands.Context, compile_before_reload: bool) -> None:
|
||||||
"""Set whether to compile modified files before reloading."""
|
"""Set whether to compile modified files before reloading."""
|
||||||
await self.config.compile_before_reload.set(compile_before_reload)
|
await self.config.compile_before_reload.set(compile_before_reload)
|
||||||
await ctx.send(f"I {'will' if compile_before_reload else 'will not'} compile modified files before hotreloading cogs.")
|
await ctx.send(f"I {'will' if compile_before_reload else 'will not'} compile modified files before hotreloading cogs.")
|
||||||
|
|
||||||
@hotreload_group.command(name="list")
|
@hotreload_group.command(name="list") # type: ignore
|
||||||
async def hotreload_list(self, ctx: commands.Context) -> None:
|
async def hotreload_list(self, ctx: commands.Context) -> None:
|
||||||
"""List the currently active observers."""
|
"""List the currently active observers."""
|
||||||
if not self.observers:
|
if not self.observers:
|
||||||
await ctx.send("No observers are currently active.")
|
await ctx.send("No observers are currently active.")
|
||||||
return
|
return
|
||||||
await ctx.send(f"Currently active observers (If there are more than one of these, report an issue): {box(humanize_list(self.observers, style='unit'))}")
|
await ctx.send(f"Currently active observers (If there are more than one of these, report an issue): {box(humanize_list([str(o) for o in self.observers], style='unit'))}")
|
||||||
|
|
||||||
|
|
||||||
class HotReloadHandler(RegexMatchingEventHandler):
|
class HotReloadHandler(RegexMatchingEventHandler):
|
||||||
|
@ -129,13 +132,13 @@ class HotReloadHandler(RegexMatchingEventHandler):
|
||||||
if event.event_type not in allowed_events:
|
if event.event_type not in allowed_events:
|
||||||
return
|
return
|
||||||
|
|
||||||
relative_src_path = Path(event.src_path).relative_to(self.path)
|
relative_src_path = Path(str(event.src_path)).relative_to(self.path)
|
||||||
src_package_name = relative_src_path.parts[0]
|
src_package_name = relative_src_path.parts[0]
|
||||||
cogs_to_reload = [src_package_name]
|
cogs_to_reload = [src_package_name]
|
||||||
|
|
||||||
if isinstance(event, FileSystemMovedEvent):
|
if isinstance(event, FileSystemMovedEvent):
|
||||||
dest = f" to {event.dest_path}"
|
dest = f" to {event.dest_path}"
|
||||||
relative_dest_path = Path(event.dest_path).relative_to(self.path)
|
relative_dest_path = Path(str(event.dest_path)).relative_to(self.path)
|
||||||
dest_package_name = relative_dest_path.parts[0]
|
dest_package_name = relative_dest_path.parts[0]
|
||||||
if dest_package_name != src_package_name:
|
if dest_package_name != src_package_name:
|
||||||
cogs_to_reload.append(dest_package_name)
|
cogs_to_reload.append(dest_package_name)
|
||||||
|
@ -147,7 +150,7 @@ class HotReloadHandler(RegexMatchingEventHandler):
|
||||||
run_coroutine_threadsafe(
|
run_coroutine_threadsafe(
|
||||||
coro=self.reload_cogs(
|
coro=self.reload_cogs(
|
||||||
cog_names=cogs_to_reload,
|
cog_names=cogs_to_reload,
|
||||||
paths=[Path(p) for p in (event.src_path, getattr(event, "dest_path", None)) if p],
|
paths=[Path(str(p)) for p in (event.src_path, getattr(event, "dest_path", None)) if p],
|
||||||
),
|
),
|
||||||
loop=self.cog.bot.loop,
|
loop=self.cog.bot.loop,
|
||||||
)
|
)
|
||||||
|
@ -163,7 +166,7 @@ class HotReloadHandler(RegexMatchingEventHandler):
|
||||||
self.logger.info("Reloaded cogs: %s", humanize_list(cog_names, style="unit"))
|
self.logger.info("Reloaded cogs: %s", humanize_list(cog_names, style="unit"))
|
||||||
|
|
||||||
channel = self.cog.bot.get_channel(await self.cog.config.notify_channel())
|
channel = self.cog.bot.get_channel(await self.cog.config.notify_channel())
|
||||||
if channel:
|
if channel and isinstance(channel, discord.TextChannel):
|
||||||
await channel.send(f"Reloaded cogs: {humanize_list(cog_names, style='unit')}")
|
await channel.send(f"Reloaded cogs: {humanize_list(cog_names, style='unit')}")
|
||||||
|
|
||||||
def compile_modified_files(self, cog_names: Sequence[str], paths: Sequence[Path]) -> bool:
|
def compile_modified_files(self, cog_names: Sequence[str], paths: Sequence[Path]) -> bool:
|
||||||
|
@ -176,7 +179,7 @@ class HotReloadHandler(RegexMatchingEventHandler):
|
||||||
try:
|
try:
|
||||||
with NamedTemporaryFile() as temp_file:
|
with NamedTemporaryFile() as temp_file:
|
||||||
self.logger.debug("Attempting to compile %s", path)
|
self.logger.debug("Attempting to compile %s", path)
|
||||||
py_compile.compile(file=path, cfile=temp_file.name, doraise=True)
|
py_compile.compile(file=str(path), cfile=temp_file.name, doraise=True)
|
||||||
self.logger.debug("Successfully compiled %s", path)
|
self.logger.debug("Successfully compiled %s", path)
|
||||||
|
|
||||||
except py_compile.PyCompileError as e:
|
except py_compile.PyCompileError as e:
|
||||||
|
|
|
@ -37,16 +37,20 @@ class Nerdify(commands.Cog):
|
||||||
]
|
]
|
||||||
return "\n".join(text)
|
return "\n".join(text)
|
||||||
|
|
||||||
|
|
||||||
@commands.command(aliases=["nerd"])
|
@commands.command(aliases=["nerd"])
|
||||||
async def nerdify(
|
async def nerdify(
|
||||||
self, ctx: commands.Context, *, text: Optional[str] = None,
|
self,
|
||||||
|
ctx: commands.Context,
|
||||||
|
*,
|
||||||
|
text: Optional[str] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Nerdify the replied to message, previous message, or your own text."""
|
"""Nerdify the replied to message, previous message, or your own text."""
|
||||||
if not text:
|
if not text:
|
||||||
if hasattr(ctx.message, "reference") and ctx.message.reference:
|
if hasattr(ctx.message, "reference") and ctx.message.reference:
|
||||||
with suppress(
|
with suppress(
|
||||||
discord.Forbidden, discord.NotFound, discord.HTTPException,
|
discord.Forbidden,
|
||||||
|
discord.NotFound,
|
||||||
|
discord.HTTPException,
|
||||||
):
|
):
|
||||||
message_id = ctx.message.reference.message_id
|
message_id = ctx.message.reference.message_id
|
||||||
if message_id:
|
if message_id:
|
||||||
|
@ -62,7 +66,9 @@ class Nerdify(commands.Cog):
|
||||||
ctx.channel,
|
ctx.channel,
|
||||||
self.nerdify_text(text),
|
self.nerdify_text(text),
|
||||||
allowed_mentions=discord.AllowedMentions(
|
allowed_mentions=discord.AllowedMentions(
|
||||||
everyone=False, users=False, roles=False,
|
everyone=False,
|
||||||
|
users=False,
|
||||||
|
roles=False,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -77,7 +83,10 @@ class Nerdify(commands.Cog):
|
||||||
return f'"{text}" 🤓'
|
return f'"{text}" 🤓'
|
||||||
|
|
||||||
async def type_message(
|
async def type_message(
|
||||||
self, destination: discord.abc.Messageable, content: str, **kwargs: Any,
|
self,
|
||||||
|
destination: discord.abc.Messageable,
|
||||||
|
content: str,
|
||||||
|
**kwargs: Any,
|
||||||
) -> Union[discord.Message, None]:
|
) -> Union[discord.Message, None]:
|
||||||
"""Simulate typing and sending a message to a destination.
|
"""Simulate typing and sending a message to a destination.
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
from typing import Mapping, Optional, Tuple, Union
|
from typing import AsyncIterable, Iterable, Mapping, Optional, Tuple, Union
|
||||||
|
|
||||||
import discord
|
import discord
|
||||||
import websockets
|
import websockets
|
||||||
|
@ -9,8 +9,9 @@ from pydactyl import PterodactylClient
|
||||||
from redbot.core import app_commands, commands
|
from redbot.core import app_commands, commands
|
||||||
from redbot.core.app_commands import Choice
|
from redbot.core.app_commands import Choice
|
||||||
from redbot.core.bot import Red
|
from redbot.core.bot import Red
|
||||||
from redbot.core.utils.chat_formatting import bold, box, error, humanize_list
|
from redbot.core.utils.chat_formatting import bold, box, humanize_list
|
||||||
from redbot.core.utils.views import ConfirmView
|
from redbot.core.utils.views import ConfirmView
|
||||||
|
from typing_extensions import override
|
||||||
|
|
||||||
from pterodactyl import mcsrvstatus
|
from pterodactyl import mcsrvstatus
|
||||||
from pterodactyl.config import config, register_config
|
from pterodactyl.config import config, register_config
|
||||||
|
@ -22,7 +23,7 @@ class Pterodactyl(commands.Cog):
|
||||||
|
|
||||||
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
||||||
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
||||||
__version__ = "2.0.5"
|
__version__ = "2.0.6"
|
||||||
__documentation__ = "https://seacogs.coastalcommits.com/pterodactyl/"
|
__documentation__ = "https://seacogs.coastalcommits.com/pterodactyl/"
|
||||||
|
|
||||||
def __init__(self, bot: Red):
|
def __init__(self, bot: Red):
|
||||||
|
@ -32,9 +33,10 @@ class Pterodactyl(commands.Cog):
|
||||||
self.websocket: Optional[websockets.ClientConnection] = None
|
self.websocket: Optional[websockets.ClientConnection] = None
|
||||||
self.retry_counter: int = 0
|
self.retry_counter: int = 0
|
||||||
register_config(config)
|
register_config(config)
|
||||||
self.task = self.get_task()
|
self.task = self._get_task()
|
||||||
self.update_topic.start()
|
self.update_topic.start()
|
||||||
|
|
||||||
|
@override
|
||||||
def format_help_for_context(self, ctx: commands.Context) -> str:
|
def format_help_for_context(self, ctx: commands.Context) -> str:
|
||||||
pre_processed = super().format_help_for_context(ctx) or ""
|
pre_processed = super().format_help_for_context(ctx) or ""
|
||||||
n = "\n" if "\n\n" not in pre_processed else ""
|
n = "\n" if "\n\n" not in pre_processed else ""
|
||||||
|
@ -46,50 +48,57 @@ class Pterodactyl(commands.Cog):
|
||||||
]
|
]
|
||||||
return "\n".join(text)
|
return "\n".join(text)
|
||||||
|
|
||||||
|
@override
|
||||||
async def cog_load(self) -> None:
|
async def cog_load(self) -> None:
|
||||||
pterodactyl_keys = await self.bot.get_shared_api_tokens("pterodactyl")
|
pterodactyl_keys = await self.bot.get_shared_api_tokens("pterodactyl")
|
||||||
api_key = pterodactyl_keys.get("api_key")
|
api_key = pterodactyl_keys.get("api_key")
|
||||||
if api_key is None:
|
if api_key is None:
|
||||||
self.task.cancel()
|
self.maybe_cancel_task()
|
||||||
logger.error("Pterodactyl API key not set. Please set it using `[p]set api`.")
|
logger.error("Pterodactyl API key not set. Please set it using `[p]set api`.")
|
||||||
return
|
return
|
||||||
base_url = await config.base_url()
|
base_url = await config.base_url()
|
||||||
if base_url is None:
|
if base_url is None:
|
||||||
self.task.cancel()
|
self.maybe_cancel_task()
|
||||||
logger.error("Pterodactyl base URL not set. Please set it using `[p]pterodactyl config url`.")
|
logger.error("Pterodactyl base URL not set. Please set it using `[p]pterodactyl config url`.")
|
||||||
return
|
return
|
||||||
server_id = await config.server_id()
|
server_id = await config.server_id()
|
||||||
if server_id is None:
|
if server_id is None:
|
||||||
self.task.cancel()
|
self.maybe_cancel_task()
|
||||||
logger.error("Pterodactyl server ID not set. Please set it using `[p]pterodactyl config serverid`.")
|
logger.error("Pterodactyl server ID not set. Please set it using `[p]pterodactyl config serverid`.")
|
||||||
return
|
return
|
||||||
|
|
||||||
self.client = PterodactylClient(base_url, api_key).client
|
self.client = PterodactylClient(base_url, api_key).client
|
||||||
|
|
||||||
|
@override
|
||||||
async def cog_unload(self) -> None:
|
async def cog_unload(self) -> None:
|
||||||
self.update_topic.cancel()
|
self.update_topic.cancel()
|
||||||
self.task.cancel()
|
self.maybe_cancel_task()
|
||||||
self.retry_counter = 0
|
|
||||||
|
|
||||||
def get_task(self) -> asyncio.Task:
|
def maybe_cancel_task(self, reset_retry_counter: bool = True) -> None:
|
||||||
|
if self.task:
|
||||||
|
self.task.cancel()
|
||||||
|
if reset_retry_counter:
|
||||||
|
self.retry_counter = 0
|
||||||
|
|
||||||
|
def _get_task(self) -> asyncio.Task:
|
||||||
from pterodactyl.websocket import establish_websocket_connection
|
from pterodactyl.websocket import establish_websocket_connection
|
||||||
|
|
||||||
task = self.bot.loop.create_task(establish_websocket_connection(self), name="Pterodactyl Websocket Connection")
|
task = self.bot.loop.create_task(establish_websocket_connection(self), name="Pterodactyl Websocket Connection")
|
||||||
task.add_done_callback(self.error_callback)
|
task.add_done_callback(self._error_callback)
|
||||||
return task
|
return task
|
||||||
|
|
||||||
def error_callback(self, fut) -> None: # NOTE Thanks flame442 and zephyrkul for helping me figure this out
|
def _error_callback(self, fut) -> None: # NOTE Thanks flame442 and zephyrkul for helping me figure this out
|
||||||
try:
|
try:
|
||||||
fut.result()
|
fut.result()
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.info("WebSocket task has been cancelled.")
|
logger.info("WebSocket task has been cancelled.")
|
||||||
except Exception as e: # pylint: disable=broad-exception-caught
|
except Exception as e: # pylint: disable=broad-exception-caught
|
||||||
logger.error("WebSocket task has failed: %s", e, exc_info=e)
|
logger.error("WebSocket task has failed: %s", e, exc_info=e)
|
||||||
self.task.cancel()
|
self.maybe_cancel_task(reset_retry_counter=False)
|
||||||
if self.retry_counter < 5:
|
if self.retry_counter < 5:
|
||||||
self.retry_counter += 1
|
self.retry_counter += 1
|
||||||
logger.info("Retrying in %s seconds...", 5 * self.retry_counter)
|
logger.info("Retrying in %s seconds...", 5 * self.retry_counter)
|
||||||
self.task = self.bot.loop.call_later(5 * self.retry_counter, self.get_task)
|
self.task = self.bot.loop.call_later(5 * self.retry_counter, self._get_task)
|
||||||
else:
|
else:
|
||||||
logger.info("Retry limit reached. Stopping task.")
|
logger.info("Retry limit reached. Stopping task.")
|
||||||
|
|
||||||
|
@ -100,9 +109,9 @@ class Pterodactyl(commands.Cog):
|
||||||
console = self.bot.get_channel(await config.console_channel())
|
console = self.bot.get_channel(await config.console_channel())
|
||||||
chat = self.bot.get_channel(await config.chat_channel())
|
chat = self.bot.get_channel(await config.chat_channel())
|
||||||
if console:
|
if console:
|
||||||
await console.edit(topic=topic)
|
await console.edit(topic=topic) # type: ignore
|
||||||
if chat:
|
if chat:
|
||||||
await chat.edit(topic=topic)
|
await chat.edit(topic=topic) # type: ignore
|
||||||
|
|
||||||
@commands.Cog.listener()
|
@commands.Cog.listener()
|
||||||
async def on_message_without_command(self, message: discord.Message) -> None:
|
async def on_message_without_command(self, message: discord.Message) -> None:
|
||||||
|
@ -113,13 +122,7 @@ class Pterodactyl(commands.Cog):
|
||||||
return
|
return
|
||||||
logger.debug("Received console command from %s: %s", message.author.id, message.content)
|
logger.debug("Received console command from %s: %s", message.author.id, message.content)
|
||||||
await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}", allowed_mentions=discord.AllowedMentions.none())
|
await message.channel.send(f"Received console command from {message.author.id}: {message.content[:1900]}", allowed_mentions=discord.AllowedMentions.none())
|
||||||
try:
|
await self._send(json.dumps({"event": "send command", "args": [message.content]}))
|
||||||
await self.websocket.send(json.dumps({"event": "send command", "args": [message.content]}))
|
|
||||||
except websockets.exceptions.ConnectionClosed as e:
|
|
||||||
logger.error("WebSocket connection closed: %s", e)
|
|
||||||
self.task.cancel()
|
|
||||||
self.retry_counter = 0
|
|
||||||
self.task = self.get_task()
|
|
||||||
if message.channel.id == await config.chat_channel() and message.author.bot is False:
|
if message.channel.id == await config.chat_channel() and message.author.bot is False:
|
||||||
logger.debug("Received chat message from %s: %s", message.author.id, message.content)
|
logger.debug("Received chat message from %s: %s", message.author.id, message.content)
|
||||||
channel = self.bot.get_channel(await config.console_channel())
|
channel = self.bot.get_channel(await config.console_channel())
|
||||||
|
@ -127,13 +130,22 @@ class Pterodactyl(commands.Cog):
|
||||||
await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}", allowed_mentions=discord.AllowedMentions.none())
|
await channel.send(f"Received chat message from {message.author.id}: {message.content[:1900]}", allowed_mentions=discord.AllowedMentions.none())
|
||||||
msg = json.dumps({"event": "send command", "args": [await self.get_chat_command(message)]})
|
msg = json.dumps({"event": "send command", "args": [await self.get_chat_command(message)]})
|
||||||
logger.debug("Sending chat message to server:\n%s", msg)
|
logger.debug("Sending chat message to server:\n%s", msg)
|
||||||
|
await self._send(message=msg)
|
||||||
|
|
||||||
|
async def _send(self, message: Union[websockets.Data, Iterable[websockets.Data], AsyncIterable[websockets.Data]], text: bool = False):
|
||||||
|
"""Send a message through the websocket connection. Restarts the websocket connection task if it is closed, and reinvokes itself."""
|
||||||
|
try:
|
||||||
|
await self.websocket.send(message=message, text=text) # type: ignore - we want this to error if `self.websocket` is none
|
||||||
|
except websockets.exceptions.ConnectionClosed as e:
|
||||||
|
logger.error("WebSocket connection closed: %s", e)
|
||||||
|
self.maybe_cancel_task()
|
||||||
|
self.task = self._get_task()
|
||||||
try:
|
try:
|
||||||
await self.websocket.send(msg)
|
await asyncio.wait_for(fut=self.task, timeout=60)
|
||||||
except websockets.exceptions.ConnectionClosed as e:
|
await self._send(message=message, text=text)
|
||||||
logger.error("WebSocket connection closed: %s", e)
|
except asyncio.TimeoutError:
|
||||||
self.task.cancel()
|
logger.error("Timeout while waiting for websocket connection")
|
||||||
self.retry_counter = 0
|
raise
|
||||||
self.task = self.get_task()
|
|
||||||
|
|
||||||
async def get_topic(self) -> str:
|
async def get_topic(self) -> str:
|
||||||
topic: str = await config.topic()
|
topic: str = await config.topic()
|
||||||
|
@ -193,7 +205,7 @@ class Pterodactyl(commands.Cog):
|
||||||
|
|
||||||
async def get_player_list_embed(self, ctx: Union[commands.Context, discord.Interaction]) -> Optional[discord.Embed]:
|
async def get_player_list_embed(self, ctx: Union[commands.Context, discord.Interaction]) -> Optional[discord.Embed]:
|
||||||
player_list = await self.get_player_list()
|
player_list = await self.get_player_list()
|
||||||
if player_list:
|
if player_list and isinstance(ctx.channel, discord.abc.Messageable):
|
||||||
embed = discord.Embed(color=await self.bot.get_embed_color(ctx.channel), title="Players Online")
|
embed = discord.Embed(color=await self.bot.get_embed_color(ctx.channel), title="Players Online")
|
||||||
embed.description = player_list[0]
|
embed.description = player_list[0]
|
||||||
return embed
|
return embed
|
||||||
|
@ -206,10 +218,12 @@ class Pterodactyl(commands.Cog):
|
||||||
current_status = await config.current_status()
|
current_status = await config.current_status()
|
||||||
|
|
||||||
if current_status == action_ing:
|
if current_status == action_ing:
|
||||||
return await ctx.send(f"Server is already {action_ing}.", ephemeral=True)
|
await ctx.send(f"Server is already {action_ing}.", ephemeral=True)
|
||||||
|
return
|
||||||
|
|
||||||
if current_status in ["starting", "stopping"] and action != "kill":
|
if current_status in ["starting", "stopping"] and action != "kill":
|
||||||
return await ctx.send("Another power action is already in progress.", ephemeral=True)
|
await ctx.send("Another power action is already in progress.", ephemeral=True)
|
||||||
|
return
|
||||||
|
|
||||||
view = ConfirmView(ctx.author, disable_buttons=True)
|
view = ConfirmView(ctx.author, disable_buttons=True)
|
||||||
|
|
||||||
|
@ -220,13 +234,13 @@ class Pterodactyl(commands.Cog):
|
||||||
if view.result is True:
|
if view.result is True:
|
||||||
await message.edit(content=f"Sending websocket command to {action} server...", view=None)
|
await message.edit(content=f"Sending websocket command to {action} server...", view=None)
|
||||||
|
|
||||||
await self.websocket.send(json.dumps({"event": "set state", "args": [action]}))
|
await self._websocket_send(json.dumps({"event": "set state", "args": [action]}))
|
||||||
|
|
||||||
await message.edit(content=f"Server {action_ing}", view=None)
|
await message.edit(content=f"Server {action_ing}", view=None)
|
||||||
return None
|
return
|
||||||
|
|
||||||
await message.edit(content="Cancelled.", view=None)
|
await message.edit(content="Cancelled.", view=None)
|
||||||
return None
|
return
|
||||||
|
|
||||||
async def send_command(self, ctx: Union[discord.Interaction, commands.Context], command: str):
|
async def send_command(self, ctx: Union[discord.Interaction, commands.Context], command: str):
|
||||||
channel = self.bot.get_channel(await config.console_channel())
|
channel = self.bot.get_channel(await config.console_channel())
|
||||||
|
@ -234,23 +248,15 @@ class Pterodactyl(commands.Cog):
|
||||||
ctx = await self.bot.get_context(ctx)
|
ctx = await self.bot.get_context(ctx)
|
||||||
if channel:
|
if channel:
|
||||||
await channel.send(f"Received console command from {ctx.author.id}: {command[:1900]}", allowed_mentions=discord.AllowedMentions.none())
|
await channel.send(f"Received console command from {ctx.author.id}: {command[:1900]}", allowed_mentions=discord.AllowedMentions.none())
|
||||||
try:
|
await self._websocket_send(json.dumps({"event": "send command", "args": [command]}))
|
||||||
await self.websocket.send(json.dumps({"event": "send command", "args": [command]}))
|
await ctx.send(f"Command sent to server. {box(command, 'json')}")
|
||||||
await ctx.send(f"Command sent to server. {box(command, 'json')}")
|
|
||||||
except websockets.exceptions.ConnectionClosed as e:
|
|
||||||
logger.error("WebSocket connection closed: %s", e)
|
|
||||||
await ctx.send(error("WebSocket connection closed."))
|
|
||||||
self.task.cancel()
|
|
||||||
self.retry_counter = 0
|
|
||||||
self.task = self.get_task()
|
|
||||||
|
|
||||||
@commands.Cog.listener()
|
@commands.Cog.listener()
|
||||||
async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str, str]): # pylint: disable=unused-argument
|
async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str, str]): # pylint: disable=unused-argument
|
||||||
if service_name == "pterodactyl":
|
if service_name == "pterodactyl":
|
||||||
logger.info("Configuration value set: api_key\nRestarting task...")
|
logger.info("Configuration value set: api_key\nRestarting task...")
|
||||||
self.task.cancel()
|
self.maybe_cancel_task(reset_retry_counter=True)
|
||||||
self.retry_counter = 0
|
self.task = self._get_task()
|
||||||
self.task = self.get_task()
|
|
||||||
|
|
||||||
slash_pterodactyl = app_commands.Group(name="pterodactyl", description="Pterodactyl allows you to manage your Pterodactyl Panel from Discord.")
|
slash_pterodactyl = app_commands.Group(name="pterodactyl", description="Pterodactyl allows you to manage your Pterodactyl Panel from Discord.")
|
||||||
|
|
||||||
|
@ -346,9 +352,8 @@ class Pterodactyl(commands.Cog):
|
||||||
await config.base_url.set(base_url)
|
await config.base_url.set(base_url)
|
||||||
await ctx.send(f"Base URL set to {base_url}")
|
await ctx.send(f"Base URL set to {base_url}")
|
||||||
logger.info("Configuration value set: base_url = %s\nRestarting task...", base_url)
|
logger.info("Configuration value set: base_url = %s\nRestarting task...", base_url)
|
||||||
self.task.cancel()
|
self.maybe_cancel_task(reset_retry_counter=True)
|
||||||
self.retry_counter = 0
|
self.task = self._get_task()
|
||||||
self.task = self.get_task()
|
|
||||||
|
|
||||||
@pterodactyl_config.command(name="serverid")
|
@pterodactyl_config.command(name="serverid")
|
||||||
async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None:
|
async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None:
|
||||||
|
@ -356,9 +361,8 @@ class Pterodactyl(commands.Cog):
|
||||||
await config.server_id.set(server_id)
|
await config.server_id.set(server_id)
|
||||||
await ctx.send(f"Server ID set to {server_id}")
|
await ctx.send(f"Server ID set to {server_id}")
|
||||||
logger.info("Configuration value set: server_id = %s\nRestarting task...", server_id)
|
logger.info("Configuration value set: server_id = %s\nRestarting task...", server_id)
|
||||||
self.task.cancel()
|
self.maybe_cancel_task(reset_retry_counter=True)
|
||||||
self.retry_counter = 0
|
self.task = self._get_task()
|
||||||
self.task = self.get_task()
|
|
||||||
|
|
||||||
@pterodactyl_config.group(name="console")
|
@pterodactyl_config.group(name="console")
|
||||||
async def pterodactyl_config_console(self, ctx: commands.Context):
|
async def pterodactyl_config_console(self, ctx: commands.Context):
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, Tuple, Union
|
from typing import Any, Optional, Tuple, Union
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import discord
|
import discord
|
||||||
|
@ -56,7 +56,9 @@ async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
|
||||||
content = mask_ip(content)
|
content = mask_ip(content)
|
||||||
|
|
||||||
console_channel = coginstance.bot.get_channel(await config.console_channel())
|
console_channel = coginstance.bot.get_channel(await config.console_channel())
|
||||||
|
assert isinstance(console_channel, discord.abc.Messageable)
|
||||||
chat_channel = coginstance.bot.get_channel(await config.chat_channel())
|
chat_channel = coginstance.bot.get_channel(await config.chat_channel())
|
||||||
|
assert isinstance(chat_channel, discord.abc.Messageable)
|
||||||
if console_channel is not None:
|
if console_channel is not None:
|
||||||
if content.startswith("["):
|
if content.startswith("["):
|
||||||
pagified_content = pagify(content, delims=[" ", "\n"])
|
pagified_content = pagify(content, delims=[" ", "\n"])
|
||||||
|
@ -83,7 +85,7 @@ async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
|
||||||
embed, img = await generate_join_leave_embed(coginstance=coginstance, username=join_message, join=True)
|
embed, img = await generate_join_leave_embed(coginstance=coginstance, username=join_message, join=True)
|
||||||
if img:
|
if img:
|
||||||
with open(img, "rb") as file:
|
with open(img, "rb") as file:
|
||||||
await chat_channel.send(embed=embed, file=file)
|
await chat_channel.send(embed=embed, file=discord.File(fp=file))
|
||||||
else:
|
else:
|
||||||
await chat_channel.send(embed=embed)
|
await chat_channel.send(embed=embed)
|
||||||
else:
|
else:
|
||||||
|
@ -96,7 +98,7 @@ async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
|
||||||
embed, img = await generate_join_leave_embed(coginstance=coginstance, username=leave_message, join=False)
|
embed, img = await generate_join_leave_embed(coginstance=coginstance, username=leave_message, join=False)
|
||||||
if img:
|
if img:
|
||||||
with open(img, "rb") as file:
|
with open(img, "rb") as file:
|
||||||
await chat_channel.send(embed=embed, file=file)
|
await chat_channel.send(embed=embed, file=discord.File(fp=file))
|
||||||
else:
|
else:
|
||||||
await chat_channel.send(embed=embed)
|
await chat_channel.send(embed=embed)
|
||||||
else:
|
else:
|
||||||
|
@ -106,7 +108,11 @@ async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
|
||||||
if achievement_message:
|
if achievement_message:
|
||||||
if chat_channel is not None:
|
if chat_channel is not None:
|
||||||
if coginstance.bot.embed_requested(chat_channel):
|
if coginstance.bot.embed_requested(chat_channel):
|
||||||
await chat_channel.send(embed=await generate_achievement_embed(coginstance, achievement_message["username"], achievement_message["achievement"], achievement_message["challenge"]))
|
embed, img = await generate_achievement_embed(coginstance, achievement_message["username"], achievement_message["achievement"], achievement_message["challenge"])
|
||||||
|
if img:
|
||||||
|
await chat_channel.send(embed=embed, file=discord.File(fp=img))
|
||||||
|
else:
|
||||||
|
await chat_channel.send(embed=embed)
|
||||||
else:
|
else:
|
||||||
await chat_channel.send(f"{achievement_message['username']} has {'completed the challenge' if achievement_message['challenge'] else 'made the advancement'} {achievement_message['achievement']}")
|
await chat_channel.send(f"{achievement_message['username']} has {'completed the challenge' if achievement_message['challenge'] else 'made the advancement'} {achievement_message['achievement']}")
|
||||||
|
|
||||||
|
@ -130,24 +136,27 @@ async def establish_websocket_connection(coginstance: Pterodactyl) -> None:
|
||||||
await chat.send(await config.shutdown_msg())
|
await chat.send(await config.shutdown_msg())
|
||||||
|
|
||||||
|
|
||||||
async def retrieve_websocket_credentials(coginstance: Pterodactyl) -> Optional[dict]:
|
async def retrieve_websocket_credentials(coginstance: Pterodactyl) -> dict:
|
||||||
pterodactyl_keys = await coginstance.bot.get_shared_api_tokens("pterodactyl")
|
pterodactyl_keys = await coginstance.bot.get_shared_api_tokens("pterodactyl")
|
||||||
api_key = pterodactyl_keys.get("api_key")
|
api_key = pterodactyl_keys.get("api_key")
|
||||||
if api_key is None:
|
if api_key is None:
|
||||||
coginstance.task.cancel()
|
coginstance.maybe_cancel_task()
|
||||||
raise ValueError("Pterodactyl API key not set. Please set it using `[p]set api`.")
|
raise ValueError("Pterodactyl API key not set. Please set it using `[p]set api`.")
|
||||||
base_url = await config.base_url()
|
base_url = await config.base_url()
|
||||||
if base_url is None:
|
if base_url is None:
|
||||||
coginstance.task.cancel()
|
coginstance.maybe_cancel_task()
|
||||||
raise ValueError("Pterodactyl base URL not set. Please set it using `[p]pterodactyl config url`.")
|
raise ValueError("Pterodactyl base URL not set. Please set it using `[p]pterodactyl config url`.")
|
||||||
server_id = await config.server_id()
|
server_id = await config.server_id()
|
||||||
if server_id is None:
|
if server_id is None:
|
||||||
coginstance.task.cancel()
|
coginstance.maybe_cancel_task()
|
||||||
raise ValueError("Pterodactyl server ID not set. Please set it using `[p]pterodactyl config serverid`.")
|
raise ValueError("Pterodactyl server ID not set. Please set it using `[p]pterodactyl config serverid`.")
|
||||||
|
|
||||||
client = PterodactylClient(base_url, api_key).client
|
client = PterodactylClient(base_url, api_key).client
|
||||||
coginstance.client = client
|
coginstance.client = client
|
||||||
websocket_credentials = client.servers.get_websocket(server_id)
|
websocket_credentials: dict[str, Any] = client.servers.get_websocket(server_id).json()
|
||||||
|
if not websocket_credentials:
|
||||||
|
coginstance.maybe_cancel_task()
|
||||||
|
raise ValueError("Failed to retrieve websocket credentials. Please ensure the API details are correctly configured.")
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"""Websocket connection details retrieved:
|
"""Websocket connection details retrieved:
|
||||||
Socket: %s
|
Socket: %s
|
||||||
|
@ -165,44 +174,44 @@ def remove_ansi_escape_codes(text: str) -> str:
|
||||||
return ansi_escape.sub("", text)
|
return ansi_escape.sub("", text)
|
||||||
|
|
||||||
|
|
||||||
async def check_if_server_message(text: str) -> Union[bool, str]:
|
async def check_if_server_message(text: str) -> Optional[str]:
|
||||||
regex = await config.server_regex()
|
regex = await config.server_regex()
|
||||||
match: Optional[re.Match[str]] = re.match(regex, text)
|
match: Optional[re.Match[str]] = re.match(regex, text)
|
||||||
if match:
|
if match:
|
||||||
logger.trace("Message is a server message")
|
logger.trace("Message is a server message")
|
||||||
return match.group(1)
|
return match.group(1)
|
||||||
return False
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def check_if_chat_message(text: str) -> Union[bool, dict]:
|
async def check_if_chat_message(text: str) -> Optional[dict]:
|
||||||
regex = await config.chat_regex()
|
regex = await config.chat_regex()
|
||||||
match: Optional[re.Match[str]] = re.match(regex, text)
|
match: Optional[re.Match[str]] = re.match(regex, text)
|
||||||
if match:
|
if match:
|
||||||
groups = {"username": match.group(1), "message": match.group(2)}
|
groups = {"username": match.group(1), "message": match.group(2)}
|
||||||
logger.trace("Message is a chat message\n%s", json.dumps(groups))
|
logger.trace("Message is a chat message\n%s", json.dumps(groups))
|
||||||
return groups
|
return groups
|
||||||
return False
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def check_if_join_message(text: str) -> Union[bool, str]:
|
async def check_if_join_message(text: str) -> Optional[str]:
|
||||||
regex = await config.join_regex()
|
regex = await config.join_regex()
|
||||||
match: Optional[re.Match[str]] = re.match(regex, text)
|
match: Optional[re.Match[str]] = re.match(regex, text)
|
||||||
if match:
|
if match:
|
||||||
logger.trace("Message is a join message")
|
logger.trace("Message is a join message")
|
||||||
return match.group(1)
|
return match.group(1)
|
||||||
return False
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def check_if_leave_message(text: str) -> Union[bool, str]:
|
async def check_if_leave_message(text: str) -> Optional[str]:
|
||||||
regex = await config.leave_regex()
|
regex = await config.leave_regex()
|
||||||
match: Optional[re.Match[str]] = re.match(regex, text)
|
match: Optional[re.Match[str]] = re.match(regex, text)
|
||||||
if match:
|
if match:
|
||||||
logger.trace("Message is a leave message")
|
logger.trace("Message is a leave message")
|
||||||
return match.group(1)
|
return match.group(1)
|
||||||
return False
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def check_if_achievement_message(text: str) -> Union[bool, dict]:
|
async def check_if_achievement_message(text: str) -> Optional[dict]:
|
||||||
regex = await config.achievement_regex()
|
regex = await config.achievement_regex()
|
||||||
match: Optional[re.Match[str]] = re.match(regex, text)
|
match: Optional[re.Match[str]] = re.match(regex, text)
|
||||||
if match:
|
if match:
|
||||||
|
@ -213,7 +222,7 @@ async def check_if_achievement_message(text: str) -> Union[bool, dict]:
|
||||||
groups["challenge"] = False
|
groups["challenge"] = False
|
||||||
logger.trace("Message is an achievement message")
|
logger.trace("Message is an achievement message")
|
||||||
return groups
|
return groups
|
||||||
return False
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def get_info(username: str) -> Optional[dict]:
|
async def get_info(username: str) -> Optional[dict]:
|
||||||
|
|
|
@ -42,7 +42,7 @@ class SeaUtils(commands.Cog):
|
||||||
|
|
||||||
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
|
||||||
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
|
||||||
__version__ = "1.0.1"
|
__version__ = "1.0.2"
|
||||||
__documentation__ = "https://seacogs.coastalcommits.com/seautils/"
|
__documentation__ = "https://seacogs.coastalcommits.com/seautils/"
|
||||||
|
|
||||||
def __init__(self, bot: Red) -> None:
|
def __init__(self, bot: Red) -> None:
|
||||||
|
@ -74,7 +74,7 @@ class SeaUtils(commands.Cog):
|
||||||
src = obj.function
|
src = obj.function
|
||||||
return inspect.getsource(object=src)
|
return inspect.getsource(object=src)
|
||||||
|
|
||||||
@commands.command(aliases=["source", "src", "code", "showsource"])
|
@commands.command(aliases=["source", "src", "code", "showsource"]) # type: ignore
|
||||||
@commands.is_owner()
|
@commands.is_owner()
|
||||||
async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin # noqa: A002
|
async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin # noqa: A002
|
||||||
"""Show the code for a particular object."""
|
"""Show the code for a particular object."""
|
||||||
|
@ -102,7 +102,7 @@ class SeaUtils(commands.Cog):
|
||||||
else:
|
else:
|
||||||
await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False))
|
await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False))
|
||||||
|
|
||||||
@commands.command(name="dig", aliases=["dnslookup", "nslookup"])
|
@commands.command(name="dig", aliases=["dnslookup", "nslookup"]) # type: ignore
|
||||||
@commands.is_owner()
|
@commands.is_owner()
|
||||||
async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None:
|
async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None:
|
||||||
"""Retrieve DNS information for a domain.
|
"""Retrieve DNS information for a domain.
|
||||||
|
@ -110,7 +110,7 @@ class SeaUtils(commands.Cog):
|
||||||
Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system.
|
Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system.
|
||||||
`nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used.
|
`nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used.
|
||||||
Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter."""
|
Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter."""
|
||||||
command_opts: list[str | int] = ["dig"]
|
command_opts: list[str] = ["dig"]
|
||||||
query_types: list[str] = [record_type] if record_type else ["A", "AAAA", "CNAME"]
|
query_types: list[str] = [record_type] if record_type else ["A", "AAAA", "CNAME"]
|
||||||
if server:
|
if server:
|
||||||
command_opts.extend(["@", server])
|
command_opts.extend(["@", server])
|
||||||
|
@ -176,7 +176,7 @@ class SeaUtils(commands.Cog):
|
||||||
embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False)
|
embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False)
|
||||||
await ctx.send(embed=embed)
|
await ctx.send(embed=embed)
|
||||||
else:
|
else:
|
||||||
await ctx.send(content=cf.box(text=stdout, lang="yaml"))
|
await ctx.send(content=cf.box(text=str(stdout), lang="yaml"))
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
try:
|
try:
|
||||||
ns_process = await asyncio.create_subprocess_exec("nslookup", name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
ns_process = await asyncio.create_subprocess_exec("nslookup", name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||||||
|
@ -208,7 +208,7 @@ class SeaUtils(commands.Cog):
|
||||||
html = await response.text()
|
html = await response.text()
|
||||||
soup = BeautifulSoup(html, "html.parser")
|
soup = BeautifulSoup(html, "html.parser")
|
||||||
pre_tags = soup.find_all("pre")
|
pre_tags = soup.find_all("pre")
|
||||||
content: list[Embed | str] = []
|
content: list[str | Embed] = []
|
||||||
for pre_tag in pre_tags:
|
for pre_tag in pre_tags:
|
||||||
text = format_rfc_text(md(pre_tag), number)
|
text = format_rfc_text(md(pre_tag), number)
|
||||||
if len(text) > 4096:
|
if len(text) > 4096:
|
||||||
|
@ -227,6 +227,6 @@ class SeaUtils(commands.Cog):
|
||||||
if await ctx.embed_requested():
|
if await ctx.embed_requested():
|
||||||
for embed in content:
|
for embed in content:
|
||||||
embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}")
|
embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}")
|
||||||
await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx)
|
await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx) # type: ignore
|
||||||
else:
|
else:
|
||||||
await ctx.maybe_send_embed(message=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}."))
|
await ctx.maybe_send_embed(message=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}."))
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue