Compare commits
4 commits
e5210420cb
...
3d882625d2
Author | SHA1 | Date | |
---|---|---|---|
3d882625d2 | |||
6c25d61dfd | |||
6ffa81fdee | |||
06e678f26f |
5 changed files with 65 additions and 87 deletions
|
@ -91,11 +91,11 @@ class Bible(commands.Cog):
|
|||
response.status,
|
||||
)
|
||||
if response.status == 401:
|
||||
raise bible.errors.Unauthorized()
|
||||
raise bible.errors.UnauthorizedError()
|
||||
if response.status == 403:
|
||||
raise bible.errors.BibleAccessError()
|
||||
if response.status == 503:
|
||||
raise bible.errors.ServiceUnavailable()
|
||||
raise bible.errors.ServiceUnavailableError()
|
||||
return Version(
|
||||
bible_id,
|
||||
data["data"]["abbreviation"],
|
||||
|
@ -137,13 +137,13 @@ class Bible(commands.Cog):
|
|||
if response.status == 400:
|
||||
raise bible.errors.InexplicableError()
|
||||
if response.status == 401:
|
||||
raise bible.errors.Unauthorized()
|
||||
raise bible.errors.UnauthorizedError()
|
||||
if response.status == 403:
|
||||
raise bible.errors.BibleAccessError()
|
||||
if response.status == 404:
|
||||
raise bible.errors.NotFound()
|
||||
raise bible.errors.NotFoundError()
|
||||
if response.status == 503:
|
||||
raise bible.errors.ServiceUnavailable()
|
||||
raise bible.errors.ServiceUnavailableError()
|
||||
|
||||
fums_url = "https://fums.api.bible/f3"
|
||||
fums_params = {
|
||||
|
@ -176,11 +176,11 @@ class Bible(commands.Cog):
|
|||
response.status,
|
||||
)
|
||||
if response.status == 401:
|
||||
raise bible.errors.Unauthorized()
|
||||
raise bible.errors.UnauthorizedError()
|
||||
if response.status == 403:
|
||||
raise bible.errors.BibleAccessError()
|
||||
if response.status == 503:
|
||||
raise bible.errors.ServiceUnavailable()
|
||||
raise bible.errors.ServiceUnavailableError()
|
||||
return data["data"]
|
||||
|
||||
async def _get_chapters(self, bible_id: str, book_id: str) -> dict:
|
||||
|
@ -195,11 +195,11 @@ class Bible(commands.Cog):
|
|||
response.status,
|
||||
)
|
||||
if response.status == 401:
|
||||
raise bible.errors.Unauthorized()
|
||||
raise bible.errors.UnauthorizedError()
|
||||
if response.status == 403:
|
||||
raise bible.errors.BibleAccessError()
|
||||
if response.status == 503:
|
||||
raise bible.errors.ServiceUnavailable()
|
||||
raise bible.errors.ServiceUnavailableError()
|
||||
return data["data"]
|
||||
|
||||
async def _get_verses(self, bible_id: str, book_id: str, chapter: int) -> dict:
|
||||
|
@ -214,11 +214,11 @@ class Bible(commands.Cog):
|
|||
response.status,
|
||||
)
|
||||
if response.status == 401:
|
||||
raise bible.errors.Unauthorized()
|
||||
raise bible.errors.UnauthorizedError()
|
||||
if response.status == 403:
|
||||
raise bible.errors.BibleAccessError()
|
||||
if response.status == 503:
|
||||
raise bible.errors.ServiceUnavailable()
|
||||
raise bible.errors.ServiceUnavailableError()
|
||||
return data["data"]
|
||||
|
||||
@commands.group(autohelp=True)
|
||||
|
@ -251,10 +251,10 @@ class Bible(commands.Cog):
|
|||
passage = await self._get_passage(ctx, bible_id, f"{book_id}.{passage.replace(':', '.')}", False)
|
||||
except (
|
||||
bible.errors.BibleAccessError,
|
||||
bible.errors.NotFound,
|
||||
bible.errors.NotFoundError,
|
||||
bible.errors.InexplicableError,
|
||||
bible.errors.ServiceUnavailable,
|
||||
bible.errors.Unauthorized,
|
||||
bible.errors.ServiceUnavailableError,
|
||||
bible.errors.UnauthorizedError,
|
||||
) as e:
|
||||
await ctx.send(e.message)
|
||||
return
|
||||
|
@ -294,10 +294,10 @@ class Bible(commands.Cog):
|
|||
passage = await self._get_passage(ctx, bible_id, verse, False)
|
||||
except (
|
||||
bible.errors.BibleAccessError,
|
||||
bible.errors.NotFound,
|
||||
bible.errors.NotFoundError,
|
||||
bible.errors.InexplicableError,
|
||||
bible.errors.ServiceUnavailable,
|
||||
bible.errors.Unauthorized,
|
||||
bible.errors.ServiceUnavailableError,
|
||||
bible.errors.UnauthorizedError,
|
||||
) as e:
|
||||
await ctx.send(e.message)
|
||||
return
|
||||
|
|
|
@ -10,7 +10,7 @@ class BibleAccessError(Exception):
|
|||
self.message = message
|
||||
|
||||
|
||||
class Unauthorized(Exception):
|
||||
class UnauthorizedError(Exception):
|
||||
def __init__(
|
||||
self,
|
||||
message: str = error("The API key for API.Bible is missing or invalid. Please report this to the bot owner.\nIf you are the bot owner, please check the documentation [here](<https://seacogs.coastalcommits.com/bible/#setup>)."),
|
||||
|
@ -19,7 +19,7 @@ class Unauthorized(Exception):
|
|||
self.message = message
|
||||
|
||||
|
||||
class NotFound(Exception):
|
||||
class NotFoundError(Exception):
|
||||
def __init__(
|
||||
self,
|
||||
message: str = error("The requested passage was not found."),
|
||||
|
@ -28,7 +28,7 @@ class NotFound(Exception):
|
|||
self.message = message
|
||||
|
||||
|
||||
class ServiceUnavailable(Exception):
|
||||
class ServiceUnavailableError(Exception):
|
||||
def __init__(
|
||||
self,
|
||||
message: str = error("The API.Bible service is currently unavailable."),
|
||||
|
|
|
@ -97,13 +97,13 @@ class HotReloadHandler(RegexMatchingEventHandler):
|
|||
else:
|
||||
dest = ""
|
||||
|
||||
self.logger.info(f"File {event.src_path} has been {event.event_type}{dest}.")
|
||||
self.logger.info("File %s has been %s%s.", event.src_path, event.event_type, dest)
|
||||
|
||||
run_coroutine_threadsafe(self.reload_cogs(cogs_to_reload), loop=self.bot.loop)
|
||||
|
||||
async def reload_cogs(self, cog_names: Sequence[str]) -> None:
|
||||
"""Reload modified cog."""
|
||||
core_logic = CoreLogic(bot=self.bot)
|
||||
self.logger.info(f"Reloading cogs: {humanize_list(cog_names, style='unit')}")
|
||||
self.logger.info("Reloading cogs: %s", humanize_list(cog_names, style="unit"))
|
||||
await core_logic._reload(pkg_names=cog_names)
|
||||
self.logger.info(f"Reloaded cogs: {humanize_list(cog_names, style='unit')}")
|
||||
self.logger.info("Reloaded cogs: %s", humanize_list(cog_names, style="unit"))
|
||||
|
|
|
@ -84,8 +84,8 @@ target-version = "py311"
|
|||
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
|
||||
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
|
||||
# McCabe complexity (`C901`) by default.
|
||||
select = ["F", "W", "E", "C901"]
|
||||
ignore = ["C901"]
|
||||
select = ["I", "N", "F", "W", "E", "G", "INP", "T20", "PLC", "PLE", "PLW", "PLR"]
|
||||
ignore = ["PLR0912", "PLR0915", "PLR2004"]
|
||||
|
||||
# Allow fix for all enabled rules (when `--fix`) is provided.
|
||||
fixable = ["ALL"]
|
||||
|
|
|
@ -29,12 +29,14 @@ from redbot.core.utils.views import SimpleMenu
|
|||
def md(soup: BeautifulSoup, **options) -> Any | str:
|
||||
return MarkdownConverter(**options).convert_soup(soup=soup)
|
||||
|
||||
|
||||
def format_rfc_text(text: str, number: int) -> str:
|
||||
one: str = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text)
|
||||
two: str = re.sub(r"\((#(?:section|page)-\d+(?:.\d+)?)\)", f"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one)
|
||||
three: str = re.sub(r"\n{3,}", "\n\n", two)
|
||||
return three
|
||||
|
||||
|
||||
class SeaUtils(commands.Cog):
|
||||
"""A collection of random utilities."""
|
||||
|
||||
|
@ -57,7 +59,6 @@ class SeaUtils(commands.Cog):
|
|||
]
|
||||
return "\n".join(text)
|
||||
|
||||
|
||||
def format_src(self, obj: Any) -> str:
|
||||
"""A large portion of this code is repurposed from Zephyrkul's RTFS cog.
|
||||
https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py"""
|
||||
|
@ -75,7 +76,7 @@ class SeaUtils(commands.Cog):
|
|||
|
||||
@commands.command(aliases=["source", "src", "code", "showsource"])
|
||||
@commands.is_owner()
|
||||
async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin
|
||||
async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin
|
||||
"""Show the code for a particular object."""
|
||||
try:
|
||||
if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])):
|
||||
|
@ -86,11 +87,7 @@ class SeaUtils(commands.Cog):
|
|||
text = self.format_src(obj)
|
||||
else:
|
||||
raise AttributeError
|
||||
temp_content = cf.pagify(
|
||||
text=cleanup_code(text),
|
||||
escape_mass_mentions=True,
|
||||
page_length = 1977
|
||||
)
|
||||
temp_content = cf.pagify(text=cleanup_code(text), escape_mass_mentions=True, page_length=1977)
|
||||
content = []
|
||||
max_i = operator.length_hint(temp_content)
|
||||
i = 1
|
||||
|
@ -105,7 +102,7 @@ class SeaUtils(commands.Cog):
|
|||
else:
|
||||
await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False))
|
||||
|
||||
@commands.command(name='dig', aliases=['dnslookup', 'nslookup'])
|
||||
@commands.command(name="dig", aliases=["dnslookup", "nslookup"])
|
||||
@commands.is_owner()
|
||||
async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None:
|
||||
"""Retrieve DNS information for a domain.
|
||||
|
@ -113,13 +110,13 @@ class SeaUtils(commands.Cog):
|
|||
Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system.
|
||||
`nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used.
|
||||
Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter."""
|
||||
command_opts: list[str | int] = ['dig']
|
||||
query_types: list[str] = [record_type] if record_type else ['A', 'AAAA', 'CNAME']
|
||||
command_opts: list[str | int] = ["dig"]
|
||||
query_types: list[str] = [record_type] if record_type else ["A", "AAAA", "CNAME"]
|
||||
if server:
|
||||
command_opts.extend(['@', server])
|
||||
command_opts.extend(["@", server])
|
||||
for query_type in query_types:
|
||||
command_opts.extend([name, query_type])
|
||||
command_opts.extend(['-p', str(port), '+yaml'])
|
||||
command_opts.extend(["-p", str(port), "+yaml"])
|
||||
|
||||
try:
|
||||
process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||||
|
@ -128,22 +125,18 @@ class SeaUtils(commands.Cog):
|
|||
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode()))
|
||||
else:
|
||||
data = yaml.safe_load(stdout.decode())
|
||||
message_data: dict = data[0]['message']
|
||||
response_data: dict = message_data['response_message_data']
|
||||
message_data: dict = data[0]["message"]
|
||||
response_data: dict = message_data["response_message_data"]
|
||||
if ctx.embed_requested():
|
||||
embed = Embed(
|
||||
title="DNS Query Result",
|
||||
color=await ctx.embed_color(),
|
||||
timestamp=message_data['response_time']
|
||||
)
|
||||
embed.add_field(name="Response Address", value=message_data['response_address'], inline=True)
|
||||
embed.add_field(name="Response Port", value=message_data['response_port'], inline=True)
|
||||
embed.add_field(name="Query Address", value=message_data['query_address'], inline=True)
|
||||
embed.add_field(name="Query Port", value=message_data['query_port'], inline=True)
|
||||
embed.add_field(name="Status", value=response_data['status'], inline=True)
|
||||
embed.add_field(name="Flags", value=response_data['flags'], inline=True)
|
||||
embed = Embed(title="DNS Query Result", color=await ctx.embed_color(), timestamp=message_data["response_time"])
|
||||
embed.add_field(name="Response Address", value=message_data["response_address"], inline=True)
|
||||
embed.add_field(name="Response Port", value=message_data["response_port"], inline=True)
|
||||
embed.add_field(name="Query Address", value=message_data["query_address"], inline=True)
|
||||
embed.add_field(name="Query Port", value=message_data["query_port"], inline=True)
|
||||
embed.add_field(name="Status", value=response_data["status"], inline=True)
|
||||
embed.add_field(name="Flags", value=response_data["flags"], inline=True)
|
||||
|
||||
if response_data.get('status') != 'NOERROR':
|
||||
if response_data.get("status") != "NOERROR":
|
||||
embed.colour = Color.red()
|
||||
embed.description = cf.error("Dig query did not return `NOERROR` status.")
|
||||
|
||||
|
@ -151,19 +144,19 @@ class SeaUtils(commands.Cog):
|
|||
answers = []
|
||||
authorities = []
|
||||
for m in data:
|
||||
response = m['message']['response_message_data']
|
||||
if 'QUESTION_SECTION' in response:
|
||||
for question in response['QUESTION_SECTION']:
|
||||
response = m["message"]["response_message_data"]
|
||||
if "QUESTION_SECTION" in response:
|
||||
for question in response["QUESTION_SECTION"]:
|
||||
if question not in questions:
|
||||
questions.append(question)
|
||||
|
||||
if 'ANSWER_SECTION' in response:
|
||||
for answer in response['ANSWER_SECTION']:
|
||||
if "ANSWER_SECTION" in response:
|
||||
for answer in response["ANSWER_SECTION"]:
|
||||
if answer not in answers:
|
||||
answers.append(answer)
|
||||
|
||||
if 'AUTHORITY_SECTION' in response:
|
||||
for authority in response['AUTHORITY_SECTION']:
|
||||
if "AUTHORITY_SECTION" in response:
|
||||
for authority in response["AUTHORITY_SECTION"]:
|
||||
if authority not in authorities:
|
||||
authorities.append(authority)
|
||||
|
||||
|
@ -183,26 +176,22 @@ class SeaUtils(commands.Cog):
|
|||
embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False)
|
||||
await ctx.send(embed=embed)
|
||||
else:
|
||||
await ctx.send(content=cf.box(text=stdout, lang='yaml'))
|
||||
except (FileNotFoundError):
|
||||
await ctx.send(content=cf.box(text=stdout, lang="yaml"))
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||||
ns_process = await asyncio.create_subprocess_exec("nslookup", name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
||||
ns_stdout, ns_stderr = await ns_process.communicate()
|
||||
if ns_stderr:
|
||||
await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode()))
|
||||
else:
|
||||
warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n")
|
||||
if await ctx.embed_requested():
|
||||
embed = Embed(
|
||||
title="DNS Query Result",
|
||||
color=await ctx.embed_color(),
|
||||
timestamp=ctx.message.created_at
|
||||
)
|
||||
embed = Embed(title="DNS Query Result", color=await ctx.embed_color(), timestamp=ctx.message.created_at)
|
||||
embed.description = warning + cf.box(text=ns_stdout.decode())
|
||||
await ctx.send(embed=embed)
|
||||
else:
|
||||
await ctx.send(content = warning + cf.box(text=ns_stdout.decode()))
|
||||
except (FileNotFoundError):
|
||||
await ctx.send(content=warning + cf.box(text=ns_stdout.decode()))
|
||||
except FileNotFoundError:
|
||||
await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query."))
|
||||
|
||||
@commands.command()
|
||||
|
@ -210,15 +199,15 @@ class SeaUtils(commands.Cog):
|
|||
"""Retrieve the text of an RFC document.
|
||||
|
||||
This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document.
|
||||
A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501
|
||||
A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501
|
||||
url = f"https://www.rfc-editor.org/rfc/rfc{number}.html"
|
||||
datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}"
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url=url) as response:
|
||||
if response.status == 200:
|
||||
html = await response.text()
|
||||
soup = BeautifulSoup(html, 'html.parser')
|
||||
pre_tags = soup.find_all('pre')
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
pre_tags = soup.find_all("pre")
|
||||
content: list[Embed | str] = []
|
||||
for pre_tag in pre_tags:
|
||||
text = format_rfc_text(md(pre_tag), number)
|
||||
|
@ -226,26 +215,15 @@ class SeaUtils(commands.Cog):
|
|||
pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096)
|
||||
for page in pagified_text:
|
||||
if await ctx.embed_requested():
|
||||
embed = Embed(
|
||||
title=f"RFC Document {number}",
|
||||
url=datatracker_url,
|
||||
description=page,
|
||||
color=await ctx.embed_color()
|
||||
)
|
||||
embed = Embed(title=f"RFC Document {number}", url=datatracker_url, description=page, color=await ctx.embed_color())
|
||||
content.append(embed)
|
||||
else:
|
||||
content.append(page)
|
||||
elif await ctx.embed_requested():
|
||||
embed = Embed(title=f"RFC Document {number}", url=datatracker_url, description=text, color=await ctx.embed_color())
|
||||
content.append(embed)
|
||||
else:
|
||||
if await ctx.embed_requested():
|
||||
embed = Embed(
|
||||
title=f"RFC Document {number}",
|
||||
url=datatracker_url,
|
||||
description=text,
|
||||
color=await ctx.embed_color()
|
||||
)
|
||||
content.append(embed)
|
||||
else:
|
||||
content.append(text)
|
||||
content.append(text)
|
||||
if await ctx.embed_requested():
|
||||
for embed in content:
|
||||
embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}")
|
||||
|
|
Loading…
Add table
Reference in a new issue