Merge branch 'main' into aurora/v3
Some checks failed
Actions / Build Documentation (MkDocs) (pull_request) Successful in 43s
Actions / Lint Code (Ruff & Pylint) (pull_request) Failing after 52s

This commit is contained in:
cswimr 2025-01-26 09:17:06 -05:00
commit ec5978350f
Signed by: CoastalCommits
GPG key ID: 7E73189F651A553F
17 changed files with 671 additions and 709 deletions

2
.gitignore vendored
View file

@ -3,3 +3,5 @@ site
.venv .venv
.data .data
__pycache__ __pycache__
.mypy_cache/
.ruff_cache/

11
.vscode/settings.json vendored
View file

@ -1,6 +1,5 @@
{ {
"[python]": { "[python]": {
"editor.formatOnSave": true,
"editor.codeActionsOnSave": { "editor.codeActionsOnSave": {
"source.fixAll": "explicit" "source.fixAll": "explicit"
}, },
@ -8,5 +7,15 @@
}, },
"[json]": { "[json]": {
"editor.defaultFormatter": "vscode.json-language-features" "editor.defaultFormatter": "vscode.json-language-features"
},
"[jsonc]": {
"editor.defaultFormatter": "vscode.json-language-features"
},
"editor.formatOnSave": true,
"files.exclude": {
"**/.git": true,
"**/__pycache__": true,
"**/.ruff_cache": true,
"**/.mypy_cache": true
} }
} }

View file

@ -17,13 +17,16 @@ from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import bold, error, humanize_list, text_to_file from redbot.core.utils.chat_formatting import bold, error, humanize_list, text_to_file
# Disable Ruff & Pylint complaining about accessing private members
# That's kind of necessary for this cog to function because the Downloader cog has a limited public API
# ruff: noqa: SLF001 # Private member access
# pylint: disable=protected-access # pylint: disable=protected-access
class Backup(commands.Cog): class Backup(commands.Cog):
"""A utility to make reinstalling repositories and cogs after migrating the bot far easier.""" """A utility to make reinstalling repositories and cogs after migrating the bot far easier."""
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"] __author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs" __git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.1.1" __version__ = "1.1.2"
__documentation__ = "https://seacogs.coastalcommits.com/backup/" __documentation__ = "https://seacogs.coastalcommits.com/backup/"
def __init__(self, bot: Red): def __init__(self, bot: Red):
@ -53,11 +56,7 @@ class Backup(commands.Cog):
"""Export your installed repositories and cogs to a file.""" """Export your installed repositories and cogs to a file."""
downloader = ctx.bot.get_cog("Downloader") downloader = ctx.bot.get_cog("Downloader")
if downloader is None: if downloader is None:
await ctx.send( await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."))
error(
f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."
)
)
return return
all_repos = list(downloader._repo_manager.repos) all_repos = list(downloader._repo_manager.repos)
@ -78,7 +77,7 @@ class Backup(commands.Cog):
if cog.repo_name == repo.name: if cog.repo_name == repo.name:
cog_dict = { cog_dict = {
"name": cog.name, "name": cog.name,
# "loaded": cog.name in ctx.bot.extensions.keys(), # "loaded": cog.name in ctx.bot.extensions.keys(), # noqa: ERA001
# this functionality was planned but never implemented due to Red limitations # this functionality was planned but never implemented due to Red limitations
# and the possibility of restoration functionality being added to Core # and the possibility of restoration functionality being added to Core
"pinned": cog.pinned, "pinned": cog.pinned,
@ -88,9 +87,7 @@ class Backup(commands.Cog):
export_data.append(repo_dict) export_data.append(repo_dict)
await ctx.send( await ctx.send(file=text_to_file(json.dumps(export_data, indent=4), "backup.json"))
file=text_to_file(json.dumps(export_data, indent=4), "backup.json")
)
@backup.command(name="import") @backup.command(name="import")
@commands.is_owner() @commands.is_owner()
@ -107,11 +104,7 @@ class Backup(commands.Cog):
downloader = ctx.bot.get_cog("Downloader") downloader = ctx.bot.get_cog("Downloader")
if downloader is None: if downloader is None:
await ctx.send( await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."))
error(
f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."
)
)
return return
repo_s = [] repo_s = []
@ -133,32 +126,20 @@ class Backup(commands.Cog):
repo_e.append("PyLav cogs are not supported.") repo_e.append("PyLav cogs are not supported.")
continue continue
if name.startswith(".") or name.endswith("."): if name.startswith(".") or name.endswith("."):
repo_e.append( repo_e.append(f"Invalid repository name: {name}\nRepository names cannot start or end with a dot.")
f"Invalid repository name: {name}\nRepository names cannot start or end with a dot."
)
continue continue
if re.match(r"^[a-zA-Z0-9_\-\.]+$", name) is None: if re.match(r"^[a-zA-Z0-9_\-\.]+$", name) is None:
repo_e.append( repo_e.append(f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots.")
f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots."
)
continue continue
try: try:
repository = await downloader._repo_manager.add_repo( repository = await downloader._repo_manager.add_repo(url, name, branch)
url, name, branch repo_s.append(f"Added repository {name} from {url} on branch {branch}.")
) self.logger.debug("Added repository %s from %s on branch %s", name, url, branch)
repo_s.append(
f"Added repository {name} from {url} on branch {branch}."
)
self.logger.debug(
"Added repository %s from %s on branch %s", name, url, branch
)
except errors.ExistingGitRepo: except errors.ExistingGitRepo:
repo_e.append(f"Repository {name} already exists.") repo_e.append(f"Repository {name} already exists.")
repository = downloader._repo_manager.get_repo( repository = downloader._repo_manager.get_repo(name)
name
)
self.logger.debug("Repository %s already exists", name) self.logger.debug("Repository %s already exists", name)
except errors.AuthenticationError as err: except errors.AuthenticationError as err:
@ -172,9 +153,7 @@ class Backup(commands.Cog):
continue continue
except errors.CloningError as err: except errors.CloningError as err:
repo_e.append( repo_e.append(f"Cloning error while adding repository {name}. See logs for more information.")
f"Cloning error while adding repository {name}. See logs for more information."
)
self.logger.exception( self.logger.exception(
"Something went wrong whilst cloning %s (to revision %s)", "Something went wrong whilst cloning %s (to revision %s)",
url, url,
@ -184,9 +163,7 @@ class Backup(commands.Cog):
continue continue
except OSError: except OSError:
repo_e.append( repo_e.append(f"OS error while adding repository {name}. See logs for more information.")
f"OS error while adding repository {name}. See logs for more information."
)
self.logger.exception( self.logger.exception(
"Something went wrong trying to add repo %s under name %s", "Something went wrong trying to add repo %s under name %s",
url, url,
@ -206,23 +183,19 @@ class Backup(commands.Cog):
continue continue
cog_modules.append(cog_module) cog_modules.append(cog_module)
for cog in set(cog.name for cog in cog_modules): for cog in {cog.name for cog in cog_modules}:
poss_installed_path = (await downloader.cog_install_path()) / cog poss_installed_path = (await downloader.cog_install_path()) / cog
if poss_installed_path.exists(): if poss_installed_path.exists():
with contextlib.suppress(commands.ExtensionNotLoaded): with contextlib.suppress(commands.ExtensionNotLoaded):
await ctx.bot.unload_extension(cog) await ctx.bot.unload_extension(cog)
await ctx.bot.remove_loaded_package(cog) await ctx.bot.remove_loaded_package(cog)
await downloader._delete_cog( await downloader._delete_cog(poss_installed_path)
poss_installed_path
)
uninstall_s.append(f"Uninstalled {cog}") uninstall_s.append(f"Uninstalled {cog}")
self.logger.debug("Uninstalled %s", cog) self.logger.debug("Uninstalled %s", cog)
else: else:
uninstall_e.append(f"Failed to uninstall {cog}") uninstall_e.append(f"Failed to uninstall {cog}")
self.logger.warning("Failed to uninstall %s", cog) self.logger.warning("Failed to uninstall %s", cog)
await downloader._remove_from_installed( await downloader._remove_from_installed(cog_modules)
cog_modules
)
for cog in cogs: for cog in cogs:
cog_name = cog["name"] cog_name = cog["name"]
@ -236,25 +209,15 @@ class Backup(commands.Cog):
if cog_name == "backup" and "cswimr/SeaCogs" in url: if cog_name == "backup" and "cswimr/SeaCogs" in url:
continue continue
async with repository.checkout( async with repository.checkout(commit, exit_to_rev=repository.branch):
commit, exit_to_rev=repository.branch cogs_c, message = await downloader._filter_incorrect_cogs_by_names(repository, [cog_name])
):
cogs_c, message = (
await downloader._filter_incorrect_cogs_by_names(
repository, [cog_name]
)
)
if not cogs_c: if not cogs_c:
install_e.append(message) install_e.append(message)
self.logger.error(message) self.logger.error(message)
continue continue
failed_reqs = await downloader._install_requirements( failed_reqs = await downloader._install_requirements(cogs_c)
cogs_c
)
if failed_reqs: if failed_reqs:
install_e.append( install_e.append(f"Failed to install {cog_name} due to missing requirements: {failed_reqs}")
f"Failed to install {cog_name} due to missing requirements: {failed_reqs}"
)
self.logger.error( self.logger.error(
"Failed to install %s due to missing requirements: %s", "Failed to install %s due to missing requirements: %s",
cog_name, cog_name,
@ -262,51 +225,37 @@ class Backup(commands.Cog):
) )
continue continue
installed_cogs, failed_cogs = await downloader._install_cogs( installed_cogs, failed_cogs = await downloader._install_cogs(cogs_c)
cogs_c
)
if repository.available_libraries: if repository.available_libraries:
installed_libs, failed_libs = ( installed_libs, failed_libs = await repository.install_libraries(
await repository.install_libraries( target_dir=downloader.SHAREDLIB_PATH,
target_dir=downloader.SHAREDLIB_PATH, req_target_dir=downloader.LIB_PATH,
req_target_dir=downloader.LIB_PATH,
)
) )
else: else:
installed_libs = None installed_libs = None
failed_libs = None failed_libs = None
if cog_pinned: if cog_pinned:
for cog in installed_cogs: for cog in installed_cogs: # noqa: PLW2901
cog.pinned = True cog.pinned = True
await downloader._save_to_installed( await downloader._save_to_installed(installed_cogs + installed_libs if installed_libs else installed_cogs)
installed_cogs + installed_libs
if installed_libs
else installed_cogs
)
if installed_cogs: if installed_cogs:
installed_cog_name = installed_cogs[0].name installed_cog_name = installed_cogs[0].name
install_s.append(f"Installed {installed_cog_name}") install_s.append(f"Installed {installed_cog_name}")
self.logger.debug("Installed %s", installed_cog_name) self.logger.debug("Installed %s", installed_cog_name)
if installed_libs: if installed_libs:
for lib in installed_libs: for lib in installed_libs:
install_s.append( install_s.append(f"Installed {lib.name} required for {cog_name}")
f"Installed {lib.name} required for {cog_name}" self.logger.debug("Installed %s required for %s", lib.name, cog_name)
)
self.logger.debug(
"Installed %s required for %s", lib.name, cog_name
)
if failed_cogs: if failed_cogs:
failed_cog_name = failed_cogs[0].name failed_cog_name = failed_cogs[0].name
install_e.append(f"Failed to install {failed_cog_name}") install_e.append(f"Failed to install {failed_cog_name}")
self.logger.error("Failed to install %s", failed_cog_name) self.logger.error("Failed to install %s", failed_cog_name)
if failed_libs: if failed_libs:
for lib in failed_libs: for lib in failed_libs:
install_e.append( install_e.append(f"Failed to install {lib.name} required for {cog_name}")
f"Failed to install {lib.name} required for {cog_name}"
)
self.logger.error( self.logger.error(
"Failed to install %s required for %s", "Failed to install %s required for %s",
lib.name, lib.name,

View file

@ -1,15 +1,21 @@
{ {
"author" : ["cswimr"], "author": [
"install_msg" : "Thank you for installing Backup!\nYou can find the source code of this cog [here](https://coastalcommits.com/cswimr/SeaCogs).", "cswimr"
"name" : "Backup", ],
"short" : "A utility to make reinstalling repositories and cogs after migrating the bot far easier.", "install_msg": "Thank you for installing Backup!\nYou can find the source code of this cog [here](https://coastalcommits.com/cswimr/SeaCogs).",
"description" : "A utility to make reinstalling repositories and cogs after migrating the bot far easier.", "name": "Backup",
"end_user_data_statement" : "This cog does not store end user data.", "short": "A utility to make reinstalling repositories and cogs after migrating the bot far easier.",
"description": "A utility to make reinstalling repositories and cogs after migrating the bot far easier.",
"end_user_data_statement": "This cog does not store end user data.",
"hidden": false, "hidden": false,
"disabled": false, "disabled": false,
"min_bot_version": "3.5.6", "min_bot_version": "3.5.6",
"max_bot_version": "3.5.13", "max_bot_version": "3.5.14",
"min_python_version": [3, 9, 0], "min_python_version": [
3,
9,
0
],
"tags": [ "tags": [
"utility", "utility",
"backup", "backup",

View file

@ -27,7 +27,7 @@ class Bible(commands.Cog):
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"] __author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs" __git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.1.2" __version__ = "1.1.3"
__documentation__ = "https://seacogs.coastalcommits.com/pterodactyl/" __documentation__ = "https://seacogs.coastalcommits.com/pterodactyl/"
def __init__(self, bot: Red): def __init__(self, bot: Red):
@ -91,20 +91,20 @@ class Bible(commands.Cog):
response.status, response.status,
) )
if response.status == 401: if response.status == 401:
raise bible.errors.Unauthorized() raise bible.errors.UnauthorizedError
if response.status == 403: if response.status == 403:
raise bible.errors.BibleAccessError() raise bible.errors.BibleAccessError
if response.status == 503: if response.status == 503:
raise bible.errors.ServiceUnavailable() raise bible.errors.ServiceUnavailableError
return Version( return Version(
bible_id, bible_id=bible_id,
data["data"]["abbreviation"], abbreviation=data["data"]["abbreviation"],
data["data"]["language"]["name"], language=data["data"]["language"]["name"],
data["data"]["abbreviationLocal"], abbreviation_local=data["data"]["abbreviationLocal"],
data["data"]["language"]["nameLocal"], language_local=data["data"]["language"]["nameLocal"],
data["data"]["description"], description=data["data"]["description"],
data["data"]["descriptionLocal"], description_local=data["data"]["descriptionLocal"],
data["data"]["copyright"], version_copyright=data["data"]["copyright"],
) )
async def _get_passage( async def _get_passage(
@ -135,15 +135,15 @@ class Bible(commands.Cog):
response.status, response.status,
) )
if response.status == 400: if response.status == 400:
raise bible.errors.InexplicableError() raise bible.errors.InexplicableError
if response.status == 401: if response.status == 401:
raise bible.errors.Unauthorized() raise bible.errors.UnauthorizedError
if response.status == 403: if response.status == 403:
raise bible.errors.BibleAccessError() raise bible.errors.BibleAccessError
if response.status == 404: if response.status == 404:
raise bible.errors.NotFound() raise bible.errors.NotFoundError
if response.status == 503: if response.status == 503:
raise bible.errors.ServiceUnavailable() raise bible.errors.ServiceUnavailableError
fums_url = "https://fums.api.bible/f3" fums_url = "https://fums.api.bible/f3"
fums_params = { fums_params = {
@ -176,11 +176,11 @@ class Bible(commands.Cog):
response.status, response.status,
) )
if response.status == 401: if response.status == 401:
raise bible.errors.Unauthorized() raise bible.errors.UnauthorizedError
if response.status == 403: if response.status == 403:
raise bible.errors.BibleAccessError() raise bible.errors.BibleAccessError
if response.status == 503: if response.status == 503:
raise bible.errors.ServiceUnavailable() raise bible.errors.ServiceUnavailableError
return data["data"] return data["data"]
async def _get_chapters(self, bible_id: str, book_id: str) -> dict: async def _get_chapters(self, bible_id: str, book_id: str) -> dict:
@ -195,11 +195,11 @@ class Bible(commands.Cog):
response.status, response.status,
) )
if response.status == 401: if response.status == 401:
raise bible.errors.Unauthorized() raise bible.errors.UnauthorizedError
if response.status == 403: if response.status == 403:
raise bible.errors.BibleAccessError() raise bible.errors.BibleAccessError
if response.status == 503: if response.status == 503:
raise bible.errors.ServiceUnavailable() raise bible.errors.ServiceUnavailableError
return data["data"] return data["data"]
async def _get_verses(self, bible_id: str, book_id: str, chapter: int) -> dict: async def _get_verses(self, bible_id: str, book_id: str, chapter: int) -> dict:
@ -214,11 +214,11 @@ class Bible(commands.Cog):
response.status, response.status,
) )
if response.status == 401: if response.status == 401:
raise bible.errors.Unauthorized() raise bible.errors.UnauthorizedError
if response.status == 403: if response.status == 403:
raise bible.errors.BibleAccessError() raise bible.errors.BibleAccessError
if response.status == 503: if response.status == 503:
raise bible.errors.ServiceUnavailable() raise bible.errors.ServiceUnavailableError
return data["data"] return data["data"]
@commands.group(autohelp=True) @commands.group(autohelp=True)
@ -251,10 +251,10 @@ class Bible(commands.Cog):
passage = await self._get_passage(ctx, bible_id, f"{book_id}.{passage.replace(':', '.')}", False) passage = await self._get_passage(ctx, bible_id, f"{book_id}.{passage.replace(':', '.')}", False)
except ( except (
bible.errors.BibleAccessError, bible.errors.BibleAccessError,
bible.errors.NotFound, bible.errors.NotFoundError,
bible.errors.InexplicableError, bible.errors.InexplicableError,
bible.errors.ServiceUnavailable, bible.errors.ServiceUnavailableError,
bible.errors.Unauthorized, bible.errors.UnauthorizedError,
) as e: ) as e:
await ctx.send(e.message) await ctx.send(e.message)
return return
@ -270,7 +270,7 @@ class Bible(commands.Cog):
description=passage["content"].replace("", ""), description=passage["content"].replace("", ""),
color=await ctx.embed_color(), color=await ctx.embed_color(),
) )
embed.set_footer(text=f"{ctx.prefix}bible passage - Powered by API.Bible - {version.abbreviationLocal} ({version.languageLocal}, {version.descriptionLocal})", icon_url="attachment://icon.png") embed.set_footer(text=f"{ctx.prefix}bible passage - Powered by API.Bible - {version.abbreviation_local} ({version.language_local}, {version.description_local})", icon_url="attachment://icon.png")
await ctx.send(embed=embed, file=icon) await ctx.send(embed=embed, file=icon)
else: else:
await ctx.send(f"## {passage['reference']}\n{passage['content']}") await ctx.send(f"## {passage['reference']}\n{passage['content']}")
@ -294,10 +294,10 @@ class Bible(commands.Cog):
passage = await self._get_passage(ctx, bible_id, verse, False) passage = await self._get_passage(ctx, bible_id, verse, False)
except ( except (
bible.errors.BibleAccessError, bible.errors.BibleAccessError,
bible.errors.NotFound, bible.errors.NotFoundError,
bible.errors.InexplicableError, bible.errors.InexplicableError,
bible.errors.ServiceUnavailable, bible.errors.ServiceUnavailableError,
bible.errors.Unauthorized, bible.errors.UnauthorizedError,
) as e: ) as e:
await ctx.send(e.message) await ctx.send(e.message)
return return
@ -309,7 +309,7 @@ class Bible(commands.Cog):
description=passage["content"].replace("", ""), description=passage["content"].replace("", ""),
color=await ctx.embed_color(), color=await ctx.embed_color(),
) )
embed.set_footer(text=f"{ctx.prefix}bible random - Powered by API.Bible - {version.abbreviationLocal} ({version.languageLocal}, {version.descriptionLocal})", icon_url="attachment://icon.png") embed.set_footer(text=f"{ctx.prefix}bible random - Powered by API.Bible - {version.abbreviation_local} ({version.language_local}, {version.description_local})", icon_url="attachment://icon.png")
await ctx.send(embed=embed, file=icon) await ctx.send(embed=embed, file=icon)
else: else:
await ctx.send(f"## {passage['reference']}\n{passage['content']}") await ctx.send(f"## {passage['reference']}\n{passage['content']}")

View file

@ -10,7 +10,7 @@ class BibleAccessError(Exception):
self.message = message self.message = message
class Unauthorized(Exception): class UnauthorizedError(Exception):
def __init__( def __init__(
self, self,
message: str = error("The API key for API.Bible is missing or invalid. Please report this to the bot owner.\nIf you are the bot owner, please check the documentation [here](<https://seacogs.coastalcommits.com/bible/#setup>)."), message: str = error("The API key for API.Bible is missing or invalid. Please report this to the bot owner.\nIf you are the bot owner, please check the documentation [here](<https://seacogs.coastalcommits.com/bible/#setup>)."),
@ -19,7 +19,7 @@ class Unauthorized(Exception):
self.message = message self.message = message
class NotFound(Exception): class NotFoundError(Exception):
def __init__( def __init__(
self, self,
message: str = error("The requested passage was not found."), message: str = error("The requested passage was not found."),
@ -28,7 +28,7 @@ class NotFound(Exception):
self.message = message self.message = message
class ServiceUnavailable(Exception): class ServiceUnavailableError(Exception):
def __init__( def __init__(
self, self,
message: str = error("The API.Bible service is currently unavailable."), message: str = error("The API.Bible service is currently unavailable."),

View file

@ -4,23 +4,23 @@ class Version:
bible_id, bible_id,
abbreviation, abbreviation,
language, language,
abbreviationLocal, abbreviation_local,
languageLocal, language_local,
description, description,
descriptionLocal, description_local,
version_copyright, version_copyright,
): ):
self.bible_id = bible_id self.bible_id = bible_id
self.abbreviation = abbreviation self.abbreviation = abbreviation
self.language = language self.language = language
self.abbreviationLocal = abbreviationLocal self.abbreviation_local = abbreviation_local
self.languageLocal = languageLocal self.language_local = language_local
self.description = description self.description = description
self.descriptionLocal = descriptionLocal self.description_local = description_local
self.copyright = version_copyright self.copyright = version_copyright
def __str__(self): def __str__(self):
return self.abbreviationLocal return self.abbreviation_local
def __repr__(self): def __repr__(self):
return f'bible.models.Version("{self.bible_id}", "{self.abbreviation}", "{self.language}", "{self.abbreviationLocal}", "{self.languageLocal}", "{self.description}", "{self.descriptionLocal}", "{self.copyright}")' return f'bible.models.Version("{self.bible_id}", "{self.abbreviation}", "{self.language}", "{self.abbreviation_local}", "{self.language_local}", "{self.description}", "{self.description_local}", "{self.copyright}")'

View file

@ -16,13 +16,13 @@ class EmojiInfo(commands.Cog):
__author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"] __author__ = ["[cswimr](https://www.coastalcommits.com/cswimr)"]
__git__ = "https://www.coastalcommits.com/cswimr/SeaCogs" __git__ = "https://www.coastalcommits.com/cswimr/SeaCogs"
__version__ = "1.0.1" __version__ = "1.0.2"
__documentation__ = "https://seacogs.coastalcommits.com/emojiinfo/" __documentation__ = "https://seacogs.coastalcommits.com/emojiinfo/"
def __init__(self, bot: Red) -> None: def __init__(self, bot: Red) -> None:
super().__init__() super().__init__()
self.bot: Red = bot self.bot: Red = bot
self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.Emoji") self.logger: RedTraceLogger = getLogger(name="red.SeaCogs.EmojiInfo")
def format_help_for_context(self, ctx: commands.Context) -> str: def format_help_for_context(self, ctx: commands.Context) -> str:
pre_processed = super().format_help_for_context(ctx) or "" pre_processed = super().format_help_for_context(ctx) or ""
@ -35,14 +35,12 @@ class EmojiInfo(commands.Cog):
] ]
return "\n".join(text) return "\n".join(text)
async def fetch_twemoji(self, unicode_emoji) -> str: async def fetch_twemoji(self, unicode_emoji) -> str:
base_url = "https://cdn.jsdelivr.net/gh/jdecked/twemoji@latest/assets/72x72/" base_url = "https://cdn.jsdelivr.net/gh/jdecked/twemoji@latest/assets/72x72/"
emoji_codepoint = "-".join([hex(ord(char))[2:] for char in unicode_emoji]) emoji_codepoint = "-".join([hex(ord(char))[2:] for char in unicode_emoji])
segments = emoji_codepoint.split("-") segments = emoji_codepoint.split("-")
valid_segments = [seg for seg in segments if len(seg) >= 4] valid_segments = [seg for seg in segments if len(seg) >= 4]
emoji_url = f"{base_url}{valid_segments[0]}.png" return f"{base_url}{valid_segments[0]}.png"
return emoji_url
async def fetch_primary_color(self, emoji_url: str) -> discord.Color | None: async def fetch_primary_color(self, emoji_url: str) -> discord.Color | None:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
@ -51,8 +49,7 @@ class EmojiInfo(commands.Cog):
return None return None
image = await response.read() image = await response.read()
dominant_color = ColorThief(io.BytesIO(image)).get_color(quality=1) dominant_color = ColorThief(io.BytesIO(image)).get_color(quality=1)
color = discord.Color.from_rgb(*dominant_color) return discord.Color.from_rgb(*dominant_color)
return color
async def get_emoji_info(self, emoji: PartialEmoji) -> tuple[str, str]: async def get_emoji_info(self, emoji: PartialEmoji) -> tuple[str, str]:
if emoji.is_unicode_emoji(): if emoji.is_unicode_emoji():
@ -76,55 +73,51 @@ class EmojiInfo(commands.Cog):
aliases = f"{bold('Aliases:')} {', '.join(emoji.aliases)}\n" if emoji.aliases else "" aliases = f"{bold('Aliases:')} {', '.join(emoji.aliases)}\n" if emoji.aliases else ""
group = f"{bold('Group:')} {emoji.group}\n" group = f"{bold('Group:')} {emoji.group}\n"
return ( return (f"{name}{emoji_id}{bold('Native:')} {emoji.is_unicode_emoji()}\n{group}{aliases}{bold('Animated:')} {emoji.animated}\n{bold('Markdown:')} {markdown}\n{bold('URL:')} [Click Here]({emoji_url})"), emoji_url
f"{name}"
f"{emoji_id}"
f"{bold('Native:')} {emoji.is_unicode_emoji()}\n"
f"{group}"
f"{aliases}"
f"{bold('Animated:')} {emoji.animated}\n"
f"{bold('Markdown:')} {markdown}\n"
f"{bold('URL:')} [Click Here]({emoji_url})"
), emoji_url
@app_commands.command(name="emoji") @app_commands.command(name="emoji")
@app_commands.describe( @app_commands.describe(emoji="What emoji would you like to get information on?", ephemeral="Would you like the response to be hidden?")
emoji="What emoji would you like to get information on?",
ephemeral="Would you like the response to be hidden?"
)
async def emoji_slash(self, interaction: discord.Interaction, emoji: str, ephemeral: bool = True) -> None: async def emoji_slash(self, interaction: discord.Interaction, emoji: str, ephemeral: bool = True) -> None:
"""Retrieve information about an emoji.""" """Retrieve information about an emoji."""
await interaction.response.defer(ephemeral=ephemeral) await interaction.response.defer(ephemeral=ephemeral)
try: try:
emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji) emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
string, emoji_url, = await self.get_emoji_info(emoji) (
string,
emoji_url,
) = await self.get_emoji_info(emoji)
self.logger.verbose(f"Emoji:\n{string}") self.logger.verbose(f"Emoji:\n{string}")
except (IndexError, UnboundLocalError): except (IndexError, UnboundLocalError):
return await interaction.followup.send("Please provide a valid emoji!") return await interaction.followup.send("Please provide a valid emoji!")
if await self.bot.embed_requested(channel=interaction.channel): if await self.bot.embed_requested(channel=interaction.channel):
embed = embed = discord.Embed(title="Emoji Information", description=string, color = await self.fetch_primary_color(emoji_url) or await self.bot.get_embed_color(interaction.channel)) embed = discord.Embed(title="Emoji Information", description=string, color=await self.fetch_primary_color(emoji_url) or await self.bot.get_embed_color(interaction.channel))
embed.set_thumbnail(url=emoji_url) embed.set_thumbnail(url=emoji_url)
await interaction.followup.send(embed=embed) await interaction.followup.send(embed=embed)
else: return None
await interaction.followup.send(content=string) await interaction.followup.send(content=string)
return None
@commands.command(name="emoji") @commands.command(name="emoji")
async def emoji(self, ctx: commands.Context, *, emoji: str) -> None: async def emoji(self, ctx: commands.Context, *, emoji: str) -> None:
"""Retrieve information about an emoji.""" """Retrieve information about an emoji."""
try: try:
emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji) emoji: PartialEmoji = PartialEmoji.from_str(self, value=emoji)
string, emoji_url, = await self.get_emoji_info(emoji) (
string,
emoji_url,
) = await self.get_emoji_info(emoji)
self.logger.verbose(f"Emoji:\n{string}") self.logger.verbose(f"Emoji:\n{string}")
except (IndexError, UnboundLocalError): except (IndexError, UnboundLocalError):
return await ctx.send("Please provide a valid emoji!") return await ctx.send("Please provide a valid emoji!")
if await ctx.embed_requested(): if await ctx.embed_requested():
embed = embed = discord.Embed(title="Emoji Information", description=string, color = await self.fetch_primary_color(emoji_url) or await ctx.embed_color) embed = discord.Embed(title="Emoji Information", description=string, color=await self.fetch_primary_color(emoji_url) or await ctx.embed_color)
embed.set_thumbnail(url=emoji_url) embed.set_thumbnail(url=emoji_url)
await ctx.send(embed=embed) await ctx.send(embed=embed)
else: return None
await ctx.send(content=string) await ctx.send(content=string)
return None

View file

@ -39,7 +39,7 @@ class PartialEmoji(discord.PartialEmoji):
The group name of the emoji if it is a native emoji. The group name of the emoji if it is a native emoji.
""" """
def __init__(self, *, name: str, animated: bool = False, id: int | None = None, group: str | None = None, aliases: list | None = None) -> None: # pylint: disable=redefined-builtin def __init__(self, *, name: str, animated: bool = False, id: int | None = None, group: str | None = None, aliases: list | None = None) -> None: # pylint: disable=redefined-builtin # noqa: A002
super().__init__(name=name, animated=animated, id=id) super().__init__(name=name, animated=animated, id=id)
self.group = group self.group = group
self.aliases = aliases self.aliases = aliases
@ -72,9 +72,9 @@ class PartialEmoji(discord.PartialEmoji):
match = cls._CUSTOM_EMOJI_RE.match(value) match = cls._CUSTOM_EMOJI_RE.match(value)
if match is not None: if match is not None:
groups = match.groupdict() groups = match.groupdict()
animated = bool(groups['animated']) animated = bool(groups["animated"])
emoji_id = int(groups['id']) emoji_id = int(groups["id"])
name = groups['name'] name = groups["name"]
return cls(name=name, animated=animated, id=emoji_id) return cls(name=name, animated=animated, id=emoji_id)
path: data_manager.Path = data_manager.bundled_data_path(coginstance) / "emojis.json" path: data_manager.Path = data_manager.bundled_data_path(coginstance) / "emojis.json"

View file

@ -52,7 +52,7 @@ class HotReload(commands.Cog):
async def get_paths(self) -> tuple[Path]: async def get_paths(self) -> tuple[Path]:
"""Retrieve user defined paths.""" """Retrieve user defined paths."""
cog_manager = self.bot._cog_mgr cog_manager = self.bot._cog_mgr # noqa: SLF001 # We have to use this private method because there is no public API to get user defined paths
cog_paths = await cog_manager.user_defined_paths() cog_paths = await cog_manager.user_defined_paths()
return (Path(path) for path in cog_paths) return (Path(path) for path in cog_paths)
@ -97,13 +97,13 @@ class HotReloadHandler(RegexMatchingEventHandler):
else: else:
dest = "" dest = ""
self.logger.info(f"File {event.src_path} has been {event.event_type}{dest}.") self.logger.info("File %s has been %s%s.", event.src_path, event.event_type, dest)
run_coroutine_threadsafe(self.reload_cogs(cogs_to_reload), loop=self.bot.loop) run_coroutine_threadsafe(self.reload_cogs(cogs_to_reload), loop=self.bot.loop)
async def reload_cogs(self, cog_names: Sequence[str]) -> None: async def reload_cogs(self, cog_names: Sequence[str]) -> None:
"""Reload modified cog.""" """Reload modified cog."""
core_logic = CoreLogic(bot=self.bot) core_logic = CoreLogic(bot=self.bot)
self.logger.info(f"Reloading cogs: {humanize_list(cog_names, style='unit')}") self.logger.info("Reloading cogs: %s", humanize_list(cog_names, style="unit"))
await core_logic._reload(pkg_names=cog_names) await core_logic._reload(pkg_names=cog_names) # noqa: SLF001 # We have to use this private method because there is no public API to reload other cogs
self.logger.info(f"Reloaded cogs: {humanize_list(cog_names, style='unit')}") self.logger.info("Reloaded cogs: %s", humanize_list(cog_names, style="unit"))

View file

@ -40,13 +40,13 @@ class Nerdify(commands.Cog):
@commands.command(aliases=["nerd"]) @commands.command(aliases=["nerd"])
async def nerdify( async def nerdify(
self, ctx: commands.Context, *, text: Optional[str] = None self, ctx: commands.Context, *, text: Optional[str] = None,
) -> None: ) -> None:
"""Nerdify the replied to message, previous message, or your own text.""" """Nerdify the replied to message, previous message, or your own text."""
if not text: if not text:
if hasattr(ctx.message, "reference") and ctx.message.reference: if hasattr(ctx.message, "reference") and ctx.message.reference:
with suppress( with suppress(
discord.Forbidden, discord.NotFound, discord.HTTPException discord.Forbidden, discord.NotFound, discord.HTTPException,
): ):
message_id = ctx.message.reference.message_id message_id = ctx.message.reference.message_id
if message_id: if message_id:
@ -62,7 +62,7 @@ class Nerdify(commands.Cog):
ctx.channel, ctx.channel,
self.nerdify_text(text), self.nerdify_text(text),
allowed_mentions=discord.AllowedMentions( allowed_mentions=discord.AllowedMentions(
everyone=False, users=False, roles=False everyone=False, users=False, roles=False,
), ),
) )
@ -77,7 +77,7 @@ class Nerdify(commands.Cog):
return f'"{text}" 🤓' return f'"{text}" 🤓'
async def type_message( async def type_message(
self, destination: discord.abc.Messageable, content: str, **kwargs: Any self, destination: discord.abc.Messageable, content: str, **kwargs: Any,
) -> Union[discord.Message, None]: ) -> Union[discord.Message, None]:
"""Simulate typing and sending a message to a destination. """Simulate typing and sending a message to a destination.

View file

@ -3,8 +3,8 @@ import aiohttp
async def get_status(host: str, port: int = 25565) -> tuple[bool, dict]: async def get_status(host: str, port: int = 25565) -> tuple[bool, dict]:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(f'https://api.mcsrvstat.us/2/{host}:{port}') as response: async with session.get(f"https://api.mcsrvstat.us/2/{host}:{port}") as response:
response = await response.json() response = await response.json() # noqa: PLW2901
if response['online']: if response["online"]:
return (True, response) return (True, response)
return (False, response) return (False, response)

View file

@ -67,20 +67,21 @@ class Pterodactyl(commands.Cog):
self.update_topic.cancel() self.update_topic.cancel()
self.task.cancel() self.task.cancel()
self.retry_counter = 0 self.retry_counter = 0
await self.client._session.close() # pylint: disable=protected-access await self.client._session.close() # pylint: disable=protected-access # noqa: SLF001
def get_task(self) -> asyncio.Task: def get_task(self) -> asyncio.Task:
from pterodactyl.websocket import establish_websocket_connection from pterodactyl.websocket import establish_websocket_connection
task = self.bot.loop.create_task(establish_websocket_connection(self), name="Pterodactyl Websocket Connection") task = self.bot.loop.create_task(establish_websocket_connection(self), name="Pterodactyl Websocket Connection")
task.add_done_callback(self.error_callback) task.add_done_callback(self.error_callback)
return task return task
def error_callback(self, fut) -> None: #NOTE - Thanks flame442 and zephyrkul for helping me figure this out def error_callback(self, fut) -> None: # NOTE - Thanks flame442 and zephyrkul for helping me figure this out
try: try:
fut.result() fut.result()
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("WebSocket task has been cancelled.") logger.info("WebSocket task has been cancelled.")
except Exception as e: # pylint: disable=broad-exception-caught except Exception as e: # pylint: disable=broad-exception-caught
logger.error("WebSocket task has failed: %s", e, exc_info=e) logger.error("WebSocket task has failed: %s", e, exc_info=e)
self.task.cancel() self.task.cancel()
if self.retry_counter < 5: if self.retry_counter < 5:
@ -141,23 +142,27 @@ class Pterodactyl(commands.Cog):
if await config.api_endpoint() == "minecraft": if await config.api_endpoint() == "minecraft":
status, response = await mcsrvstatus.get_status(await config.topic_hostname(), await config.topic_port()) status, response = await mcsrvstatus.get_status(await config.topic_hostname(), await config.topic_port())
if status: if status:
placeholders.update({ placeholders.update(
"I": response['ip'], {
"M": str(response['players']['max']), "I": response["ip"],
"P": str(response['players']['online']), "M": str(response["players"]["max"]),
"V": response['version'], "P": str(response["players"]["online"]),
"D": response['motd']['clean'][0] if response['motd']['clean'] else "unset", "V": response["version"],
}) "D": response["motd"]["clean"][0] if response["motd"]["clean"] else "unset",
},
)
else: else:
placeholders.update({ placeholders.update(
"I": response['ip'], {
"M": "0", "I": response["ip"],
"P": "0", "M": "0",
"V": "Server Offline", "P": "0",
"D": "Server Offline", "V": "Server Offline",
}) "D": "Server Offline",
},
)
for key, value in placeholders.items(): for key, value in placeholders.items():
topic = topic.replace('.$' + key, value) topic = topic.replace(".$" + key, value)
return topic return topic
async def get_chat_command(self, message: discord.Message) -> str: async def get_chat_command(self, message: discord.Message) -> str:
@ -166,22 +171,23 @@ class Pterodactyl(commands.Cog):
"C": str(message.author.color), "C": str(message.author.color),
"D": message.author.discriminator, "D": message.author.discriminator,
"I": str(message.author.id), "I": str(message.author.id),
"M": message.content.replace('"','').replace("\n", " "), "M": message.content.replace('"', "").replace("\n", " "),
"N": message.author.display_name, "N": message.author.display_name,
"U": message.author.name, "U": message.author.name,
"V": await config.invite() or "use [p]pterodactyl config invite to change me", "V": await config.invite() or "use [p]pterodactyl config invite to change me",
} }
for key, value in placeholders.items(): for key, value in placeholders.items():
command = command.replace('.$' + key, value) command = command.replace(".$" + key, value)
return command return command
async def get_player_list(self) -> Optional[Tuple[str, list]]: async def get_player_list(self) -> Optional[Tuple[str, list]]:
if await config.api_endpoint() == "minecraft": if await config.api_endpoint() == "minecraft":
status, response = await mcsrvstatus.get_status(await config.topic_hostname(), await config.topic_port()) status, response = await mcsrvstatus.get_status(await config.topic_hostname(), await config.topic_port())
if status and 'list' in response['players']: if status and "list" in response["players"]:
output_str = '\n'.join(response['players']['list']) output_str = "\n".join(response["players"]["list"])
return output_str, response['players']['list'] return output_str, response["players"]["list"]
return None return None
return None
async def get_player_list_embed(self, ctx: Union[commands.Context, discord.Interaction]) -> Optional[discord.Embed]: async def get_player_list_embed(self, ctx: Union[commands.Context, discord.Interaction]) -> Optional[discord.Embed]:
player_list = await self.get_player_list() player_list = await self.get_player_list()
@ -191,7 +197,7 @@ class Pterodactyl(commands.Cog):
return embed return embed
return None return None
async def power(self, ctx: Union[discord.Interaction, commands.Context], action: str, action_ing: str, warning: str = '') -> None: async def power(self, ctx: Union[discord.Interaction, commands.Context], action: str, action_ing: str, warning: str = "") -> None:
if isinstance(ctx, discord.Interaction): if isinstance(ctx, discord.Interaction):
ctx = await self.bot.get_context(ctx) ctx = await self.bot.get_context(ctx)
@ -215,9 +221,10 @@ class Pterodactyl(commands.Cog):
await self.websocket.send(json.dumps({"event": "set state", "args": [action]})) await self.websocket.send(json.dumps({"event": "set state", "args": [action]}))
await message.edit(content=f"Server {action_ing}", view=None) await message.edit(content=f"Server {action_ing}", view=None)
return None
else: await message.edit(content="Cancelled.", view=None)
await message.edit(content="Cancelled.", view=None) return None
async def send_command(self, ctx: Union[discord.Interaction, commands.Context], command: str): async def send_command(self, ctx: Union[discord.Interaction, commands.Context], command: str):
channel = self.bot.get_channel(await config.console_channel()) channel = self.bot.get_channel(await config.console_channel())
@ -236,7 +243,7 @@ class Pterodactyl(commands.Cog):
self.task = self.get_task() self.task = self.get_task()
@commands.Cog.listener() @commands.Cog.listener()
async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str,str]): # pylint: disable=unused-argument async def on_red_api_tokens_update(self, service_name: str, api_tokens: Mapping[str, str]): # pylint: disable=unused-argument
if service_name == "pterodactyl": if service_name == "pterodactyl":
logger.info("Configuration value set: api_key\nRestarting task...") logger.info("Configuration value set: api_key\nRestarting task...")
self.task.cancel() self.task.cancel()
@ -245,7 +252,7 @@ class Pterodactyl(commands.Cog):
slash_pterodactyl = app_commands.Group(name="pterodactyl", description="Pterodactyl allows you to manage your Pterodactyl Panel from Discord.") slash_pterodactyl = app_commands.Group(name="pterodactyl", description="Pterodactyl allows you to manage your Pterodactyl Panel from Discord.")
@slash_pterodactyl.command(name = "command", description = "Send a command to the server console.") @slash_pterodactyl.command(name="command", description="Send a command to the server console.")
async def slash_pterodactyl_command(self, interaction: discord.Interaction, command: str) -> None: async def slash_pterodactyl_command(self, interaction: discord.Interaction, command: str) -> None:
"""Send a command to the server console. """Send a command to the server console.
@ -255,7 +262,7 @@ class Pterodactyl(commands.Cog):
The command to send to the server.""" The command to send to the server."""
return await self.send_command(interaction, command) return await self.send_command(interaction, command)
@slash_pterodactyl.command(name = "players", description = "Retrieve a list of players on the server.") @slash_pterodactyl.command(name="players", description="Retrieve a list of players on the server.")
async def slash_pterodactyl_players(self, interaction: discord.Interaction) -> None: async def slash_pterodactyl_players(self, interaction: discord.Interaction) -> None:
"""Retrieve a list of players on the server.""" """Retrieve a list of players on the server."""
e = await self.get_player_list_embed(interaction) e = await self.get_player_list_embed(interaction)
@ -264,13 +271,8 @@ class Pterodactyl(commands.Cog):
else: else:
await interaction.response.send_message("No players online.", ephemeral=True) await interaction.response.send_message("No players online.", ephemeral=True)
@slash_pterodactyl.command(name = "power", description = "Send power actions to the server.") @slash_pterodactyl.command(name="power", description="Send power actions to the server.")
@app_commands.choices(action=[ @app_commands.choices(action=[Choice(name="Start", value="start"), Choice(name="Stop", value="stop"), Choice(name="Restart", value="restart"), Choice(name="⚠️ Kill ⚠️", value="kill")])
Choice(name="Start", value="start"),
Choice(name="Stop", value="stop"),
Choice(name="Restart", value="restart"),
Choice(name="⚠️ Kill ⚠️", value="kill")
])
async def slash_pterodactyl_power(self, interaction: discord.Interaction, action: app_commands.Choice[str]) -> None: async def slash_pterodactyl_power(self, interaction: discord.Interaction, action: app_commands.Choice[str]) -> None:
"""Send power actions to the server. """Send power actions to the server.
@ -284,11 +286,11 @@ class Pterodactyl(commands.Cog):
return await self.power(interaction, action.value, "stopping...") return await self.power(interaction, action.value, "stopping...")
return await self.power(interaction, action.value, f"{action.value}ing...") return await self.power(interaction, action.value, f"{action.value}ing...")
@commands.group(autohelp = True, name = "pterodactyl", aliases = ["ptero"]) @commands.group(autohelp=True, name="pterodactyl", aliases=["ptero"])
async def pterodactyl(self, ctx: commands.Context) -> None: async def pterodactyl(self, ctx: commands.Context) -> None:
"""Pterodactyl allows you to manage your Pterodactyl Panel from Discord.""" """Pterodactyl allows you to manage your Pterodactyl Panel from Discord."""
@pterodactyl.command(name = "players", aliases=["list", "online", "playerlist", "who"]) @pterodactyl.command(name="players", aliases=["list", "online", "playerlist", "who"])
async def pterodactyl_players(self, ctx: commands.Context) -> None: async def pterodactyl_players(self, ctx: commands.Context) -> None:
"""Retrieve a list of players on the server.""" """Retrieve a list of players on the server."""
e = await self.get_player_list_embed(ctx) e = await self.get_player_list_embed(ctx)
@ -297,43 +299,43 @@ class Pterodactyl(commands.Cog):
else: else:
await ctx.send("No players online.") await ctx.send("No players online.")
@pterodactyl.command(name = "command", aliases = ["cmd", "execute", "exec"]) @pterodactyl.command(name="command", aliases=["cmd", "execute", "exec"])
@commands.admin() @commands.admin()
async def pterodactyl_command(self, ctx: commands.Context, *, command: str) -> None: async def pterodactyl_command(self, ctx: commands.Context, *, command: str) -> None:
"""Send a command to the server console.""" """Send a command to the server console."""
return await self.send_command(ctx, command) return await self.send_command(ctx, command)
@pterodactyl.group(autohelp = True, name = "power") @pterodactyl.group(autohelp=True, name="power")
@commands.admin() @commands.admin()
async def pterodactyl_power(self, ctx: commands.Context) -> None: async def pterodactyl_power(self, ctx: commands.Context) -> None:
"""Send power actions to the server.""" """Send power actions to the server."""
@pterodactyl_power.command(name = "start") @pterodactyl_power.command(name="start")
async def pterodactyl_power_start(self, ctx: commands.Context) -> Optional[discord.Message]: async def pterodactyl_power_start(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Start the server.""" """Start the server."""
return await self.power(ctx, "start", "starting...") return await self.power(ctx, "start", "starting...")
@pterodactyl_power.command(name = "stop") @pterodactyl_power.command(name="stop")
async def pterodactyl_power_stop(self, ctx: commands.Context) -> Optional[discord.Message]: async def pterodactyl_power_stop(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Stop the server.""" """Stop the server."""
return await self.power(ctx, "stop", "stopping...") return await self.power(ctx, "stop", "stopping...")
@pterodactyl_power.command(name = "restart") @pterodactyl_power.command(name="restart")
async def pterodactyl_power_restart(self, ctx: commands.Context) -> Optional[discord.Message]: async def pterodactyl_power_restart(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Restart the server.""" """Restart the server."""
return await self.power(ctx, "restart", "restarting...") return await self.power(ctx, "restart", "restarting...")
@pterodactyl_power.command(name = "kill") @pterodactyl_power.command(name="kill")
async def pterodactyl_power_kill(self, ctx: commands.Context) -> Optional[discord.Message]: async def pterodactyl_power_kill(self, ctx: commands.Context) -> Optional[discord.Message]:
"""Kill the server.""" """Kill the server."""
return await self.power(ctx, "kill", "stopping... (forcefully killed)", warning="**⚠️ Forcefully killing the server process can corrupt data in some cases. ⚠️**\n") return await self.power(ctx, "kill", "stopping... (forcefully killed)", warning="**⚠️ Forcefully killing the server process can corrupt data in some cases. ⚠️**\n")
@pterodactyl.group(autohelp = True, name = "config", aliases = ["settings", "set"]) @pterodactyl.group(autohelp=True, name="config", aliases=["settings", "set"])
@commands.is_owner() @commands.is_owner()
async def pterodactyl_config(self, ctx: commands.Context) -> None: async def pterodactyl_config(self, ctx: commands.Context) -> None:
"""Configure Pterodactyl settings.""" """Configure Pterodactyl settings."""
@pterodactyl_config.command(name = "url") @pterodactyl_config.command(name="url")
async def pterodactyl_config_base_url(self, ctx: commands.Context, *, base_url: str) -> None: async def pterodactyl_config_base_url(self, ctx: commands.Context, *, base_url: str) -> None:
"""Set the base URL of your Pterodactyl Panel. """Set the base URL of your Pterodactyl Panel.
@ -346,7 +348,7 @@ class Pterodactyl(commands.Cog):
self.retry_counter = 0 self.retry_counter = 0
self.task = self.get_task() self.task = self.get_task()
@pterodactyl_config.command(name = "serverid") @pterodactyl_config.command(name="serverid")
async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None: async def pterodactyl_config_server_id(self, ctx: commands.Context, *, server_id: str) -> None:
"""Set the ID of your server.""" """Set the ID of your server."""
await config.server_id.set(server_id) await config.server_id.set(server_id)
@ -356,45 +358,45 @@ class Pterodactyl(commands.Cog):
self.retry_counter = 0 self.retry_counter = 0
self.task = self.get_task() self.task = self.get_task()
@pterodactyl_config.group(name = "console") @pterodactyl_config.group(name="console")
async def pterodactyl_config_console(self, ctx: commands.Context): async def pterodactyl_config_console(self, ctx: commands.Context):
"""Configure console settings.""" """Configure console settings."""
@pterodactyl_config_console.command(name = "channel") @pterodactyl_config_console.command(name="channel")
async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None: async def pterodactyl_config_console_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
"""Set the channel to send console output to.""" """Set the channel to send console output to."""
await config.console_channel.set(channel.id) await config.console_channel.set(channel.id)
await ctx.send(f"Console channel set to {channel.mention}") await ctx.send(f"Console channel set to {channel.mention}")
@pterodactyl_config_console.command(name = "commands") @pterodactyl_config_console.command(name="commands")
async def pterodactyl_config_console_commands(self, ctx: commands.Context, enabled: bool) -> None: async def pterodactyl_config_console_commands(self, ctx: commands.Context, enabled: bool) -> None:
"""Enable or disable console commands.""" """Enable or disable console commands."""
await config.console_commands_enabled.set(enabled) await config.console_commands_enabled.set(enabled)
await ctx.send(f"Console commands set to {enabled}") await ctx.send(f"Console commands set to {enabled}")
@pterodactyl_config.command(name = "invite") @pterodactyl_config.command(name="invite")
async def pterodactyl_config_invite(self, ctx: commands.Context, invite: str) -> None: async def pterodactyl_config_invite(self, ctx: commands.Context, invite: str) -> None:
"""Set the invite link for your server.""" """Set the invite link for your server."""
await config.invite.set(invite) await config.invite.set(invite)
await ctx.send(f"Invite link set to {invite}") await ctx.send(f"Invite link set to {invite}")
@pterodactyl_config.group(name = "topic") @pterodactyl_config.group(name="topic")
async def pterodactyl_config_topic(self, ctx: commands.Context): async def pterodactyl_config_topic(self, ctx: commands.Context):
"""Set the topic for the console and chat channels.""" """Set the topic for the console and chat channels."""
@pterodactyl_config_topic.command(name = "host", aliases = ["hostname", "ip"]) @pterodactyl_config_topic.command(name="host", aliases=["hostname", "ip"])
async def pterodactyl_config_topic_host(self, ctx: commands.Context, host: str) -> None: async def pterodactyl_config_topic_host(self, ctx: commands.Context, host: str) -> None:
"""Set the hostname or IP address of your server.""" """Set the hostname or IP address of your server."""
await config.topic_hostname.set(host) await config.topic_hostname.set(host)
await ctx.send(f"Hostname/IP set to `{host}`") await ctx.send(f"Hostname/IP set to `{host}`")
@pterodactyl_config_topic.command(name = "port") @pterodactyl_config_topic.command(name="port")
async def pterodactyl_config_topic_port(self, ctx: commands.Context, port: int) -> None: async def pterodactyl_config_topic_port(self, ctx: commands.Context, port: int) -> None:
"""Set the port of your server.""" """Set the port of your server."""
await config.topic_port.set(port) await config.topic_port.set(port)
await ctx.send(f"Port set to `{port}`") await ctx.send(f"Port set to `{port}`")
@pterodactyl_config_topic.command(name = "text") @pterodactyl_config_topic.command(name="text")
async def pterodactyl_config_topic_text(self, ctx: commands.Context, *, text: str) -> None: async def pterodactyl_config_topic_text(self, ctx: commands.Context, *, text: str) -> None:
"""Set the text for the console and chat channels. """Set the text for the console and chat channels.
@ -410,17 +412,17 @@ class Pterodactyl(commands.Cog):
await config.topic.set(text) await config.topic.set(text)
await ctx.send(f"Topic set to:\n{box(text, 'yaml')}") await ctx.send(f"Topic set to:\n{box(text, 'yaml')}")
@pterodactyl_config.group(name = "chat") @pterodactyl_config.group(name="chat")
async def pterodactyl_config_chat(self, ctx: commands.Context): async def pterodactyl_config_chat(self, ctx: commands.Context):
"""Configure chat settings.""" """Configure chat settings."""
@pterodactyl_config_chat.command(name = "channel") @pterodactyl_config_chat.command(name="channel")
async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None: async def pterodactyl_config_chat_channel(self, ctx: commands.Context, channel: discord.TextChannel) -> None:
"""Set the channel to send chat output to.""" """Set the channel to send chat output to."""
await config.chat_channel.set(channel.id) await config.chat_channel.set(channel.id)
await ctx.send(f"Chat channel set to {channel.mention}") await ctx.send(f"Chat channel set to {channel.mention}")
@pterodactyl_config_chat.command(name = "command") @pterodactyl_config_chat.command(name="command")
async def pterodactyl_config_chat_command(self, ctx: commands.Context, *, command: str) -> None: async def pterodactyl_config_chat_command(self, ctx: commands.Context, *, command: str) -> None:
"""Set the command that will be used to send messages from Discord. """Set the command that will be used to send messages from Discord.
@ -429,11 +431,11 @@ class Pterodactyl(commands.Cog):
await config.chat_command.set(command) await config.chat_command.set(command)
await ctx.send(f"Chat command set to:\n{box(command, 'json')}") await ctx.send(f"Chat command set to:\n{box(command, 'json')}")
@pterodactyl_config.group(name = "regex") @pterodactyl_config.group(name="regex")
async def pterodactyl_config_regex(self, ctx: commands.Context) -> None: async def pterodactyl_config_regex(self, ctx: commands.Context) -> None:
"""Set regex patterns.""" """Set regex patterns."""
@pterodactyl_config_regex.command(name = "chat") @pterodactyl_config_regex.command(name="chat")
async def pterodactyl_config_regex_chat(self, ctx: commands.Context, *, regex: str) -> None: async def pterodactyl_config_regex_chat(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match chat messages on the server. """Set the regex pattern to match chat messages on the server.
@ -441,7 +443,7 @@ class Pterodactyl(commands.Cog):
await config.chat_regex.set(regex) await config.chat_regex.set(regex)
await ctx.send(f"Chat regex set to:\n{box(regex, 'regex')}") await ctx.send(f"Chat regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config_regex.command(name = "server") @pterodactyl_config_regex.command(name="server")
async def pterodactyl_config_regex_server(self, ctx: commands.Context, *, regex: str) -> None: async def pterodactyl_config_regex_server(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match server messages on the server. """Set the regex pattern to match server messages on the server.
@ -449,7 +451,7 @@ class Pterodactyl(commands.Cog):
await config.server_regex.set(regex) await config.server_regex.set(regex)
await ctx.send(f"Server regex set to:\n{box(regex, 'regex')}") await ctx.send(f"Server regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config_regex.command(name = "join") @pterodactyl_config_regex.command(name="join")
async def pterodactyl_config_regex_join(self, ctx: commands.Context, *, regex: str) -> None: async def pterodactyl_config_regex_join(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match join messages on the server. """Set the regex pattern to match join messages on the server.
@ -457,7 +459,7 @@ class Pterodactyl(commands.Cog):
await config.join_regex.set(regex) await config.join_regex.set(regex)
await ctx.send(f"Join regex set to:\n{box(regex, 'regex')}") await ctx.send(f"Join regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config_regex.command(name = "leave") @pterodactyl_config_regex.command(name="leave")
async def pterodactyl_config_regex_leave(self, ctx: commands.Context, *, regex: str) -> None: async def pterodactyl_config_regex_leave(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match leave messages on the server. """Set the regex pattern to match leave messages on the server.
@ -465,7 +467,7 @@ class Pterodactyl(commands.Cog):
await config.leave_regex.set(regex) await config.leave_regex.set(regex)
await ctx.send(f"Leave regex set to:\n{box(regex, 'regex')}") await ctx.send(f"Leave regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config_regex.command(name = "achievement") @pterodactyl_config_regex.command(name="achievement")
async def pterodactyl_config_regex_achievement(self, ctx: commands.Context, *, regex: str) -> None: async def pterodactyl_config_regex_achievement(self, ctx: commands.Context, *, regex: str) -> None:
"""Set the regex pattern to match achievement messages on the server. """Set the regex pattern to match achievement messages on the server.
@ -473,41 +475,41 @@ class Pterodactyl(commands.Cog):
await config.achievement_regex.set(regex) await config.achievement_regex.set(regex)
await ctx.send(f"Achievement regex set to:\n{box(regex, 'regex')}") await ctx.send(f"Achievement regex set to:\n{box(regex, 'regex')}")
@pterodactyl_config.group(name = "messages", aliases = ['msg', 'msgs', 'message']) @pterodactyl_config.group(name="messages", aliases=["msg", "msgs", "message"])
async def pterodactyl_config_messages(self, ctx: commands.Context): async def pterodactyl_config_messages(self, ctx: commands.Context):
"""Configure message settings.""" """Configure message settings."""
@pterodactyl_config_messages.command(name = "startup") @pterodactyl_config_messages.command(name="startup")
async def pterodactyl_config_messages_startup(self, ctx: commands.Context, *, message: str) -> None: async def pterodactyl_config_messages_startup(self, ctx: commands.Context, *, message: str) -> None:
"""Set the message that will be sent when the server starts.""" """Set the message that will be sent when the server starts."""
await config.startup_msg.set(message) await config.startup_msg.set(message)
await ctx.send(f"Startup message set to: {message}") await ctx.send(f"Startup message set to: {message}")
@pterodactyl_config_messages.command(name = "shutdown") @pterodactyl_config_messages.command(name="shutdown")
async def pterodactyl_config_messages_shutdown(self, ctx: commands.Context, *, message: str) -> None: async def pterodactyl_config_messages_shutdown(self, ctx: commands.Context, *, message: str) -> None:
"""Set the message that will be sent when the server stops.""" """Set the message that will be sent when the server stops."""
await config.shutdown_msg.set(message) await config.shutdown_msg.set(message)
await ctx.send(f"Shutdown message set to: {message}") await ctx.send(f"Shutdown message set to: {message}")
@pterodactyl_config_messages.command(name = "join") @pterodactyl_config_messages.command(name="join")
async def pterodactyl_config_messages_join(self, ctx: commands.Context, *, message: str) -> None: async def pterodactyl_config_messages_join(self, ctx: commands.Context, *, message: str) -> None:
"""Set the message that will be sent when a user joins the server. This is only shown in embeds.""" """Set the message that will be sent when a user joins the server. This is only shown in embeds."""
await config.join_msg.set(message) await config.join_msg.set(message)
await ctx.send(f"Join message set to: {message}") await ctx.send(f"Join message set to: {message}")
@pterodactyl_config_messages.command(name = "leave") @pterodactyl_config_messages.command(name="leave")
async def pterodactyl_config_messages_leave(self, ctx: commands.Context, *, message: str) -> None: async def pterodactyl_config_messages_leave(self, ctx: commands.Context, *, message: str) -> None:
"""Set the message that will be sent when a user leaves the server. This is only shown in embeds.""" """Set the message that will be sent when a user leaves the server. This is only shown in embeds."""
await config.leave_msg.set(message) await config.leave_msg.set(message)
await ctx.send(f"Leave message set to: {message}") await ctx.send(f"Leave message set to: {message}")
@pterodactyl_config.command(name = "ip") @pterodactyl_config.command(name="ip")
async def pterodactyl_config_mask_ip(self, ctx: commands.Context, mask: bool) -> None: async def pterodactyl_config_mask_ip(self, ctx: commands.Context, mask: bool) -> None:
"""Mask the IP addresses of users in console messages.""" """Mask the IP addresses of users in console messages."""
await config.mask_ip.set(mask) await config.mask_ip.set(mask)
await ctx.send(f"IP masking set to {mask}") await ctx.send(f"IP masking set to {mask}")
@pterodactyl_config.command(name = "api") @pterodactyl_config.command(name="api")
async def pterodactyl_config_api(self, ctx: commands.Context, endpoint: str) -> None: async def pterodactyl_config_api(self, ctx: commands.Context, endpoint: str) -> None:
"""Set the API endpoint for retrieving user avatars. """Set the API endpoint for retrieving user avatars.
@ -516,11 +518,14 @@ class Pterodactyl(commands.Cog):
await config.api_endpoint.set(endpoint) await config.api_endpoint.set(endpoint)
await ctx.send(f"API endpoint set to {endpoint}") await ctx.send(f"API endpoint set to {endpoint}")
@pterodactyl_config_regex.group(name = "blacklist", aliases = ['block', 'blocklist'],) @pterodactyl_config_regex.group(
name="blacklist",
aliases=["block", "blocklist"],
)
async def pterodactyl_config_regex_blacklist(self, ctx: commands.Context): async def pterodactyl_config_regex_blacklist(self, ctx: commands.Context):
"""Blacklist regex patterns.""" """Blacklist regex patterns."""
@pterodactyl_config_regex_blacklist.command(name = "add") @pterodactyl_config_regex_blacklist.command(name="add")
async def pterodactyl_config_regex_blacklist_add(self, ctx: commands.Context, name: str, *, regex: str) -> None: async def pterodactyl_config_regex_blacklist_add(self, ctx: commands.Context, name: str, *, regex: str) -> None:
"""Add a regex pattern to the blacklist.""" """Add a regex pattern to the blacklist."""
async with config.regex_blacklist() as blacklist: async with config.regex_blacklist() as blacklist:
@ -538,7 +543,7 @@ class Pterodactyl(commands.Cog):
else: else:
await msg.edit(content="Cancelled.") await msg.edit(content="Cancelled.")
@pterodactyl_config_regex_blacklist.command(name = "remove") @pterodactyl_config_regex_blacklist.command(name="remove")
async def pterodactyl_config_regex_blacklist_remove(self, ctx: commands.Context, name: str) -> None: async def pterodactyl_config_regex_blacklist_remove(self, ctx: commands.Context, name: str) -> None:
"""Remove a regex pattern from the blacklist.""" """Remove a regex pattern from the blacklist."""
async with config.regex_blacklist() as blacklist: async with config.regex_blacklist() as blacklist:
@ -555,7 +560,7 @@ class Pterodactyl(commands.Cog):
else: else:
await ctx.send(f"Name `{name}` does not exist in the blacklist.") await ctx.send(f"Name `{name}` does not exist in the blacklist.")
@pterodactyl_config.command(name = 'view', aliases = ['show']) @pterodactyl_config.command(name="view", aliases=["show"])
async def pterodactyl_config_view(self, ctx: commands.Context) -> None: async def pterodactyl_config_view(self, ctx: commands.Context) -> None:
"""View the current configuration.""" """View the current configuration."""
base_url = await config.base_url() base_url = await config.base_url()
@ -580,7 +585,7 @@ class Pterodactyl(commands.Cog):
topic_text = await config.topic() topic_text = await config.topic()
topic_hostname = await config.topic_hostname() topic_hostname = await config.topic_hostname()
topic_port = await config.topic_port() topic_port = await config.topic_port()
embed = discord.Embed(color = await ctx.embed_color(), title="Pterodactyl Configuration") embed = discord.Embed(color=await ctx.embed_color(), title="Pterodactyl Configuration")
embed.description = f"""**Base URL:** {base_url} embed.description = f"""**Base URL:** {base_url}
**Server ID:** `{server_id}` **Server ID:** `{server_id}`
**Console Channel:** <#{console_channel}> **Console Channel:** <#{console_channel}>
@ -596,19 +601,19 @@ class Pterodactyl(commands.Cog):
**Topic Hostname:** `{topic_hostname}` **Topic Hostname:** `{topic_hostname}`
**Topic Port:** `{topic_port}` **Topic Port:** `{topic_port}`
**Topic Text:** {box(topic_text, 'yaml')} **Topic Text:** {box(topic_text, "yaml")}
**Chat Command:** {box(chat_command, 'json')} **Chat Command:** {box(chat_command, "json")}
**Chat Regex:** {box(chat_regex, 're')} **Chat Regex:** {box(chat_regex, "re")}
**Server Regex:** {box(server_regex, 're')} **Server Regex:** {box(server_regex, "re")}
**Join Regex:** {box(join_regex, 're')} **Join Regex:** {box(join_regex, "re")}
**Leave Regex:** {box(leave_regex, 're')} **Leave Regex:** {box(leave_regex, "re")}
**Achievement Regex:** {box(achievement_regex, 're')}""" **Achievement Regex:** {box(achievement_regex, "re")}"""
await ctx.send(embed=embed) await ctx.send(embed=embed)
if not len(regex_blacklist) == 0: if not len(regex_blacklist) == 0:
regex_blacklist_embed = discord.Embed(color = await ctx.embed_color(), title="Regex Blacklist") regex_blacklist_embed = discord.Embed(color=await ctx.embed_color(), title="Regex Blacklist")
for name, regex in regex_blacklist.items(): for name, regex in regex_blacklist.items():
regex_blacklist_embed.add_field(name=name, value=box(regex, 're'), inline=False) regex_blacklist_embed.add_field(name=name, value=box(regex, "re"), inline=False)
await ctx.send(embed=regex_blacklist_embed) await ctx.send(embed=regex_blacklist_embed)
def get_bool_str(self, inp: bool) -> str: def get_bool_str(self, inp: bool) -> str:

View file

@ -151,7 +151,7 @@ async def retrieve_websocket_credentials(coginstance: Pterodactyl) -> Optional[d
Socket: %s Socket: %s
Token: %s...""", Token: %s...""",
websocket_credentials['data']['socket'], websocket_credentials['data']['socket'],
websocket_credentials['data']['token'][:20] websocket_credentials['data']['token'][:20],
) )
return websocket_credentials return websocket_credentials
#NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons #NOTE - The token is truncated to prevent it from being logged in its entirety, for security reasons
@ -262,6 +262,5 @@ async def generate_achievement_embed(coginstance: Pterodactyl, username: str, ac
def mask_ip(string: str) -> str: def mask_ip(string: str) -> str:
def check(match: re.Match[str]): def check(match: re.Match[str]):
ip = match.group(0) ip = match.group(0)
masked_ip = '.'.join(r'\*' * len(octet) for octet in ip.split('.')) return '.'.join(r'\*' * len(octet) for octet in ip.split('.'))
return masked_ip
return re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', check, string) return re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', check, string)

View file

@ -2,8 +2,8 @@
name = "seacogs" name = "seacogs"
version = "0.1.0" version = "0.1.0"
description = "My assorted cogs for Red-DiscordBot." description = "My assorted cogs for Red-DiscordBot."
authors = [{name = "cswimr", email = "seaswimmerthefsh@gmail.com"}] authors = [{ name = "cswimr", email = "seaswimmerthefsh@gmail.com" }]
license = {file="LICENSE"} license = { file = "LICENSE" }
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"
dependencies = [ dependencies = [
@ -33,11 +33,7 @@ documentation = [
] ]
[tool.uv] [tool.uv]
dev-dependencies = [ dev-dependencies = ["pylint>=3.3.1", "ruff>=0.6.9", "sqlite-web>=0.6.4"]
"pylint>=3.3.1",
"ruff>=0.6.9",
"sqlite-web>=0.6.4",
]
[tool.uv.sources] [tool.uv.sources]
py-dactyl = { git = "https://github.com/cswimr/pydactyl" } py-dactyl = { git = "https://github.com/cswimr/pydactyl" }
@ -84,8 +80,32 @@ target-version = "py311"
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. # Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or # Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default. # McCabe complexity (`C901`) by default.
select = ["F", "W", "E", "C901"] select = [
ignore = ["C901"] "I",
"N",
"F",
"W",
"E",
"G",
"A",
"COM",
"INP",
"T20",
"PLC",
"PLE",
"PLW",
"PLR",
"LOG",
"SLF",
"ERA",
"FIX",
"PERF",
"C4",
"EM",
"RET",
"RSE",
]
ignore = ["PLR0912", "PLR0915", "PLR2004", "PLR0913", "EM101"]
# Allow fix for all enabled rules (when `--fix`) is provided. # Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"] fixable = ["ALL"]

View file

@ -29,12 +29,14 @@ from redbot.core.utils.views import SimpleMenu
def md(soup: BeautifulSoup, **options) -> Any | str: def md(soup: BeautifulSoup, **options) -> Any | str:
return MarkdownConverter(**options).convert_soup(soup=soup) return MarkdownConverter(**options).convert_soup(soup=soup)
def format_rfc_text(text: str, number: int) -> str: def format_rfc_text(text: str, number: int) -> str:
one: str = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text) one: str = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text)
two: str = re.sub(r"\((#(?:section|page)-\d+(?:.\d+)?)\)", f"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one) two: str = re.sub(r"\((#(?:section|page)-\d+(?:.\d+)?)\)", f"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one)
three: str = re.sub(r"\n{3,}", "\n\n", two) three: str = re.sub(r"\n{3,}", "\n\n", two)
return three return three
class SeaUtils(commands.Cog): class SeaUtils(commands.Cog):
"""A collection of random utilities.""" """A collection of random utilities."""
@ -57,7 +59,6 @@ class SeaUtils(commands.Cog):
] ]
return "\n".join(text) return "\n".join(text)
def format_src(self, obj: Any) -> str: def format_src(self, obj: Any) -> str:
"""A large portion of this code is repurposed from Zephyrkul's RTFS cog. """A large portion of this code is repurposed from Zephyrkul's RTFS cog.
https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py""" https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py"""
@ -75,7 +76,7 @@ class SeaUtils(commands.Cog):
@commands.command(aliases=["source", "src", "code", "showsource"]) @commands.command(aliases=["source", "src", "code", "showsource"])
@commands.is_owner() @commands.is_owner()
async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin # noqa: A002
"""Show the code for a particular object.""" """Show the code for a particular object."""
try: try:
if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])): if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])):
@ -86,11 +87,7 @@ class SeaUtils(commands.Cog):
text = self.format_src(obj) text = self.format_src(obj)
else: else:
raise AttributeError raise AttributeError
temp_content = cf.pagify( temp_content = cf.pagify(text=cleanup_code(text), escape_mass_mentions=True, page_length=1977)
text=cleanup_code(text),
escape_mass_mentions=True,
page_length = 1977
)
content = [] content = []
max_i = operator.length_hint(temp_content) max_i = operator.length_hint(temp_content)
i = 1 i = 1
@ -105,7 +102,7 @@ class SeaUtils(commands.Cog):
else: else:
await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False)) await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False))
@commands.command(name='dig', aliases=['dnslookup', 'nslookup']) @commands.command(name="dig", aliases=["dnslookup", "nslookup"])
@commands.is_owner() @commands.is_owner()
async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None: async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None:
"""Retrieve DNS information for a domain. """Retrieve DNS information for a domain.
@ -113,13 +110,13 @@ class SeaUtils(commands.Cog):
Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system. Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system.
`nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used. `nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used.
Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter.""" Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter."""
command_opts: list[str | int] = ['dig'] command_opts: list[str | int] = ["dig"]
query_types: list[str] = [record_type] if record_type else ['A', 'AAAA', 'CNAME'] query_types: list[str] = [record_type] if record_type else ["A", "AAAA", "CNAME"]
if server: if server:
command_opts.extend(['@', server]) command_opts.extend(["@", server])
for query_type in query_types: for query_type in query_types:
command_opts.extend([name, query_type]) command_opts.extend([name, query_type])
command_opts.extend(['-p', str(port), '+yaml']) command_opts.extend(["-p", str(port), "+yaml"])
try: try:
process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
@ -128,22 +125,18 @@ class SeaUtils(commands.Cog):
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode())) await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode()))
else: else:
data = yaml.safe_load(stdout.decode()) data = yaml.safe_load(stdout.decode())
message_data: dict = data[0]['message'] message_data: dict = data[0]["message"]
response_data: dict = message_data['response_message_data'] response_data: dict = message_data["response_message_data"]
if ctx.embed_requested(): if ctx.embed_requested():
embed = Embed( embed = Embed(title="DNS Query Result", color=await ctx.embed_color(), timestamp=message_data["response_time"])
title="DNS Query Result", embed.add_field(name="Response Address", value=message_data["response_address"], inline=True)
color=await ctx.embed_color(), embed.add_field(name="Response Port", value=message_data["response_port"], inline=True)
timestamp=message_data['response_time'] embed.add_field(name="Query Address", value=message_data["query_address"], inline=True)
) embed.add_field(name="Query Port", value=message_data["query_port"], inline=True)
embed.add_field(name="Response Address", value=message_data['response_address'], inline=True) embed.add_field(name="Status", value=response_data["status"], inline=True)
embed.add_field(name="Response Port", value=message_data['response_port'], inline=True) embed.add_field(name="Flags", value=response_data["flags"], inline=True)
embed.add_field(name="Query Address", value=message_data['query_address'], inline=True)
embed.add_field(name="Query Port", value=message_data['query_port'], inline=True)
embed.add_field(name="Status", value=response_data['status'], inline=True)
embed.add_field(name="Flags", value=response_data['flags'], inline=True)
if response_data.get('status') != 'NOERROR': if response_data.get("status") != "NOERROR":
embed.colour = Color.red() embed.colour = Color.red()
embed.description = cf.error("Dig query did not return `NOERROR` status.") embed.description = cf.error("Dig query did not return `NOERROR` status.")
@ -151,19 +144,19 @@ class SeaUtils(commands.Cog):
answers = [] answers = []
authorities = [] authorities = []
for m in data: for m in data:
response = m['message']['response_message_data'] response = m["message"]["response_message_data"]
if 'QUESTION_SECTION' in response: if "QUESTION_SECTION" in response:
for question in response['QUESTION_SECTION']: for question in response["QUESTION_SECTION"]:
if question not in questions: if question not in questions:
questions.append(question) questions.append(question)
if 'ANSWER_SECTION' in response: if "ANSWER_SECTION" in response:
for answer in response['ANSWER_SECTION']: for answer in response["ANSWER_SECTION"]:
if answer not in answers: if answer not in answers:
answers.append(answer) answers.append(answer)
if 'AUTHORITY_SECTION' in response: if "AUTHORITY_SECTION" in response:
for authority in response['AUTHORITY_SECTION']: for authority in response["AUTHORITY_SECTION"]:
if authority not in authorities: if authority not in authorities:
authorities.append(authority) authorities.append(authority)
@ -183,26 +176,22 @@ class SeaUtils(commands.Cog):
embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False) embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False)
await ctx.send(embed=embed) await ctx.send(embed=embed)
else: else:
await ctx.send(content=cf.box(text=stdout, lang='yaml')) await ctx.send(content=cf.box(text=stdout, lang="yaml"))
except (FileNotFoundError): except FileNotFoundError:
try: try:
ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) ns_process = await asyncio.create_subprocess_exec("nslookup", name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
ns_stdout, ns_stderr = await ns_process.communicate() ns_stdout, ns_stderr = await ns_process.communicate()
if ns_stderr: if ns_stderr:
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode())) await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode()))
else: else:
warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n") warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n")
if await ctx.embed_requested(): if await ctx.embed_requested():
embed = Embed( embed = Embed(title="DNS Query Result", color=await ctx.embed_color(), timestamp=ctx.message.created_at)
title="DNS Query Result",
color=await ctx.embed_color(),
timestamp=ctx.message.created_at
)
embed.description = warning + cf.box(text=ns_stdout.decode()) embed.description = warning + cf.box(text=ns_stdout.decode())
await ctx.send(embed=embed) await ctx.send(embed=embed)
else: else:
await ctx.send(content = warning + cf.box(text=ns_stdout.decode())) await ctx.send(content=warning + cf.box(text=ns_stdout.decode()))
except (FileNotFoundError): except FileNotFoundError:
await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query.")) await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query."))
@commands.command() @commands.command()
@ -210,15 +199,15 @@ class SeaUtils(commands.Cog):
"""Retrieve the text of an RFC document. """Retrieve the text of an RFC document.
This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document. This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document.
A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501 A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501
url = f"https://www.rfc-editor.org/rfc/rfc{number}.html" url = f"https://www.rfc-editor.org/rfc/rfc{number}.html"
datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}" datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}"
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(url=url) as response: async with session.get(url=url) as response:
if response.status == 200: if response.status == 200:
html = await response.text() html = await response.text()
soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, "html.parser")
pre_tags = soup.find_all('pre') pre_tags = soup.find_all("pre")
content: list[Embed | str] = [] content: list[Embed | str] = []
for pre_tag in pre_tags: for pre_tag in pre_tags:
text = format_rfc_text(md(pre_tag), number) text = format_rfc_text(md(pre_tag), number)
@ -226,26 +215,15 @@ class SeaUtils(commands.Cog):
pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096) pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096)
for page in pagified_text: for page in pagified_text:
if await ctx.embed_requested(): if await ctx.embed_requested():
embed = Embed( embed = Embed(title=f"RFC Document {number}", url=datatracker_url, description=page, color=await ctx.embed_color())
title=f"RFC Document {number}",
url=datatracker_url,
description=page,
color=await ctx.embed_color()
)
content.append(embed) content.append(embed)
else: else:
content.append(page) content.append(page)
elif await ctx.embed_requested():
embed = Embed(title=f"RFC Document {number}", url=datatracker_url, description=text, color=await ctx.embed_color())
content.append(embed)
else: else:
if await ctx.embed_requested(): content.append(text)
embed = Embed(
title=f"RFC Document {number}",
url=datatracker_url,
description=text,
color=await ctx.embed_color()
)
content.append(embed)
else:
content.append(text)
if await ctx.embed_requested(): if await ctx.embed_requested():
for embed in content: for embed in content:
embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}") embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}")

737
uv.lock generated

File diff suppressed because it is too large Load diff