diff --git a/.forgejo/workflows/config/.pylintrc b/.forgejo/workflows/config/.pylintrc index 94010af..2dd59c9 100644 --- a/.forgejo/workflows/config/.pylintrc +++ b/.forgejo/workflows/config/.pylintrc @@ -18,5 +18,4 @@ import-self, relative-beyond-top-level, too-many-instance-attributes, - duplicate-code, - too-many-nested-blocks + duplicate-code diff --git a/.forgejo/workflows/workflow.yaml b/.forgejo/workflows/workflow.yaml index 98b12fc..f88c687 100644 --- a/.forgejo/workflows/workflow.yaml +++ b/.forgejo/workflows/workflow.yaml @@ -58,7 +58,7 @@ jobs: npx -p "@getmeli/cli" meli upload ./site \ --url "https://pages.coastalcommits.com" \ --site "${{ vars.MELI_SITE_ID }}" \ - --token "${{ secrets.MELI_SECRET }}" \ + --token "${{ secrets.MELI_SITE_SECRET }}" \ --release "$CI_ACTION_REF_NAME_SLUG/${{ env.GITHUB_SHA }}" \ --branch "$CI_ACTION_REF_NAME_SLUG" diff --git a/backup/backup.py b/backup/backup.py index 6202e3a..7fe9b4d 100644 --- a/backup/backup.py +++ b/backup/backup.py @@ -100,7 +100,7 @@ class Backup(commands.Cog): except (json.JSONDecodeError, IndexError): try: export = json.loads(await ctx.message.reference.resolved.attachments[0].read()) - except (json.JSONDecodeError, IndexError, AttributeError): + except (json.JSONDecodeError, IndexError): await ctx.send(error("Please provide a valid JSON export file.")) return diff --git a/emojiinfo/model.py b/emojiinfo/model.py index cc8a468..af457d1 100644 --- a/emojiinfo/model.py +++ b/emojiinfo/model.py @@ -81,7 +81,6 @@ class PartialEmoji(discord.PartialEmoji): with open(path, "r", encoding="UTF-8") as file: emojis: dict = json.load(file) emoji_aliases = [] - emoji_group = None for dict_name, group in emojis.items(): for k, v in group.items(): if v == value: diff --git a/poetry.lock b/poetry.lock index 017d640..50be601 100644 --- a/poetry.lock +++ b/poetry.lock @@ -228,27 +228,6 @@ files = [ [package.extras] dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"] -[[package]] -name = "beautifulsoup4" -version = "4.12.3" -description = "Screen-scraping library" -optional = false -python-versions = ">=3.6.0" -files = [ - {file = "beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed"}, - {file = "beautifulsoup4-4.12.3.tar.gz", hash = "sha256:74e3d1928edc070d21748185c46e3fb33490f22f52a3addee9aee0f4f7781051"}, -] - -[package.dependencies] -soupsieve = ">1.2" - -[package.extras] -cchardet = ["cchardet"] -chardet = ["chardet"] -charset-normalizer = ["charset-normalizer"] -html5lib = ["html5lib"] -lxml = ["lxml"] - [[package]] name = "brotli" version = "1.1.0" @@ -911,21 +890,6 @@ profiling = ["gprof2dot"] rtd = ["jupyter_sphinx", "mdit-py-plugins", "myst-parser", "pyyaml", "sphinx", "sphinx-copybutton", "sphinx-design", "sphinx_book_theme"] testing = ["coverage", "pytest", "pytest-cov", "pytest-regressions"] -[[package]] -name = "markdownify" -version = "0.12.1" -description = "Convert HTML to markdown." -optional = false -python-versions = "*" -files = [ - {file = "markdownify-0.12.1-py3-none-any.whl", hash = "sha256:a3805abd8166dbb7b27783c5599d91f54f10d79894b2621404d85b333c7ce561"}, - {file = "markdownify-0.12.1.tar.gz", hash = "sha256:1fb08c618b30e0ee7a31a39b998f44a18fb28ab254f55f4af06b6d35a2179e27"}, -] - -[package.dependencies] -beautifulsoup4 = ">=4.9,<5" -six = ">=1.15,<2" - [[package]] name = "markupsafe" version = "2.1.5" @@ -2147,17 +2111,6 @@ files = [ {file = "smmap-5.0.1.tar.gz", hash = "sha256:dceeb6c0028fdb6734471eb07c0cd2aae706ccaecab45965ee83f11c8d3b1f62"}, ] -[[package]] -name = "soupsieve" -version = "2.5" -description = "A modern CSS selector implementation for Beautiful Soup." -optional = false -python-versions = ">=3.8" -files = [ - {file = "soupsieve-2.5-py3-none-any.whl", hash = "sha256:eaa337ff55a1579b6549dc679565eac1e3d000563bcb1c8ab0d0fefbc0c2cdc7"}, - {file = "soupsieve-2.5.tar.gz", hash = "sha256:5663d5a7b3bfaeee0bc4372e7fc48f9cff4940b3eec54a6451cc5299f1097690"}, -] - [[package]] name = "tinycss2" version = "1.2.1" @@ -2498,4 +2451,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = ">=3.11,<3.12" -content-hash = "229d7fd39618cf708f3cd5409dde2e6e25b822e4f936e14b3ade9800bf00daab" +content-hash = "0ac382e0399d9c23c5f89a0ffeb3aae056dc8b28e864b22f815c0e3eb34175bd" diff --git a/pyproject.toml b/pyproject.toml index 872ccdf..245364d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,8 +15,6 @@ websockets = "^12.0" pillow = "^10.3.0" numpy = "^1.26.4" colorthief = "^0.2.1" -beautifulsoup4 = "^4.12.3" -markdownify = "^0.12.1" [tool.poetry.group.dev] optional = true diff --git a/seautils/info.json b/seautils/info.json index 7356137..f331eac 100644 --- a/seautils/info.json +++ b/seautils/info.json @@ -8,6 +8,5 @@ "hidden": true, "disabled": false, "min_bot_version": "3.5.0", - "min_python_version": [3, 8, 0], - "requirements": ["beautifulsoup4", "markdownify"] + "min_python_version": [3, 8, 0] } diff --git a/seautils/seautils.py b/seautils/seautils.py index 3bbf5cf..c575f6b 100644 --- a/seautils/seautils.py +++ b/seautils/seautils.py @@ -5,20 +5,13 @@ # ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| -import asyncio import inspect import operator -import re -from asyncio.subprocess import Process from functools import partial, partialmethod from typing import Any -import aiohttp -import yaml -from bs4 import BeautifulSoup -from discord import Color, Embed, app_commands +from discord import Embed, app_commands from discord.utils import CachedSlotProperty, cached_property -from markdownify import MarkdownConverter from redbot.core import commands from redbot.core.bot import Red from redbot.core.dev_commands import cleanup_code @@ -26,38 +19,29 @@ from redbot.core.utils import chat_formatting as cf from redbot.core.utils.views import SimpleMenu -def md(soup: BeautifulSoup, **options) -> Any | str: - return MarkdownConverter(**options).convert_soup(soup=soup) - -def format_rfc_text(text: str, number: int) -> str: - one: str = re.sub(r"\(\.\/rfc(\d+)", r"(https://www.rfc-editor.org/rfc/rfc\1.html", text) - two: str = re.sub(r"\((#(?:section|page)-\d+(?:.\d+)?)\)", f"(https://www.rfc-editor.org/rfc/rfc{number}.html\1)", one) - three: str = re.sub(r"\n{3,}", "\n\n", two) - return three - class SeaUtils(commands.Cog): """A collection of random utilities.""" __author__ = ["SeaswimmerTheFsh"] __version__ = "1.0.0" - def __init__(self, bot: Red) -> None: + def __init__(self, bot: Red): self.bot = bot def format_help_for_context(self, ctx: commands.Context) -> str: - pre_processed = super().format_help_for_context(ctx=ctx) or "" + pre_processed = super().format_help_for_context(ctx) or "" n = "\n" if "\n\n" not in pre_processed else "" text = [ f"{pre_processed}{n}", f"Cog Version: **{self.__version__}**", - f"Author: {cf.humanize_list(items=self.__author__)}" + f"Author: {cf.humanize_list(self.__author__)}" ] return "\n".join(text) def format_src(self, obj: Any) -> str: """A large portion of this code is repurposed from Zephyrkul's RTFS cog. https://github.com/Zephyrkul/FluffyCogs/blob/master/rtfs/rtfs.py""" - obj = inspect.unwrap(func=obj) + obj = inspect.unwrap(obj) src: Any = getattr(obj, "__func__", obj) if isinstance(obj, (commands.Command, app_commands.Command)): src = obj.callback @@ -67,11 +51,11 @@ class SeaUtils(commands.Cog): src = obj.fget elif isinstance(obj, (cached_property, CachedSlotProperty)): src = obj.function - return inspect.getsource(object=src) + return inspect.getsource(src) @commands.command(aliases=["source", "src", "code", "showsource"]) @commands.is_owner() - async def showcode(self, ctx: commands.Context, *, object: str) -> None: # pylint: disable=redefined-builtin + async def showcode(self, ctx: commands.Context, *, object: str): # pylint: disable=redefined-builtin """Show the code for a particular object.""" try: if object.startswith("/") and (obj := ctx.bot.tree.get_command(object[1:])): @@ -80,8 +64,6 @@ class SeaUtils(commands.Cog): text = self.format_src(type(obj)) elif obj := ctx.bot.get_command(object): text = self.format_src(obj) - else: - raise AttributeError temp_content = cf.pagify( text=cleanup_code(text), escape_mass_mentions=True, @@ -100,151 +82,3 @@ class SeaUtils(commands.Cog): await ctx.send(embed=embed, reference=ctx.message.to_reference(fail_if_not_exists=False)) else: await ctx.send(content="Object not found!", reference=ctx.message.to_reference(fail_if_not_exists=False)) - - @commands.command(name='dig', aliases=['dnslookup', 'nslookup']) - @commands.is_owner() - async def dig(self, ctx: commands.Context, name: str, record_type: str | None = None, server: str | None = None, port: int = 53) -> None: - """Retrieve DNS information for a domain. - - Uses `dig` to perform a DNS query. Will fall back to `nslookup` if `dig` is not installed on the system. - `nslookup` does not provide as much information as `dig`, so only the `name` parameter will be used if `nslookup` is used. - Will return the A, AAAA, and CNAME records for a domain by default. You can specify a different record type with the `type` parameter.""" - command_opts: list[str | int] = ['dig'] - query_types: list[str] = [record_type] if record_type else ['A', 'AAAA', 'CNAME'] - if server: - command_opts.extend(['@', server]) - for query_type in query_types: - command_opts.extend([name, query_type]) - command_opts.extend(['-p', str(port), '+yaml']) - - try: - process: Process = await asyncio.create_subprocess_exec(*command_opts, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) - stdout, stderr = await process.communicate() - if stderr: - await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=stderr.decode())) - else: - data = yaml.safe_load(stdout.decode()) - message_data: dict = data[0]['message'] - response_data: dict = message_data['response_message_data'] - if ctx.embed_requested(): - embed = Embed( - title="DNS Query Result", - color=await ctx.embed_color(), - timestamp=message_data['response_time'] - ) - embed.add_field(name="Response Address", value=message_data['response_address'], inline=True) - embed.add_field(name="Response Port", value=message_data['response_port'], inline=True) - embed.add_field(name="Query Address", value=message_data['query_address'], inline=True) - embed.add_field(name="Query Port", value=message_data['query_port'], inline=True) - embed.add_field(name="Status", value=response_data['status'], inline=True) - embed.add_field(name="Flags", value=response_data['flags'], inline=True) - - if response_data.get('status') != 'NOERROR': - embed.colour = Color.red() - embed.description = cf.error("Dig query did not return `NOERROR` status.") - - questions = [] - answers = [] - authorities = [] - for m in data: - response = m['message']['response_message_data'] - if 'QUESTION_SECTION' in response: - for question in response['QUESTION_SECTION']: - if question not in questions: - questions.append(question) - - if 'ANSWER_SECTION' in response: - for answer in response['ANSWER_SECTION']: - if answer not in answers: - answers.append(answer) - - if 'AUTHORITY_SECTION' in response: - for authority in response['AUTHORITY_SECTION']: - if authority not in authorities: - authorities.append(authority) - - if questions: - question_section = "\n".join(questions) - embed.add_field(name="Question Section", value=f"{cf.box(text=question_section, lang='prolog')}", inline=False) - - if answers: - answer_section = "\n".join(answers) - if len(answer_section) > 1024: - embed.description = cf.warning("Answer section is too long to fit within embed field, falling back to description.") + cf.box(answer_section) - else: - embed.add_field(name="Answer Section", value=f"{cf.box(text=answer_section, lang='prolog')}", inline=False) - - if authorities: - authority_section = "\n".join(authorities) - embed.add_field(name="Authority Section", value=f"{cf.box(text=authority_section, lang='prolog')}", inline=False) - await ctx.send(embed=embed) - else: - await ctx.send(content=cf.box(text=stdout, lang='yaml')) - except (FileNotFoundError): - try: - ns_process = await asyncio.create_subprocess_exec('nslookup', name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) - ns_stdout, ns_stderr = await ns_process.communicate() - if ns_stderr: - await ctx.maybe_send_embed(message="An error was encountered!\n" + cf.box(text=ns_stderr.decode())) - else: - warning = cf.warning("`dig` is not installed! Defaulting to `nslookup`.\nThis command provides more information when `dig` is installed on the system.\n") - if await ctx.embed_requested(): - embed = Embed( - title="DNS Query Result", - color=await ctx.embed_color(), - timestamp=ctx.message.created_at - ) - embed.description = warning + cf.box(text=ns_stdout.decode()) - await ctx.send(embed=embed) - else: - await ctx.send(content = warning + cf.box(text=ns_stdout.decode())) - except (FileNotFoundError): - await ctx.maybe_send_embed(message=cf.error("Neither `dig` nor `nslookup` are installed on the system. Unable to resolve DNS query.")) - - @commands.command() - async def rfc(self, ctx: commands.Context, number: int) -> None: - """Retrieve the text of an RFC document. - - This command uses the [RFC Editor website](https://www.rfc-editor.org/) to fetch the text of an RFC document. - A [Request for Comments (RFC)](https://en.wikipedia.org/wiki/Request_for_Comments) is a publication in a series from the principal technical development and standards-setting bodies for the [Internet](https://en.wikipedia.org/wiki/Internet), most prominently the [Internet Engineering Task Force](https://en.wikipedia.org/wiki/Internet_Engineering_Task_Force). An RFC is authored by individuals or groups of engineers and [computer scientists](https://en.wikipedia.org/wiki/Computer_scientist) in the form of a [memorandum](https://en.wikipedia.org/wiki/Memorandum) describing methods, behaviors, research, or innovations applicable to the working of the Internet and Internet-connected systems. It is submitted either for [peer review](https://en.wikipedia.org/wiki/Peer_review) or to convey new concepts, information, or, occasionally, engineering humor.""" # noqa: E501 - url = f"https://www.rfc-editor.org/rfc/rfc{number}.html" - datatracker_url = f"https://datatracker.ietf.org/doc/rfc{number}" - async with aiohttp.ClientSession() as session: - async with session.get(url=url) as response: - if response.status == 200: - html = await response.text() - soup = BeautifulSoup(html, 'html.parser') - pre_tags = soup.find_all('pre') - content: list[Embed | str] = [] - for pre_tag in pre_tags: - text = format_rfc_text(md(pre_tag), number) - if len(text) > 4096: - pagified_text = cf.pagify(text, delims=["\n\n"], page_length=4096) - for page in pagified_text: - if await ctx.embed_requested(): - embed = Embed( - title=f"RFC Document {number}", - url=datatracker_url, - description=page, - color=await ctx.embed_color() - ) - content.append(embed) - else: - content.append(page) - else: - if await ctx.embed_requested(): - embed = Embed( - title=f"RFC Document {number}", - url=datatracker_url, - description=text, - color=await ctx.embed_color() - ) - content.append(embed) - else: - content.append(text) - if await ctx.embed_requested(): - for embed in content: - embed.set_footer(text=f"Page {content.index(embed) + 1}/{len(content)}") - await SimpleMenu(pages=content, disable_after_timeout=True, timeout=300).start(ctx) - else: - await ctx.maybe_send_embed(content=cf.error(f"An error occurred while fetching RFC {number}. Status code: {response.status}.")) diff --git a/speedtest/__init__.py b/speedtest/__init__.py new file mode 100644 index 0000000..f91daf6 --- /dev/null +++ b/speedtest/__init__.py @@ -0,0 +1,5 @@ +from .speedtest import Speedtest + + +async def setup(bot): + await bot.add_cog(Speedtest(bot)) diff --git a/speedtest/info.json b/speedtest/info.json new file mode 100644 index 0000000..c36a53a --- /dev/null +++ b/speedtest/info.json @@ -0,0 +1,14 @@ +{ + "author" : ["SeaswimmerTheFsh (seasw.)"], + "install_msg" : "Thank you for installing Speedtest!\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).", + "name" : "Speedtest", + "short" : "A collection of useful utilities.", + "description" : "A collection of useful utilities.", + "end_user_data_statement" : "This cog does not store end user data.", + "hidden": true, + "disabled": false, + "min_bot_version": "3.5.0", + "min_python_version": [3, 10, 0], + "tags" : ["utility", "information"], + "requirements": ["pydantic"] +} diff --git a/speedtest/models.py b/speedtest/models.py new file mode 100644 index 0000000..cd1ca07 --- /dev/null +++ b/speedtest/models.py @@ -0,0 +1,76 @@ +from datetime import datetime + +from pydantic import BaseModel + + +class Speedtest(BaseModel): + type: str + timestamp: datetime + ping: "Ping" + download: "Bandwidth" + upload: "Bandwidth" + isp: str + interface: "Interface" + server: "Server" + result: "Result" + + @classmethod + def from_json(cls, data: dict) -> "Speedtest": + return cls( + type=data["type"], + timestamp=datetime.fromisoformat(data["timestamp"]), + ping=Ping(**data["ping"]), + download=Bandwidth(**data["download"]), + upload=Bandwidth(**data["upload"]), + isp=data["isp"], + interface=Interface(**data["interface"]), + server=Server(**data["server"]), + result=Result(**data["result"]) + ) + +class Bandwidth(BaseModel): + bandwidth: float + bytes: int + elapsed: int + latency: "Latency" + + @property + def mbps(self) -> float: + return self.bandwidth / 1_000_000 + +class Latency(BaseModel): + iqm: float + low: float + high: float + jitter: float + +class Interface(BaseModel): + internalIp: str + name: str + macAddr: str + isVpn: bool + externalIp: str + +class Ping(BaseModel): + jitter: float + latency: float + low: float + high: float + +class Server(BaseModel): + id: int + name: str + location: str + country: str + host: str + port: int + ip: str + +class Result(BaseModel): + id: str + url: str + persisted: bool + + @property + def image(self) -> str: + return self.url + ".png" diff --git a/speedtest/speedtest.py b/speedtest/speedtest.py new file mode 100644 index 0000000..e727b0f --- /dev/null +++ b/speedtest/speedtest.py @@ -0,0 +1,71 @@ +# _____ _ +# / ____| (_) +# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __ +# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__| +# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | +# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| + +import asyncio +import json +import subprocess + +import discord +from redbot.core import commands +from redbot.core.bot import Red +from redbot.core.utils import chat_formatting as cf + +from .models import Speedtest as sp + + +class Speedtest(commands.Cog): + """A collection of random utilities.""" + + __author__ = ["SeaswimmerTheFsh"] + __version__ = "1.0.0" + + def __init__(self, bot: Red): + self.bot = bot + + def format_help_for_context(self, ctx: commands.Context) -> str: + pre_processed = super().format_help_for_context(ctx) or "" + n = "\n" if "\n\n" not in pre_processed else "" + text = [ + f"{pre_processed}{n}", + f"Cog Version: **{self.__version__}**", + f"Author: {cf.humanize_list(self.__author__)}" + ] + return "\n".join(text) + + async def run_speedtest(self) -> str | sp: + try: + process = await asyncio.create_subprocess_exec( + "speedtest", "-f", "json", "--accept-license", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + except FileNotFoundError: + return "Speedtest CLI is not installed." + stdout, stderr = await process.communicate() + if process.returncode != 0: + return stderr.decode("utf-8") + return sp.from_json(json.loads(stdout.decode("utf-8"))) + + @commands.command() + @commands.is_owner() + async def speedtest(self, ctx: commands.Context) -> None: + """Run a speedtest.""" + msg = await ctx.maybe_send_embed("Running speedtest...") + async with ctx.typing(): + speedtest = await self.run_speedtest() + if await ctx.embed_requested(): + if not isinstance(speedtest, sp): + await msg.edit(embed=discord.Embed(description=f"An error occurred! {speedtest}", color=discord.Colour.red())) + return + embed = discord.Embed(title="Speedtest Results", url=speedtest.result.url, color=await ctx.embed_color()) + embed.set_image(url=speedtest.result.image) + await msg.edit(embed=embed) + else: + if not isinstance(speedtest, sp): + await msg.edit(content=f"An error occurred! \n`{speedtest}`") + return + await msg.edit(content=f"**[Result]({speedtest.result.url})**")