Compare commits

..

2 commits

Author SHA1 Message Date
fa3b353704
misc(repository): black reformat
Some checks failed
Actions / Lint Code (Pylint) (push) Failing after 18s
Actions / Build Documentation (MkDocs) (push) Successful in 13s
2024-02-02 11:22:08 -05:00
519e3056ab
feat(aurora): added addrole command 2024-02-02 11:21:56 -05:00
7 changed files with 1353 additions and 456 deletions

File diff suppressed because it is too large Load diff

View file

@ -2,26 +2,27 @@ from redbot.core import Config
config: Config = Config.get_conf(None, identifier=481923957134912, cog_name="Aurora") config: Config = Config.get_conf(None, identifier=481923957134912, cog_name="Aurora")
def register_config(config_obj: Config): def register_config(config_obj: Config):
config_obj.register_guild( config_obj.register_guild(
show_moderator = True, show_moderator=True,
use_discord_permissions = True, use_discord_permissions=True,
ignore_modlog = True, ignore_modlog=True,
ignore_other_bots = True, ignore_other_bots=True,
dm_users = True, dm_users=True,
log_channel = " ", log_channel=" ",
immune_roles = [], immune_roles=[],
history_ephemeral = False, history_ephemeral=False,
history_inline = False, history_inline=False,
history_pagesize = 5, history_pagesize=5,
history_inline_pagesize = 6, history_inline_pagesize=6,
auto_evidenceformat = False, auto_evidenceformat=False,
addrole_whitelist = [] addrole_whitelist=[],
) )
config_obj.register_user( config_obj.register_user(
history_ephemeral = None, history_ephemeral=None,
history_inline = None, history_inline=None,
history_pagesize = None, history_pagesize=None,
history_inline_pagesize = None, history_inline_pagesize=None,
auto_evidenceformat = None auto_evidenceformat=None,
) )

View file

@ -1,21 +1,23 @@
# pylint: disable=cyclic-import # pylint: disable=cyclic-import
import json import json
import time
import sqlite3 import sqlite3
import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from discord import Guild from discord import Guild
from redbot.core import data_manager from redbot.core import data_manager
from .logger import logger from .logger import logger
from .utils import generate_dict, get_next_case_number, convert_timedelta_to_str from .utils import convert_timedelta_to_str, generate_dict, get_next_case_number
def connect() -> sqlite3.Connection: def connect() -> sqlite3.Connection:
"""Connects to the SQLite database, and returns a connection object.""" """Connects to the SQLite database, and returns a connection object."""
try: try:
connection = sqlite3.connect(database=data_manager.cog_data_path(raw_name='Aurora') / 'aurora.db') connection = sqlite3.connect(
database=data_manager.cog_data_path(raw_name="Aurora") / "aurora.db"
)
return connection return connection
except sqlite3.OperationalError as e: except sqlite3.OperationalError as e:
@ -85,8 +87,8 @@ async def create_guild_table(guild: Guild):
"NULL", "NULL",
"NULL", "NULL",
0, 0,
json.dumps([]), # pylint: disable=dangerous-default-value json.dumps([]), # pylint: disable=dangerous-default-value
json.dumps({}), # pylint: disable=dangerous-default-value json.dumps({}), # pylint: disable=dangerous-default-value
) )
cursor.execute(insert_query, insert_values) cursor.execute(insert_query, insert_values)

View file

@ -1,18 +1,32 @@
# pylint: disable=cyclic-import # pylint: disable=cyclic-import
from typing import Union
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Union
import humanize import humanize
from discord import Color, Embed, Guild, Interaction, InteractionMessage, User, Member from discord import Color, Embed, Guild, Interaction, InteractionMessage, Member, User
from redbot.core import commands from redbot.core import commands
from redbot.core.utils.chat_formatting import box, bold, error, warning from redbot.core.utils.chat_formatting import bold, box, error, warning
from aurora.utilities.config import config from aurora.utilities.config import config
from aurora.utilities.utils import fetch_channel_dict, fetch_user_dict, get_next_case_number, get_bool_emoji, get_pagesize_str from aurora.utilities.utils import (
fetch_channel_dict,
fetch_user_dict,
get_bool_emoji,
get_next_case_number,
get_pagesize_str,
)
async def message_factory(color: Color, guild: Guild, reason: str, moderation_type: str, moderator: Union[Member, User] = None, duration: timedelta = None, response: InteractionMessage = None) -> Embed: async def message_factory(
color: Color,
guild: Guild,
reason: str,
moderation_type: str,
moderator: Union[Member, User] = None,
duration: timedelta = None,
response: InteractionMessage = None,
) -> Embed:
"""This function creates a message from set parameters, meant for contacting the moderated user. """This function creates a message from set parameters, meant for contacting the moderated user.
Args: Args:
@ -27,7 +41,12 @@ async def message_factory(color: Color, guild: Guild, reason: str, moderation_ty
Returns: Returns:
embed: The message embed. embed: The message embed.
""" """
if response is not None and not moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]: if response is not None and not moderation_type in [
"kicked",
"banned",
"tempbanned",
"unbanned",
]:
guild_name = f"[{guild.name}]({response.jump_url})" guild_name = f"[{guild.name}]({response.jump_url})"
else: else:
guild_name = guild.name guild_name = guild.name
@ -39,26 +58,43 @@ async def message_factory(color: Color, guild: Guild, reason: str, moderation_ty
if moderation_type == "note": if moderation_type == "note":
embed_desc = "received a" embed_desc = "received a"
elif moderation_type == "role added":
embed_desc = "received the"
elif moderation_type == "role removed":
embed_desc = "lost the"
else: else:
embed_desc = "been" embed_desc = "been"
embed = Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=color, timestamp=datetime.now()) embed = Embed(
title=str.title(moderation_type),
description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.",
color=color,
timestamp=datetime.now(),
)
if await config.guild(guild).show_moderator() and moderator is not None: if await config.guild(guild).show_moderator() and moderator is not None:
embed.add_field(name='Moderator', value=f"`{moderator.name} ({moderator.id})`", inline=False) embed.add_field(
name="Moderator", value=f"`{moderator.name} ({moderator.id})`", inline=False
)
embed.add_field(name='Reason', value=f"`{reason}`", inline=False) embed.add_field(name="Reason", value=f"`{reason}`", inline=False)
if guild.icon.url is not None: if guild.icon.url is not None:
embed.set_author(name=guild.name, icon_url=guild.icon.url) embed.set_author(name=guild.name, icon_url=guild.icon.url)
else: else:
embed.set_author(name=guild.name) embed.set_author(name=guild.name)
embed.set_footer(text=f"Case #{await get_next_case_number(guild.id):,}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&") embed.set_footer(
text=f"Case #{await get_next_case_number(guild.id):,}",
icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&",
)
return embed return embed
async def log_factory(interaction: Interaction, case_dict: dict, resolved: bool = False) -> Embed:
async def log_factory(
interaction: Interaction, case_dict: dict, resolved: bool = False
) -> Embed:
"""This function creates a log embed from set parameters, meant for moderation logging. """This function creates a log embed from set parameters, meant for moderation logging.
Args: Args:
@ -67,57 +103,115 @@ async def log_factory(interaction: Interaction, case_dict: dict, resolved: bool
resolved (bool, optional): Whether the case is resolved or not. Defaults to False. resolved (bool, optional): Whether the case is resolved or not. Defaults to False.
""" """
if resolved: if resolved:
if case_dict['target_type'] == 'USER': if case_dict["target_type"] == "USER":
target_user = await fetch_user_dict(interaction, case_dict['target_id']) target_user = await fetch_user_dict(interaction, case_dict["target_id"])
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" target_name = (
elif case_dict['target_type'] == 'CHANNEL': f"`{target_user['name']}`"
target_user = await fetch_channel_dict(interaction, case_dict['target_id']) if target_user["discriminator"] == "0"
if target_user['mention']: else f"`{target_user['name']}#{target_user['discriminator']}`"
)
elif case_dict["target_type"] == "CHANNEL":
target_user = await fetch_channel_dict(interaction, case_dict["target_id"])
if target_user["mention"]:
target_name = f"{target_user['mention']}" target_name = f"{target_user['mention']}"
else: else:
target_name = f"`{target_user['name']}`" target_name = f"`{target_user['name']}`"
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) moderator_user = await fetch_user_dict(interaction, case_dict["moderator_id"])
moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" moderator_name = (
f"`{moderator_user['name']}`"
if moderator_user["discriminator"] == "0"
else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
)
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Resolved", color=await interaction.client.get_embed_color(interaction.channel)) embed = Embed(
title=f"📕 Case #{case_dict['moderation_id']:,} Resolved",
color=await interaction.client.get_embed_color(interaction.channel),
)
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>" embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
if case_dict['duration'] != 'NULL': if case_dict["duration"] != "NULL":
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) td = timedelta(
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if case_dict["expired"] == '0' else str(humanize.precisedelta(td)) **{
embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" unit: int(val)
for unit, val in zip(
["hours", "minutes", "seconds"],
case_dict["duration"].split(":"),
)
}
)
duration_embed = (
f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>"
if case_dict["expired"] == "0"
else str(humanize.precisedelta(td))
)
embed.description = (
embed.description
+ f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
)
embed.add_field(name='Reason', value=box(case_dict['reason']), inline=False) embed.add_field(name="Reason", value=box(case_dict["reason"]), inline=False)
resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by']) resolved_user = await fetch_user_dict(interaction, case_dict["resolved_by"])
resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}" resolved_name = (
embed.add_field(name='Resolve Reason', value=f"Resolved by `{resolved_name}` ({resolved_user['id']}) for:\n" + box(case_dict['resolve_reason']), inline=False) resolved_user["name"]
if resolved_user["discriminator"] == "0"
else f"{resolved_user['name']}#{resolved_user['discriminator']}"
)
embed.add_field(
name="Resolve Reason",
value=f"Resolved by `{resolved_name}` ({resolved_user['id']}) for:\n"
+ box(case_dict["resolve_reason"]),
inline=False,
)
else: else:
if case_dict['target_type'] == 'USER': if case_dict["target_type"] == "USER":
target_user = await fetch_user_dict(interaction, case_dict['target_id']) target_user = await fetch_user_dict(interaction, case_dict["target_id"])
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" target_name = (
elif case_dict['target_type'] == 'CHANNEL': f"`{target_user['name']}`"
target_user = await fetch_channel_dict(interaction, case_dict['target_id']) if target_user["discriminator"] == "0"
if target_user['mention']: else f"`{target_user['name']}#{target_user['discriminator']}`"
target_name = target_user['mention'] )
elif case_dict["target_type"] == "CHANNEL":
target_user = await fetch_channel_dict(interaction, case_dict["target_id"])
if target_user["mention"]:
target_name = target_user["mention"]
else: else:
target_name = f"`{target_user['name']}`" target_name = f"`{target_user['name']}`"
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) moderator_user = await fetch_user_dict(interaction, case_dict["moderator_id"])
moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" moderator_name = (
f"`{moderator_user['name']}`"
if moderator_user["discriminator"] == "0"
else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
)
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=await interaction.client.get_embed_color(interaction.channel)) embed = Embed(
title=f"📕 Case #{case_dict['moderation_id']:,}",
color=await interaction.client.get_embed_color(interaction.channel),
)
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>" embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
if case_dict['duration'] != 'NULL': if case_dict["duration"] != "NULL":
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) td = timedelta(
embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" **{
unit: int(val)
for unit, val in zip(
["hours", "minutes", "seconds"],
case_dict["duration"].split(":"),
)
}
)
embed.description = (
embed.description
+ f"\n**Duration:** {humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>"
)
embed.add_field(name='Reason', value=box(case_dict['reason']), inline=False) embed.add_field(name="Reason", value=box(case_dict["reason"]), inline=False)
return embed return embed
async def case_factory(interaction: Interaction, case_dict: dict) -> Embed: async def case_factory(interaction: Interaction, case_dict: dict) -> Embed:
"""This function creates a case embed from set parameters. """This function creates a case embed from set parameters.
@ -125,42 +219,79 @@ async def case_factory(interaction: Interaction, case_dict: dict) -> Embed:
interaction (Interaction): The interaction object. interaction (Interaction): The interaction object.
case_dict (dict): The case dictionary. case_dict (dict): The case dictionary.
""" """
if case_dict['target_type'] == 'USER': if case_dict["target_type"] == "USER":
target_user = await fetch_user_dict(interaction, case_dict['target_id']) target_user = await fetch_user_dict(interaction, case_dict["target_id"])
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" target_name = (
elif case_dict['target_type'] == 'CHANNEL': f"`{target_user['name']}`"
target_user = await fetch_channel_dict(interaction, case_dict['target_id']) if target_user["discriminator"] == "0"
if target_user['mention']: else f"`{target_user['name']}#{target_user['discriminator']}`"
)
elif case_dict["target_type"] == "CHANNEL":
target_user = await fetch_channel_dict(interaction, case_dict["target_id"])
if target_user["mention"]:
target_name = f"{target_user['mention']}" target_name = f"{target_user['mention']}"
else: else:
target_name = f"`{target_user['name']}`" target_name = f"`{target_user['name']}`"
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) moderator_user = await fetch_user_dict(interaction, case_dict["moderator_id"])
moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" moderator_name = (
f"`{moderator_user['name']}`"
if moderator_user["discriminator"] == "0"
else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
)
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=await interaction.client.get_embed_color(interaction.channel)) embed = Embed(
title=f"📕 Case #{case_dict['moderation_id']:,}",
color=await interaction.client.get_embed_color(interaction.channel),
)
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>" embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
if case_dict['duration'] != 'NULL': if case_dict["duration"] != "NULL":
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) td = timedelta(
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if bool(case_dict['expired']) is False else str(humanize.precisedelta(td)) **{
unit: int(val)
for unit, val in zip(
["hours", "minutes", "seconds"], case_dict["duration"].split(":")
)
}
)
duration_embed = (
f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>"
if bool(case_dict["expired"]) is False
else str(humanize.precisedelta(td))
)
embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
embed.description += f"\n**Changes:** {len(case_dict['changes']) - 1}" if case_dict['changes'] else "\n**Changes:** 0" embed.description += (
f"\n**Changes:** {len(case_dict['changes']) - 1}"
if case_dict["changes"]
else "\n**Changes:** 0"
)
if case_dict['metadata']: if case_dict["metadata"]:
if case_dict['metadata']['imported_from']: if case_dict["metadata"]["imported_from"]:
embed.description += f"\n**Imported From:** {case_dict['metadata']['imported_from']}" embed.description += (
f"\n**Imported From:** {case_dict['metadata']['imported_from']}"
)
embed.add_field(name='Reason', value=box(case_dict['reason']), inline=False) embed.add_field(name="Reason", value=box(case_dict["reason"]), inline=False)
if case_dict['resolved'] == 1: if case_dict["resolved"] == 1:
resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by']) resolved_user = await fetch_user_dict(interaction, case_dict["resolved_by"])
resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`" resolved_name = (
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n{box(case_dict['resolve_reason'])}", inline=False) f"`{resolved_user['name']}`"
if resolved_user["discriminator"] == "0"
else f"`{resolved_user['name']}#{resolved_user['discriminator']}`"
)
embed.add_field(
name="Resolve Reason",
value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n{box(case_dict['resolve_reason'])}",
inline=False,
)
return embed return embed
async def changes_factory(interaction: Interaction, case_dict: dict) -> Embed: async def changes_factory(interaction: Interaction, case_dict: dict) -> Embed:
"""This function creates a changes embed from set parameters. """This function creates a changes embed from set parameters.
@ -168,34 +299,56 @@ async def changes_factory(interaction: Interaction, case_dict: dict) -> Embed:
interaction (Interaction): The interaction object. interaction (Interaction): The interaction object.
case_dict (dict): The case dictionary. case_dict (dict): The case dictionary.
""" """
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Changes", color=await interaction.client.get_embed_color(interaction.channel)) embed = Embed(
title=f"📕 Case #{case_dict['moderation_id']:,} Changes",
color=await interaction.client.get_embed_color(interaction.channel),
)
memory_dict = {} memory_dict = {}
if case_dict['changes']: if case_dict["changes"]:
for change in case_dict['changes']: for change in case_dict["changes"]:
if change['user_id'] not in memory_dict: if change["user_id"] not in memory_dict:
memory_dict[str(change['user_id'])] = await fetch_user_dict(interaction, change['user_id']) memory_dict[str(change["user_id"])] = await fetch_user_dict(
interaction, change["user_id"]
)
user = memory_dict[str(change['user_id'])] user = memory_dict[str(change["user_id"])]
name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}" name = (
user["name"]
if user["discriminator"] == "0"
else f"{user['name']}#{user['discriminator']}"
)
timestamp = f"<t:{change['timestamp']}> | <t:{change['timestamp']}:R>" timestamp = f"<t:{change['timestamp']}> | <t:{change['timestamp']}:R>"
if change['type'] == 'ORIGINAL': if change["type"] == "ORIGINAL":
embed.add_field(name='Original', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) embed.add_field(
name="Original",
value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}",
inline=False,
)
elif change['type'] == 'EDIT': elif change["type"] == "EDIT":
embed.add_field(name='Edit', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) embed.add_field(
name="Edit",
value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}",
inline=False,
)
elif change['type'] == 'RESOLVE': elif change["type"] == "RESOLVE":
embed.add_field(name='Resolve', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) embed.add_field(
name="Resolve",
value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}",
inline=False,
)
else: else:
embed.description = "*No changes have been made to this case.* 🙁" embed.description = "*No changes have been made to this case.* 🙁"
return embed return embed
async def evidenceformat_factory(interaction: Interaction, case_dict: dict) -> str: async def evidenceformat_factory(interaction: Interaction, case_dict: dict) -> str:
"""This function creates a codeblock in evidence format from set parameters. """This function creates a codeblock in evidence format from set parameters.
@ -203,42 +356,49 @@ async def evidenceformat_factory(interaction: Interaction, case_dict: dict) -> s
interaction (Interaction): The interaction object. interaction (Interaction): The interaction object.
case_dict (dict): The case dictionary. case_dict (dict): The case dictionary.
""" """
if case_dict['target_type'] == 'USER': if case_dict["target_type"] == "USER":
target_user = await fetch_user_dict(interaction, case_dict['target_id']) target_user = await fetch_user_dict(interaction, case_dict["target_id"])
target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}" target_name = (
target_user["name"]
if target_user["discriminator"] == "0"
else f"{target_user['name']}#{target_user['discriminator']}"
)
elif case_dict['target_type'] == 'CHANNEL': elif case_dict["target_type"] == "CHANNEL":
target_user = await fetch_channel_dict(interaction, case_dict['target_id']) target_user = await fetch_channel_dict(interaction, case_dict["target_id"])
target_name = target_user['name'] target_name = target_user["name"]
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) moderator_user = await fetch_user_dict(interaction, case_dict["moderator_id"])
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" moderator_name = (
moderator_user["name"]
if moderator_user["discriminator"] == "0"
else f"{moderator_user['name']}#{moderator_user['discriminator']}"
)
content = f"Case: {case_dict['moderation_id']:,} ({str.title(case_dict['moderation_type'])})\nTarget: {target_name} ({target_user['id']})\nModerator: {moderator_name} ({moderator_user['id']})" content = f"Case: {case_dict['moderation_id']:,} ({str.title(case_dict['moderation_type'])})\nTarget: {target_name} ({target_user['id']})\nModerator: {moderator_name} ({moderator_user['id']})"
if case_dict['duration'] != 'NULL': if case_dict["duration"] != "NULL":
hours, minutes, seconds = map(int, case_dict['duration'].split(':')) hours, minutes, seconds = map(int, case_dict["duration"].split(":"))
td = timedelta(hours=hours, minutes=minutes, seconds=seconds) td = timedelta(hours=hours, minutes=minutes, seconds=seconds)
content += f"\nDuration: {humanize.precisedelta(td)}" content += f"\nDuration: {humanize.precisedelta(td)}"
content += f"\nReason: {case_dict['reason']}" content += f"\nReason: {case_dict['reason']}"
return box(content, 'prolog') return box(content, "prolog")
######################################################################################################################## ########################################################################################################################
### Configuration Embeds # ### Configuration Embeds #
######################################################################################################################## ########################################################################################################################
async def _config(ctx: commands.Context) -> Embed: async def _config(ctx: commands.Context) -> Embed:
"""Generates the core embed for configuration menus to use.""" """Generates the core embed for configuration menus to use."""
e = Embed( e = Embed(title="Aurora Configuration Menu", color=await ctx.embed_color())
title="Aurora Configuration Menu",
color=await ctx.embed_color()
)
e.set_thumbnail(url=ctx.bot.user.display_avatar.url) e.set_thumbnail(url=ctx.bot.user.display_avatar.url)
return e return e
async def overrides_embed(ctx: commands.Context) -> Embed: async def overrides_embed(ctx: commands.Context) -> Embed:
"""Generates a configuration menu embed for a user's overrides.""" """Generates a configuration menu embed for a user's overrides."""
@ -247,32 +407,44 @@ async def overrides_embed(ctx: commands.Context) -> Embed:
"inline": await config.user(ctx.author).history_inline(), "inline": await config.user(ctx.author).history_inline(),
"inline_pagesize": await config.user(ctx.author).history_inline_pagesize(), "inline_pagesize": await config.user(ctx.author).history_inline_pagesize(),
"pagesize": await config.user(ctx.author).history_pagesize(), "pagesize": await config.user(ctx.author).history_pagesize(),
"auto_evidenceformat": await config.user(ctx.author).auto_evidenceformat() "auto_evidenceformat": await config.user(ctx.author).auto_evidenceformat(),
} }
override_str = [ override_str = [
'- ' + bold("Auto Evidence Format: ") + get_bool_emoji(override_settings['auto_evidenceformat']), "- "
'- ' + bold("Ephemeral: ") + get_bool_emoji(override_settings['ephemeral']), + bold("Auto Evidence Format: ")
'- ' + bold("History Inline: ") + get_bool_emoji(override_settings['inline']), + get_bool_emoji(override_settings["auto_evidenceformat"]),
'- ' + bold("History Inline Pagesize: ") + get_pagesize_str(override_settings['inline_pagesize']), "- " + bold("Ephemeral: ") + get_bool_emoji(override_settings["ephemeral"]),
'- ' + bold("History Pagesize: ") + get_pagesize_str(override_settings['pagesize']), "- " + bold("History Inline: ") + get_bool_emoji(override_settings["inline"]),
"- "
+ bold("History Inline Pagesize: ")
+ get_pagesize_str(override_settings["inline_pagesize"]),
"- "
+ bold("History Pagesize: ")
+ get_pagesize_str(override_settings["pagesize"]),
] ]
override_str = '\n'.join(override_str) override_str = "\n".join(override_str)
e = await _config(ctx) e = await _config(ctx)
e.title += ": User Overrides" e.title += ": User Overrides"
e.description = """ e.description = (
"""
Use the buttons below to manage your user overrides. Use the buttons below to manage your user overrides.
These settings will override the relevant guild settings.\n These settings will override the relevant guild settings.\n
""" + override_str """
+ override_str
)
return e return e
async def guild_embed(ctx: commands.Context) -> Embed: async def guild_embed(ctx: commands.Context) -> Embed:
"""Generates a configuration menu field value for a guild's settings.""" """Generates a configuration menu field value for a guild's settings."""
guild_settings = { guild_settings = {
"show_moderator": await config.guild(ctx.guild).show_moderator(), "show_moderator": await config.guild(ctx.guild).show_moderator(),
"use_discord_permissions": await config.guild(ctx.guild).use_discord_permissions(), "use_discord_permissions": await config.guild(
ctx.guild
).use_discord_permissions(),
"ignore_modlog": await config.guild(ctx.guild).ignore_modlog(), "ignore_modlog": await config.guild(ctx.guild).ignore_modlog(),
"ignore_other_bots": await config.guild(ctx.guild).ignore_other_bots(), "ignore_other_bots": await config.guild(ctx.guild).ignore_other_bots(),
"dm_users": await config.guild(ctx.guild).dm_users(), "dm_users": await config.guild(ctx.guild).dm_users(),
@ -280,54 +452,83 @@ async def guild_embed(ctx: commands.Context) -> Embed:
"history_ephemeral": await config.guild(ctx.guild).history_ephemeral(), "history_ephemeral": await config.guild(ctx.guild).history_ephemeral(),
"history_inline": await config.guild(ctx.guild).history_inline(), "history_inline": await config.guild(ctx.guild).history_inline(),
"history_pagesize": await config.guild(ctx.guild).history_pagesize(), "history_pagesize": await config.guild(ctx.guild).history_pagesize(),
"history_inline_pagesize": await config.guild(ctx.guild).history_inline_pagesize(), "history_inline_pagesize": await config.guild(
ctx.guild
).history_inline_pagesize(),
"auto_evidenceformat": await config.guild(ctx.guild).auto_evidenceformat(), "auto_evidenceformat": await config.guild(ctx.guild).auto_evidenceformat(),
} }
channel = ctx.guild.get_channel(guild_settings['log_channel']) channel = ctx.guild.get_channel(guild_settings["log_channel"])
if channel is None: if channel is None:
channel = warning("Not Set") channel = warning("Not Set")
else: else:
channel = channel.mention channel = channel.mention
guild_str = [ guild_str = [
'- '+ bold("Show Moderator: ") + get_bool_emoji(guild_settings['show_moderator']), "- "
'- '+ bold("Use Discord Permissions: ") + get_bool_emoji(guild_settings['use_discord_permissions']), + bold("Show Moderator: ")
'- '+ bold("Ignore Modlog: ") + get_bool_emoji(guild_settings['ignore_modlog']), + get_bool_emoji(guild_settings["show_moderator"]),
'- '+ bold("Ignore Other Bots: ") + get_bool_emoji(guild_settings['ignore_other_bots']), "- "
'- '+ bold("DM Users: ") + get_bool_emoji(guild_settings['dm_users']), + bold("Use Discord Permissions: ")
'- '+ bold("Auto Evidence Format: ") + get_bool_emoji(guild_settings['auto_evidenceformat']), + get_bool_emoji(guild_settings["use_discord_permissions"]),
'- '+ bold("Ephemeral: ") + get_bool_emoji(guild_settings['history_ephemeral']), "- "
'- '+ bold("History Inline: ") + get_bool_emoji(guild_settings['history_inline']), + bold("Ignore Modlog: ")
'- '+ bold("History Pagesize: ") + get_pagesize_str(guild_settings['history_pagesize']), + get_bool_emoji(guild_settings["ignore_modlog"]),
'- '+ bold("History Inline Pagesize: ") + get_pagesize_str(guild_settings['history_inline_pagesize']), "- "
'- '+ bold("Log Channel: ") + channel + bold("Ignore Other Bots: ")
+ get_bool_emoji(guild_settings["ignore_other_bots"]),
"- " + bold("DM Users: ") + get_bool_emoji(guild_settings["dm_users"]),
"- "
+ bold("Auto Evidence Format: ")
+ get_bool_emoji(guild_settings["auto_evidenceformat"]),
"- "
+ bold("Ephemeral: ")
+ get_bool_emoji(guild_settings["history_ephemeral"]),
"- "
+ bold("History Inline: ")
+ get_bool_emoji(guild_settings["history_inline"]),
"- "
+ bold("History Pagesize: ")
+ get_pagesize_str(guild_settings["history_pagesize"]),
"- "
+ bold("History Inline Pagesize: ")
+ get_pagesize_str(guild_settings["history_inline_pagesize"]),
"- " + bold("Log Channel: ") + channel,
] ]
guild_str = '\n'.join(guild_str) guild_str = "\n".join(guild_str)
e = await _config(ctx) e = await _config(ctx)
e.title += ": Server Configuration" e.title += ": Server Configuration"
e.description = """ e.description = (
"""
Use the buttons below to manage Aurora's server configuration.\n Use the buttons below to manage Aurora's server configuration.\n
""" + guild_str """
+ guild_str
)
return e return e
async def addrole_embed(ctx: commands.Context) -> Embed: async def addrole_embed(ctx: commands.Context) -> Embed:
"""Generates a configuration menu field value for a guild's addrole whitelist.""" """Generates a configuration menu field value for a guild's addrole whitelist."""
whitelist = await config.guild(ctx.guild).addrole_whitelist() whitelist = await config.guild(ctx.guild).addrole_whitelist()
if whitelist: if whitelist:
whitelist = [ctx.guild.get_role(role).mention or error(f"`{role}` (Not Found)") for role in whitelist] whitelist = [
whitelist = '\n'.join(whitelist) ctx.guild.get_role(role).mention or error(f"`{role}` (Not Found)")
for role in whitelist
]
whitelist = "\n".join(whitelist)
else: else:
whitelist = warning("No roles are on the addrole whitelist!") whitelist = warning("No roles are on the addrole whitelist!")
e = await _config(ctx) e = await _config(ctx)
e.title += ": Addrole Whitelist" e.title += ": Addrole Whitelist"
e.description = "Use the select menu below to manage this guild's addrole whitelist." e.description = (
"Use the select menu below to manage this guild's addrole whitelist."
)
if len(whitelist) > 4000 and len(whitelist) < 5000: if len(whitelist) > 4000 and len(whitelist) < 5000:
lines = whitelist.split('\n') lines = whitelist.split("\n")
chunks = [] chunks = []
chunk = "" chunk = ""
for line in lines: for line in lines:
@ -335,23 +536,27 @@ async def addrole_embed(ctx: commands.Context) -> Embed:
chunks.append(chunk) chunks.append(chunk)
chunk = line chunk = line
else: else:
chunk += '\n' + line if chunk else line chunk += "\n" + line if chunk else line
chunks.append(chunk) chunks.append(chunk)
for chunk in chunks: for chunk in chunks:
e.add_field(name="", value=chunk) e.add_field(name="", value=chunk)
else: else:
e.description += '\n\n' + whitelist e.description += "\n\n" + whitelist
return e return e
async def immune_embed(ctx: commands.Context) -> Embed: async def immune_embed(ctx: commands.Context) -> Embed:
"""Generates a configuration menu field value for a guild's immune roles.""" """Generates a configuration menu field value for a guild's immune roles."""
immune_roles = await config.guild(ctx.guild).immune_roles() immune_roles = await config.guild(ctx.guild).immune_roles()
if immune_roles: if immune_roles:
immune_str = [ctx.guild.get_role(role).mention or error(f"`{role}` (Not Found)") for role in immune_roles] immune_str = [
immune_str = '\n'.join(immune_str) ctx.guild.get_role(role).mention or error(f"`{role}` (Not Found)")
for role in immune_roles
]
immune_str = "\n".join(immune_str)
else: else:
immune_str = warning("No roles are set as immune roles!") immune_str = warning("No roles are set as immune roles!")
@ -360,7 +565,7 @@ async def immune_embed(ctx: commands.Context) -> Embed:
e.description = "Use the select menu below to manage this guild's immune roles." e.description = "Use the select menu below to manage this guild's immune roles."
if len(immune_str) > 4000 and len(immune_str) < 5000: if len(immune_str) > 4000 and len(immune_str) < 5000:
lines = immune_str.split('\n') lines = immune_str.split("\n")
chunks = [] chunks = []
chunk = "" chunk = ""
for line in lines: for line in lines:
@ -368,12 +573,12 @@ async def immune_embed(ctx: commands.Context) -> Embed:
chunks.append(chunk) chunks.append(chunk)
chunk = line chunk = line
else: else:
chunk += '\n' + line if chunk else line chunk += "\n" + line if chunk else line
chunks.append(chunk) chunks.append(chunk)
for chunk in chunks: for chunk in chunks:
e.add_field(name="", value=chunk) e.add_field(name="", value=chunk)
else: else:
e.description += '\n\n' + immune_str e.description += "\n\n" + immune_str
return e return e

View file

@ -4,7 +4,7 @@ import json
from datetime import timedelta as td from datetime import timedelta as td
from typing import Union from typing import Union
from discord import Guild, Interaction, Member, User, SelectOption from discord import Guild, Interaction, Member, SelectOption, User
from discord.errors import Forbidden, NotFound from discord.errors import Forbidden, NotFound
from redbot.core import commands from redbot.core import commands
from redbot.core.utils.chat_formatting import error from redbot.core.utils.chat_formatting import error
@ -46,7 +46,9 @@ async def check_moddable(
"""Checks if a moderator can moderate a target.""" """Checks if a moderator can moderate a target."""
if check_permissions(interaction.client.user, permissions, guild=interaction.guild): if check_permissions(interaction.client.user, permissions, guild=interaction.guild):
await interaction.response.send_message( await interaction.response.send_message(
error(f"I do not have the `{permissions}` permission, required for this action."), error(
f"I do not have the `{permissions}` permission, required for this action."
),
ephemeral=True, ephemeral=True,
) )
return False return False
@ -54,7 +56,9 @@ async def check_moddable(
if await config.guild(interaction.guild).use_discord_permissions() is True: if await config.guild(interaction.guild).use_discord_permissions() is True:
if check_permissions(interaction.user, permissions, guild=interaction.guild): if check_permissions(interaction.user, permissions, guild=interaction.guild):
await interaction.response.send_message( await interaction.response.send_message(
error(f"You do not have the `{permissions}` permission, required for this action."), error(
f"You do not have the `{permissions}` permission, required for this action."
),
ephemeral=True, ephemeral=True,
) )
return False return False
@ -74,7 +78,9 @@ async def check_moddable(
if isinstance(target, Member): if isinstance(target, Member):
if interaction.user.top_role <= target.top_role: if interaction.user.top_role <= target.top_role:
await interaction.response.send_message( await interaction.response.send_message(
content=error("You cannot moderate members with a higher role than you!"), content=error(
"You cannot moderate members with a higher role than you!"
),
ephemeral=True, ephemeral=True,
) )
return False return False
@ -84,7 +90,9 @@ async def check_moddable(
<= target.top_role <= target.top_role
): ):
await interaction.response.send_message( await interaction.response.send_message(
content=error("You cannot moderate members with a role higher than the bot!"), content=error(
"You cannot moderate members with a role higher than the bot!"
),
ephemeral=True, ephemeral=True,
) )
return False return False
@ -102,7 +110,7 @@ async def check_moddable(
return True return True
async def get_next_case_number(guild_id: str, cursor = None) -> int: async def get_next_case_number(guild_id: str, cursor=None) -> int:
"""This function returns the next case number from the MySQL table for a specific guild.""" """This function returns the next case number from the MySQL table for a specific guild."""
from .database import connect from .database import connect
@ -133,7 +141,7 @@ def generate_dict(result):
"resolve_reason": result[12], "resolve_reason": result[12],
"expired": result[13], "expired": result[13],
"changes": json.loads(result[14]), "changes": json.loads(result[14]),
"metadata": json.loads(result[15]) "metadata": json.loads(result[15]),
} }
return case return case
@ -172,7 +180,11 @@ async def fetch_channel_dict(interaction: Interaction, channel_id: str):
if not channel: if not channel:
channel = await interaction.guild.fetch_channel(channel_id) channel = await interaction.guild.fetch_channel(channel_id)
channel_dict = {"id": channel.id, "name": channel.name, "mention": channel.mention} channel_dict = {
"id": channel.id,
"name": channel.name,
"mention": channel.mention,
}
except NotFound: except NotFound:
channel_dict = {"id": channel_id, "name": "Deleted Channel", "mention": None} channel_dict = {"id": channel_id, "name": "Deleted Channel", "mention": None}
@ -202,25 +214,31 @@ async def log(interaction: Interaction, moderation_id: int, resolved: bool = Fal
case = await fetch_case(moderation_id, interaction.guild.id) case = await fetch_case(moderation_id, interaction.guild.id)
if case: if case:
embed = await log_factory(interaction=interaction, case_dict=case, resolved=resolved) embed = await log_factory(
interaction=interaction, case_dict=case, resolved=resolved
)
try: try:
await logging_channel.send(embed=embed) await logging_channel.send(embed=embed)
except Forbidden: except Forbidden:
return return
async def send_evidenceformat(interaction: Interaction, case_dict: dict): async def send_evidenceformat(interaction: Interaction, case_dict: dict):
"""This function sends an ephemeral message to the moderator who took the moderation action, with a pre-made codeblock for use in the mod-evidence channel.""" """This function sends an ephemeral message to the moderator who took the moderation action, with a pre-made codeblock for use in the mod-evidence channel."""
from .factory import evidenceformat_factory from .factory import evidenceformat_factory
send_evidence_bool = (await config.user(interaction.user).auto_evidenceformat() send_evidence_bool = (
await config.user(interaction.user).auto_evidenceformat()
or await config.guild(interaction.guild).auto_evidenceformat() or await config.guild(interaction.guild).auto_evidenceformat()
or False) or False
)
if send_evidence_bool is False: if send_evidence_bool is False:
return return
content = await evidenceformat_factory(interaction=interaction, case_dict=case_dict) content = await evidenceformat_factory(interaction=interaction, case_dict=case_dict)
await interaction.followup.send(content=content, ephemeral=True) await interaction.followup.send(content=content, ephemeral=True)
def convert_timedelta_to_str(timedelta: td) -> str: def convert_timedelta_to_str(timedelta: td) -> str:
"""This function converts a timedelta object to a string.""" """This function converts a timedelta object to a string."""
total_seconds = int(timedelta.total_seconds()) total_seconds = int(timedelta.total_seconds())
@ -229,6 +247,7 @@ def convert_timedelta_to_str(timedelta: td) -> str:
seconds = total_seconds % 60 seconds = total_seconds % 60
return f"{hours}:{minutes}:{seconds}" return f"{hours}:{minutes}:{seconds}"
def get_bool_emoji(value: bool) -> str: def get_bool_emoji(value: bool) -> str:
"""Returns a unicode emoji based on a boolean value.""" """Returns a unicode emoji based on a boolean value."""
if value is True: if value is True:
@ -237,12 +256,14 @@ def get_bool_emoji(value: bool) -> str:
return "\N{NO ENTRY SIGN}" return "\N{NO ENTRY SIGN}"
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}" return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
def get_pagesize_str(value: Union[int, None]) -> str: def get_pagesize_str(value: Union[int, None]) -> str:
"""Returns a string based on a pagesize value.""" """Returns a string based on a pagesize value."""
if value is None: if value is None:
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}" return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
return str(value) + " cases per page" return str(value) + " cases per page"
def create_pagesize_options() -> list[SelectOption]: def create_pagesize_options() -> list[SelectOption]:
"""Returns a list of SelectOptions for pagesize configuration.""" """Returns a list of SelectOptions for pagesize configuration."""
options = [] options = []

View file

@ -6,16 +6,17 @@
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import contextlib import contextlib
import logging
import json import json
import logging
import re import re
from redbot.core import commands
from redbot.core.bot import Red
from redbot.cogs.downloader import errors from redbot.cogs.downloader import errors
from redbot.cogs.downloader.converters import InstalledCog from redbot.cogs.downloader.converters import InstalledCog
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import error, text_to_file from redbot.core.utils.chat_formatting import error, text_to_file
class Backup(commands.Cog): class Backup(commands.Cog):
"""A utility to make reinstalling repositories and cogs after migrating the bot far easier.""" """A utility to make reinstalling repositories and cogs after migrating the bot far easier."""
@ -32,16 +33,22 @@ class Backup(commands.Cog):
async def backup(self, ctx: commands.Context): async def backup(self, ctx: commands.Context):
"""Backup your installed cogs.""" """Backup your installed cogs."""
@backup.command(name='export') @backup.command(name="export")
@commands.is_owner() @commands.is_owner()
async def backup_export(self, ctx: commands.Context): async def backup_export(self, ctx: commands.Context):
"""Export your installed repositories and cogs to a file.""" """Export your installed repositories and cogs to a file."""
downloader = ctx.bot.get_cog("Downloader") downloader = ctx.bot.get_cog("Downloader")
if downloader is None: if downloader is None:
await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again.")) await ctx.send(
error(
f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."
)
)
return return
all_repos = list(downloader._repo_manager.repos) # pylint: disable=protected-access all_repos = list(
downloader._repo_manager.repos
) # pylint: disable=protected-access
export_data = [] export_data = []
@ -50,7 +57,7 @@ class Backup(commands.Cog):
"name": repo.name, "name": repo.name,
"url": repo.url, "url": repo.url,
"branch": repo.branch, "branch": repo.branch,
"cogs": [] "cogs": [],
} }
cogs = await downloader.installed_cogs() cogs = await downloader.installed_cogs()
@ -59,19 +66,21 @@ class Backup(commands.Cog):
if cog.repo_name == repo.name: if cog.repo_name == repo.name:
cog_dict = { cog_dict = {
"name": cog.name, "name": cog.name,
# "loaded": cog.name in ctx.bot.extensions.keys(), # "loaded": cog.name in ctx.bot.extensions.keys(),
# this functionality was planned but never implemented due to Red limitations # this functionality was planned but never implemented due to Red limitations
# and the possibility of restoration functionality being added to Core # and the possibility of restoration functionality being added to Core
"pinned": cog.pinned, "pinned": cog.pinned,
"commit": cog.commit "commit": cog.commit,
} }
repo_dict["cogs"].append(cog_dict) repo_dict["cogs"].append(cog_dict)
export_data.append(repo_dict) export_data.append(repo_dict)
await ctx.send(file=text_to_file(json.dumps(export_data, indent=4), 'backup.json')) await ctx.send(
file=text_to_file(json.dumps(export_data, indent=4), "backup.json")
)
@backup.command(name='import') @backup.command(name="import")
@commands.is_owner() @commands.is_owner()
async def backup_import(self, ctx: commands.Context): async def backup_import(self, ctx: commands.Context):
"""Import your installed repositories and cogs from an export file.""" """Import your installed repositories and cogs from an export file."""
@ -83,7 +92,11 @@ class Backup(commands.Cog):
downloader = ctx.bot.get_cog("Downloader") downloader = ctx.bot.get_cog("Downloader")
if downloader is None: if downloader is None:
await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again.")) await ctx.send(
error(
f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."
)
)
return return
repo_s = [] repo_s = []
@ -96,29 +109,41 @@ class Backup(commands.Cog):
async with ctx.typing(): async with ctx.typing():
for repo in export: for repo in export:
# Most of this code is from the Downloader cog. # Most of this code is from the Downloader cog.
name = repo['name'] name = repo["name"]
branch = repo['branch'] branch = repo["branch"]
url = repo['url'] url = repo["url"]
cogs = repo['cogs'] cogs = repo["cogs"]
if 'PyLav/Red-Cogs' in url: if "PyLav/Red-Cogs" in url:
repo_e.append("PyLav cogs are not supported.") repo_e.append("PyLav cogs are not supported.")
continue continue
if name.startswith('.') or name.endswith('.'): if name.startswith(".") or name.endswith("."):
repo_e.append(f"Invalid repository name: {name}\nRepository names cannot start or end with a dot.") repo_e.append(
f"Invalid repository name: {name}\nRepository names cannot start or end with a dot."
)
continue continue
if re.match(r"^[a-zA-Z0-9_\-\.]+$", name) is None: if re.match(r"^[a-zA-Z0-9_\-\.]+$", name) is None:
repo_e.append(f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots.") repo_e.append(
f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots."
)
continue continue
try: try:
repository = await downloader._repo_manager.add_repo(url, name, branch) # pylint: disable=protected-access repository = await downloader._repo_manager.add_repo(
repo_s.append(f"Added repository {name} from {url} on branch {branch}.") url, name, branch
self.logger.debug("Added repository %s from %s on branch %s", name, url, branch) ) # pylint: disable=protected-access
repo_s.append(
f"Added repository {name} from {url} on branch {branch}."
)
self.logger.debug(
"Added repository %s from %s on branch %s", name, url, branch
)
except errors.ExistingGitRepo: except errors.ExistingGitRepo:
repo_e.append(f"Repository {name} already exists.") repo_e.append(f"Repository {name} already exists.")
repository = downloader._repo_manager.get_repo(name) # pylint: disable=protected-access repository = downloader._repo_manager.get_repo(
name
) # pylint: disable=protected-access
self.logger.debug("Repository %s already exists", name) self.logger.debug("Repository %s already exists", name)
# This is commented out because errors.AuthenticationError is not yet implemented in Red 3.5.5's Downloader cog. # This is commented out because errors.AuthenticationError is not yet implemented in Red 3.5.5's Downloader cog.
@ -134,7 +159,9 @@ class Backup(commands.Cog):
# continue # continue
except errors.CloningError as err: except errors.CloningError as err:
repo_e.append(f"Cloning error while adding repository {name}. See logs for more information.") repo_e.append(
f"Cloning error while adding repository {name}. See logs for more information."
)
self.logger.exception( self.logger.exception(
"Something went wrong whilst cloning %s (to revision %s)", "Something went wrong whilst cloning %s (to revision %s)",
url, url,
@ -144,21 +171,23 @@ class Backup(commands.Cog):
continue continue
except OSError: except OSError:
repo_e.append(f"OS error while adding repository {name}. See logs for more information.") repo_e.append(
f"OS error while adding repository {name}. See logs for more information."
)
self.logger.exception( self.logger.exception(
"Something went wrong trying to add repo %s under name %s", "Something went wrong trying to add repo %s under name %s",
url, url,
name name,
) )
continue continue
cog_modules = [] cog_modules = []
for cog in cogs: for cog in cogs:
# If you're forking this cog, make sure to change these strings! # If you're forking this cog, make sure to change these strings!
if cog['name'] == "backup" and 'SeaswimmerTheFsh/SeaCogs' in url: if cog["name"] == "backup" and "SeaswimmerTheFsh/SeaCogs" in url:
continue continue
try: try:
cog_module = await InstalledCog.convert(ctx, cog['name']) cog_module = await InstalledCog.convert(ctx, cog["name"])
except commands.BadArgument: except commands.BadArgument:
uninstall_e.append(f"Failed to uninstall {cog['name']}") uninstall_e.append(f"Failed to uninstall {cog['name']}")
continue continue
@ -170,42 +199,67 @@ class Backup(commands.Cog):
with contextlib.suppress(commands.ExtensionNotLoaded): with contextlib.suppress(commands.ExtensionNotLoaded):
await ctx.bot.unload_extension(cog) await ctx.bot.unload_extension(cog)
await ctx.bot.remove_loaded_package(cog) await ctx.bot.remove_loaded_package(cog)
await downloader._delete_cog(poss_installed_path) # pylint: disable=protected-access await downloader._delete_cog(
poss_installed_path
) # pylint: disable=protected-access
uninstall_s.append(f"Uninstalled {cog}") uninstall_s.append(f"Uninstalled {cog}")
self.logger.debug("Uninstalled %s", cog) self.logger.debug("Uninstalled %s", cog)
else: else:
uninstall_e.append(f"Failed to uninstall {cog}") uninstall_e.append(f"Failed to uninstall {cog}")
self.logger.warning("Failed to uninstall %s", cog) self.logger.warning("Failed to uninstall %s", cog)
await downloader._remove_from_installed(cog_modules) # pylint: disable=protected-access await downloader._remove_from_installed(
cog_modules
) # pylint: disable=protected-access
for cog in cogs: for cog in cogs:
cog_name = cog['name'] cog_name = cog["name"]
cog_pinned = cog['pinned'] cog_pinned = cog["pinned"]
if cog_pinned: if cog_pinned:
commit = cog['commit'] commit = cog["commit"]
else: else:
commit = None commit = None
# If you're forking this cog, make sure to change these strings! # If you're forking this cog, make sure to change these strings!
if cog_name == 'backup' and 'SeaswimmerTheFsh/SeaCogs' in url: if cog_name == "backup" and "SeaswimmerTheFsh/SeaCogs" in url:
continue continue
async with repository.checkout(commit, exit_to_rev=repository.branch): async with repository.checkout(
cogs_c, message = await downloader._filter_incorrect_cogs_by_names(repository, [cog_name]) # pylint: disable=protected-access commit, exit_to_rev=repository.branch
):
cogs_c, message = (
await downloader._filter_incorrect_cogs_by_names(
repository, [cog_name]
)
) # pylint: disable=protected-access
if not cogs_c: if not cogs_c:
install_e.append(message) install_e.append(message)
self.logger.error(message) self.logger.error(message)
continue continue
failed_reqs = await downloader._install_requirements(cogs_c) # pylint: disable=protected-access failed_reqs = await downloader._install_requirements(
cogs_c
) # pylint: disable=protected-access
if failed_reqs: if failed_reqs:
install_e.append(f"Failed to install {cog_name} due to missing requirements: {failed_reqs}") install_e.append(
self.logger.error("Failed to install %s due to missing requirements: %s", cog_name, failed_reqs) f"Failed to install {cog_name} due to missing requirements: {failed_reqs}"
)
self.logger.error(
"Failed to install %s due to missing requirements: %s",
cog_name,
failed_reqs,
)
continue continue
installed_cogs, failed_cogs = await downloader._install_cogs(cogs_c) # pylint: disable=protected-access installed_cogs, failed_cogs = await downloader._install_cogs(
cogs_c
) # pylint: disable=protected-access
if repository.available_libraries: if repository.available_libraries:
installed_libs, failed_libs = await repository.install_libraries(target_dir=downloader.SHAREDLIB_PATH, req_target_dir=downloader.LIB_PATH) installed_libs, failed_libs = (
await repository.install_libraries(
target_dir=downloader.SHAREDLIB_PATH,
req_target_dir=downloader.LIB_PATH,
)
)
else: else:
installed_libs = None installed_libs = None
failed_libs = None failed_libs = None
@ -214,21 +268,41 @@ class Backup(commands.Cog):
for cog in installed_cogs: for cog in installed_cogs:
cog.pinned = True cog.pinned = True
await downloader._save_to_installed(installed_cogs + installed_libs if installed_libs else installed_cogs) # pylint: disable=protected-access await downloader._save_to_installed(
installed_cogs + installed_libs
if installed_libs
else installed_cogs
) # pylint: disable=protected-access
if installed_cogs: if installed_cogs:
installed_cog_name = installed_cogs[0].name installed_cog_name = installed_cogs[0].name
install_s.append(f"Installed {installed_cog_name}") install_s.append(f"Installed {installed_cog_name}")
self.logger.debug("Installed %s", installed_cog_name) self.logger.debug("Installed %s", installed_cog_name)
if installed_libs: if installed_libs:
for lib in installed_libs: for lib in installed_libs:
install_s.append(f"Installed {lib.name} required for {cog_name}") install_s.append(
self.logger.debug("Installed %s required for %s", lib.name, cog_name) f"Installed {lib.name} required for {cog_name}"
)
self.logger.debug(
"Installed %s required for %s", lib.name, cog_name
)
if failed_cogs: if failed_cogs:
failed_cog_name = failed_cogs[0].name failed_cog_name = failed_cogs[0].name
install_e.append(f"Failed to install {failed_cog_name}") install_e.append(f"Failed to install {failed_cog_name}")
self.logger.error("Failed to install %s", failed_cog_name) self.logger.error("Failed to install %s", failed_cog_name)
if failed_libs: if failed_libs:
for lib in failed_libs: for lib in failed_libs:
install_e.append(f"Failed to install {lib.name} required for {cog_name}") install_e.append(
self.logger.error("Failed to install %s required for %s", lib.name, cog_name) f"Failed to install {lib.name} required for {cog_name}"
await ctx.send("Import complete!", file=text_to_file(f"Repositories:\n{repo_s}\n\nRepository Errors:\n{repo_e}\n\nUninstalled Cogs:\n{uninstall_s}\n\nUninstalled Cogs Errors:\n{uninstall_e}\n\nInstalled Cogs:\n{install_s}\n\nInstalled Cogs Errors:\n{install_e}", 'backup.log')) )
self.logger.error(
"Failed to install %s required for %s",
lib.name,
cog_name,
)
await ctx.send(
"Import complete!",
file=text_to_file(
f"Repositories:\n{repo_s}\n\nRepository Errors:\n{repo_e}\n\nUninstalled Cogs:\n{uninstall_s}\n\nUninstalled Cogs Errors:\n{uninstall_e}\n\nInstalled Cogs:\n{install_s}\n\nInstalled Cogs Errors:\n{install_e}",
"backup.log",
),
)

View file

@ -24,7 +24,9 @@ class Nerdify(commands.Cog):
self.bot = bot self.bot = bot
@commands.command(aliases=["nerd"]) @commands.command(aliases=["nerd"])
async def nerdify(self, ctx: commands.Context, *, text: Optional[str] = None) -> None: async def nerdify(
self, ctx: commands.Context, *, text: Optional[str] = None
) -> None:
"""Nerdify the replied to message, previous message, or your own text.""" """Nerdify the replied to message, previous message, or your own text."""
if not text: if not text:
if hasattr(ctx.message, "reference") and ctx.message.reference: if hasattr(ctx.message, "reference") and ctx.message.reference:
@ -57,7 +59,7 @@ class Nerdify(commands.Cog):
Returns: Returns:
The converted text.""" The converted text."""
return f"\"{text}\" 🤓" return f'"{text}" 🤓'
async def type_message( async def type_message(
self, destination: discord.abc.Messageable, content: str, **kwargs: Any self, destination: discord.abc.Messageable, content: str, **kwargs: Any