diff --git a/seautils/seautils.py b/seautils/seautils.py index a895c35..272112d 100644 --- a/seautils/seautils.py +++ b/seautils/seautils.py @@ -29,12 +29,14 @@ from redbot.core.utils.views import SimpleMenu def md(soup: BeautifulSoup, **options) -> Any | str: return MarkdownConverter(**options).convert_soup(soup=soup) + def format_rfc_text(text: str, number: int) -> str: one: str = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text) two: str = re.sub(r"\((#(?:section|page)-\d+(?:.\d+)?)\)", f"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one) three: str = re.sub(r"\n{3,}", "\n\n", two) return three + class SeaUtils(commands.Cog): """A collection of random utilities.""" @@ -57,7 +59,6 @@ class SeaUtils(commands.Cog): ] return "\n".join(text) - def format_src(self, obj: Any) -> str: """A large portion of this code is repurposed from Zephyrkul's RTFS cog. https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py""" @@ -75,7 +76,7 @@ class SeaUtils(commands.Cog): @commands.command(aliases=["source", "src", "code", "showsource"]) @commands.is_owner() - async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin + async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin """Show the code for a particular object.""" try: if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])): @@ -86,11 +87,7 @@ class SeaUtils(commands.Cog): text = self.format_src(obj) else: raise AttributeError - temp_content = cf.pagify( - text=cleanup_code(text), - escape_mass_mentions=True, - page_length = 1977 - ) + temp_content = cf.pagify(text=cleanup_code(text), escape_mass_mentions=True, page_length=1977) content = [] max_i = operator.length_hint(temp_content) i = 1 @@ -105,7 +102,7 @@ class SeaUtils(commands.Cog): else: await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False)) - @commands.command(name='dig', aliases=['dnslookup', 'nslookup']) + @commands.command(name="dig", aliases=["dnslookup", "nslookup"]) @commands.is_owner() async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None: """Retrieve DNS information for a domain. @@ -113,13 +110,13 @@ class SeaUtils(commands.Cog): Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system. `nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used. Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter.""" - command_opts: list[str | int] = ['dig'] - query_types: list[str] = [record_type] if record_type else ['A', 'AAAA', 'CNAME'] + command_opts: list[str | int] = ["dig"] + query_types: list[str] = [record_type] if record_type else ["A", "AAAA", "CNAME"] if server: - command_opts.extend(['@', server]) + command_opts.extend(["@", server]) for query_type in query_types: command_opts.extend([name, query_type]) - command_opts.extend(['-p', str(port), '+yaml']) + command_opts.extend(["-p", str(port), "+yaml"]) try: process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) @@ -128,22 +125,18 @@ class SeaUtils(commands.Cog): await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode())) else: data = yaml.safe_load(stdout.decode()) - message_data: dict = data[0]['message'] - response_data: dict = message_data['response_message_data'] + message_data: dict = data[0]["message"] + response_data: dict = message_data["response_message_data"] if ctx.embed_requested(): - embed = Embed( - title="DNS Query Result", - color=await ctx.embed_color(), - timestamp=message_data['response_time'] - ) - embed.add_field(name="Response Address", value=message_data['response_address'], inline=True) - embed.add_field(name="Response Port", value=message_data['response_port'], inline=True) - embed.add_field(name="Query Address", value=message_data['query_address'], inline=True) - embed.add_field(name="Query Port", value=message_data['query_port'], inline=True) - embed.add_field(name="Status", value=response_data['status'], inline=True) - embed.add_field(name="Flags", value=response_data['flags'], inline=True) + embed = Embed(title="DNS Query Result", color=await ctx.embed_color(), timestamp=message_data["response_time"]) + embed.add_field(name="Response Address", value=message_data["response_address"], inline=True) + embed.add_field(name="Response Port", value=message_data["response_port"], inline=True) + embed.add_field(name="Query Address", value=message_data["query_address"], inline=True) + embed.add_field(name="Query Port", value=message_data["query_port"], inline=True) + embed.add_field(name="Status", value=response_data["status"], inline=True) + embed.add_field(name="Flags", value=response_data["flags"], inline=True) - if response_data.get('status') != 'NOERROR': + if response_data.get("status") != "NOERROR": embed.colour = Color.red() embed.description = cf.error("Dig query did not return `NOERROR` status.") @@ -151,19 +144,19 @@ class SeaUtils(commands.Cog): answers = [] authorities = [] for m in data: - response = m['message']['response_message_data'] - if 'QUESTION_SECTION' in response: - for question in response['QUESTION_SECTION']: + response = m["message"]["response_message_data"] + if "QUESTION_SECTION" in response: + for question in response["QUESTION_SECTION"]: if question not in questions: questions.append(question) - if 'ANSWER_SECTION' in response: - for answer in response['ANSWER_SECTION']: + if "ANSWER_SECTION" in response: + for answer in response["ANSWER_SECTION"]: if answer not in answers: answers.append(answer) - if 'AUTHORITY_SECTION' in response: - for authority in response['AUTHORITY_SECTION']: + if "AUTHORITY_SECTION" in response: + for authority in response["AUTHORITY_SECTION"]: if authority not in authorities: authorities.append(authority) @@ -183,26 +176,22 @@ class SeaUtils(commands.Cog): embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False) await ctx.send(embed=embed) else: - await ctx.send(content=cf.box(text=stdout, lang='yaml')) - except (FileNotFoundError): + await ctx.send(content=cf.box(text=stdout, lang="yaml")) + except FileNotFoundError: try: - ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + ns_process = await asyncio.create_subprocess_exec("nslookup", name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) ns_stdout, ns_stderr = await ns_process.communicate() if ns_stderr: await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode())) else: warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n") if await ctx.embed_requested(): - embed = Embed( - title="DNS Query Result", - color=await ctx.embed_color(), - timestamp=ctx.message.created_at - ) + embed = Embed(title="DNS Query Result", color=await ctx.embed_color(), timestamp=ctx.message.created_at) embed.description = warning + cf.box(text=ns_stdout.decode()) await ctx.send(embed=embed) else: - await ctx.send(content = warning + cf.box(text=ns_stdout.decode())) - except (FileNotFoundError): + await ctx.send(content=warning + cf.box(text=ns_stdout.decode())) + except FileNotFoundError: await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query.")) @commands.command() @@ -210,15 +199,15 @@ class SeaUtils(commands.Cog): """Retrieve the text of an RFC document. This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document. - A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501 + A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501 url = f"https://www.rfc-editor.org/rfc/rfc{number}.html" datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}" async with aiohttp.ClientSession() as session: async with session.get(url=url) as response: if response.status == 200: html = await response.text() - soup = BeautifulSoup(html, 'html.parser') - pre_tags = soup.find_all('pre') + soup = BeautifulSoup(html, "html.parser") + pre_tags = soup.find_all("pre") content: list[Embed | str] = [] for pre_tag in pre_tags: text = format_rfc_text(md(pre_tag), number) @@ -226,26 +215,15 @@ class SeaUtils(commands.Cog): pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096) for page in pagified_text: if await ctx.embed_requested(): - embed = Embed( - title=f"RFC Document {number}", - url=datatracker_url, - description=page, - color=await ctx.embed_color() - ) + embed = Embed(title=f"RFC Document {number}", url=datatracker_url, description=page, color=await ctx.embed_color()) content.append(embed) else: content.append(page) + elif await ctx.embed_requested(): + embed = Embed(title=f"RFC Document {number}", url=datatracker_url, description=text, color=await ctx.embed_color()) + content.append(embed) else: - if await ctx.embed_requested(): - embed = Embed( - title=f"RFC Document {number}", - url=datatracker_url, - description=text, - color=await ctx.embed_color() - ) - content.append(embed) - else: - content.append(text) + content.append(text) if await ctx.embed_requested(): for embed in content: embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}")