Compare commits

..

4 commits

4 changed files with 93 additions and 78 deletions

View file

@ -221,7 +221,8 @@ class Aurora(commands.Cog):
if silent is False: if silent is False:
try: try:
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -279,7 +280,8 @@ class Aurora(commands.Cog):
if silent is False: if silent is False:
try: try:
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -374,7 +376,8 @@ class Aurora(commands.Cog):
if silent is False: if silent is False:
try: try:
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -479,7 +482,8 @@ class Aurora(commands.Cog):
if silent is False: if silent is False:
try: try:
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -575,7 +579,8 @@ class Aurora(commands.Cog):
if silent is False: if silent is False:
try: try:
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -650,7 +655,8 @@ class Aurora(commands.Cog):
if silent is False: if silent is False:
try: try:
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -708,7 +714,8 @@ class Aurora(commands.Cog):
if silent is False: if silent is False:
try: try:
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -810,7 +817,8 @@ class Aurora(commands.Cog):
try: try:
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -853,8 +861,9 @@ class Aurora(commands.Cog):
silent = not await config.guild(interaction.guild).dm_users() silent = not await config.guild(interaction.guild).dm_users()
if silent is False: if silent is False:
try: try:
embed = embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -936,7 +945,8 @@ class Aurora(commands.Cog):
if silent is False: if silent is False:
try: try:
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(interaction.channel), bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild, guild=interaction.guild,
moderator=interaction.user, moderator=interaction.user,
reason=reason, reason=reason,
@ -1089,7 +1099,7 @@ class Aurora(commands.Cog):
+ f"moderation_{interaction.guild.id}.json" + f"moderation_{interaction.guild.id}.json"
) )
cases = Moderation.get_all_cases(bot=interaction.client, guild_id=interaction.guild.id) cases = Moderation.get_latest(bot=interaction.client, guild_id=interaction.guild.id)
with open(filename, "w", encoding="utf-8") as f: with open(filename, "w", encoding="utf-8") as f:
dump(obj=cases, fp=f, indent=2) dump(obj=cases, fp=f, indent=2)
@ -1118,7 +1128,7 @@ class Aurora(commands.Cog):
elif moderator: elif moderator:
moderations = Moderation.find_by_moderator(interaction.client, interaction.guild.id, moderator.id) moderations = Moderation.find_by_moderator(interaction.client, interaction.guild.id, moderator.id)
else: else:
moderations = Moderation.get_all_cases(interaction.client, interaction.guild.id) moderations = Moderation.get_latest(interaction.client, interaction.guild.id)
case_quantity = len(moderations) case_quantity = len(moderations)
page_quantity = ceil(case_quantity / pagesize) page_quantity = ceil(case_quantity / pagesize)
@ -1191,7 +1201,7 @@ class Aurora(commands.Cog):
Reason for resolving case""" Reason for resolving case"""
permissions = check_permissions( permissions = check_permissions(
interaction.client.user, interaction.client.user,
["embed_links", "moderate_members", "ban_members"], ("embed_links", "moderate_members", "ban_members"),
interaction, interaction,
) )
if permissions: if permissions:
@ -1204,7 +1214,7 @@ class Aurora(commands.Cog):
return return
try: try:
moderation = Moderation.from_sql(interaction.client, case, interaction.guild.id) moderation = Moderation.find_by_id(interaction.client, case, interaction.guild.id)
except ValueError: except ValueError:
await interaction.response.send_message( await interaction.response.send_message(
content=error(f"Case #{case:,} does not exist!"), ephemeral=True content=error(f"Case #{case:,} does not exist!"), ephemeral=True
@ -1288,7 +1298,7 @@ class Aurora(commands.Cog):
) )
try: try:
mod = Moderation.from_sql(interaction.client, case, interaction.guild.id) mod = Moderation.find_by_id(interaction.client, case, interaction.guild.id)
except ValueError: except ValueError:
await interaction.response.send_message( await interaction.response.send_message(
content=error(f"Case #{case:,} does not exist!"), ephemeral=True content=error(f"Case #{case:,} does not exist!"), ephemeral=True
@ -1381,7 +1391,7 @@ class Aurora(commands.Cog):
return return
try: try:
moderation = Moderation.from_sql(interaction.client, case, interaction.guild.id) moderation = Moderation.find_by_id(interaction.client, case, interaction.guild.id)
old_moderation = moderation old_moderation = moderation
except ValueError: except ValueError:
await interaction.response.send_message( await interaction.response.send_message(
@ -1511,7 +1521,8 @@ class Aurora(commands.Cog):
) )
embed = await message_factory( embed = await message_factory(
await self.bot.get_embed_color(guild.channels[0]), bot=self.bot,
color=await self.bot.get_embed_color(guild.channels[0]),
guild=guild, guild=guild,
reason=f"Automatic unban from case #{moderation_id}", reason=f"Automatic unban from case #{moderation_id}",
moderation_type="unbanned", moderation_type="unbanned",

View file

@ -1,6 +1,7 @@
import json import json
import sqlite3 import sqlite3
from datetime import datetime, timedelta from datetime import datetime, timedelta
from sqlite3 import Cursor
from time import time from time import time
from typing import Dict, Iterable, List, Optional, Tuple, Union from typing import Dict, Iterable, List, Optional, Tuple, Union
@ -9,7 +10,6 @@ from discord import NotFound
from redbot.core.bot import Red from redbot.core.bot import Red
from ..utilities.logger import logger from ..utilities.logger import logger
from ..utilities.utils import get_next_case_number
from .base import AuroraGuildModel from .base import AuroraGuildModel
from .change import Change from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser from .partials import PartialChannel, PartialRole, PartialUser
@ -63,10 +63,10 @@ class Moderation(AuroraGuildModel):
return await PartialRole.from_id(self.bot, self.guild_id, self.role_id) return await PartialRole.from_id(self.bot, self.guild_id, self.role_id)
return None return None
def __str__(self): def __str__(self) -> str:
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}" return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
async def resolve(self, resolved_by: int, reason: str): async def resolve(self, resolved_by: int, reason: str) -> None:
if self.resolved: if self.resolved:
raise ValueError("Case is already resolved!") raise ValueError("Case is already resolved!")
@ -113,7 +113,7 @@ class Moderation(AuroraGuildModel):
self.update() self.update()
def update(self): def update(self) -> None:
from ..utilities.database import connect from ..utilities.database import connect
from ..utilities.json import dumps from ..utilities.json import dumps
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;" query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
@ -202,62 +202,75 @@ class Moderation(AuroraGuildModel):
return cls.from_dict(bot=bot, data=case) return cls.from_dict(bot=bot, data=case)
@classmethod @classmethod
def execute(cls, bot: Red, guild_id: int, query: str, parameters: tuple | None = None) -> Tuple["Moderation"]: def execute(cls, bot: Red, guild_id: int, query: str, parameters: tuple | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
from ..utilities.database import connect from ..utilities.database import connect
if not parameters: if not parameters:
parameters = () parameters = ()
with connect() as database: if not cursor:
no_cursor = True
database = connect()
cursor = database.cursor() cursor = database.cursor()
cursor.execute(query, parameters) else:
results = cursor.fetchall() no_cursor = False
cursor.close()
if results: cursor.execute(query, parameters)
cases = [] results = cursor.fetchall()
for result in results: if no_cursor:
case = cls.from_result(bot=bot, result=result, guild_id=guild_id) cursor.close()
if case.moderation_id != 0: database.close()
cases.append(case)
return tuple(cases) if results:
cases = []
for result in results:
case = cls.from_result(bot=bot, result=result, guild_id=guild_id)
if case.moderation_id != 0:
cases.append(case)
return tuple(cases)
return () return ()
@classmethod @classmethod
def from_sql(cls, bot: Red, moderation_id: int, guild_id: int) -> "Moderation": def get_latest(cls, bot: Red, guild_id: int, limit: int | None = None, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
return cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id) params = []
query = f"SELECT * FROM moderation_{guild_id} ORDER BY moderation_id DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
if types:
query += f" WHERE moderation_type IN ({', '.join(['?' for _ in types])})"
params.extend(types)
query += ";"
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params) if limit else (), cursor=cursor)
@classmethod @classmethod
def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int) -> "Moderation": def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
result = cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor)
return (result[0].moderation_id + 1) if result else 1
@classmethod
def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;" query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
case = cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,)) case = cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
if case: if case:
return case[0] return case[0]
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!") raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
@classmethod @classmethod
def find_by_target(cls, bot: Red, guild_id: int, target: int, types: list | None = None) -> Tuple["Moderation"]: def find_by_target(cls, bot: Red, guild_id: int, target: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?" query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
if types: if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})" query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;" query += " ORDER BY moderation_id DESC;"
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *types) if types else (target,)) return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *types) if types else (target,), cursor=cursor)
@classmethod @classmethod
def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: list | None = None) -> Tuple["Moderation"]: def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?" query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
if types: if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})" query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;" query += " ORDER BY moderation_id DESC;"
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *types) if types else (moderator,)) return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *types) if types else (moderator,), cursor=cursor)
@classmethod
def get_all_cases(cls, bot: Red, guild_id: int, types: list | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id}"
if types:
query += f" WHERE moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;"
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(iterable=types) if types else None)
@classmethod @classmethod
def log( def log(
@ -321,7 +334,7 @@ class Moderation(AuroraGuildModel):
close_db = False close_db = False
cursor = database.cursor() cursor = database.cursor()
moderation_id = get_next_case_number(guild_id=guild_id, cursor=cursor) moderation_id = cls.get_next_case_number(bot=bot, guild_id=guild_id, cursor=cursor)
case = { case = {
"moderation_id": moderation_id, "moderation_id": moderation_id,
@ -371,4 +384,4 @@ class Moderation(AuroraGuildModel):
case["metadata"], case["metadata"],
) )
return cls.from_sql(bot=bot, moderation_id=moderation_id, guild_id=guild_id) return cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)

View file

@ -4,15 +4,17 @@ from typing import Union
from discord import Color, Embed, Guild, Interaction, InteractionMessage, Member, Role, User from discord import Color, Embed, Guild, Interaction, InteractionMessage, Member, Role, User
from redbot.core import commands from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import bold, box, error, humanize_timedelta, warning from redbot.core.utils.chat_formatting import bold, box, error, humanize_timedelta, warning
from ..models.moderation import Moderation from ..models.moderation import Moderation
from ..models.partials import PartialUser from ..models.partials import PartialUser
from .config import config from .config import config
from .utils import get_bool_emoji, get_next_case_number, get_pagesize_str from .utils import get_bool_emoji, get_pagesize_str
async def message_factory( async def message_factory(
bot: Red,
color: Color, color: Color,
guild: Guild, guild: Guild,
reason: str, reason: str,
@ -25,6 +27,7 @@ async def message_factory(
"""This function creates a message from set parameters, meant for contacting the moderated user. """This function creates a message from set parameters, meant for contacting the moderated user.
Args: Args:
bot (Red): The bot instance.
color (Color): The color of the embed. color (Color): The color of the embed.
guild (Guild): The guild the moderation occurred in. guild (Guild): The guild the moderation occurred in.
reason (str): The reason for the moderation. reason (str): The reason for the moderation.
@ -88,7 +91,7 @@ async def message_factory(
embed.set_author(name=guild.name) embed.set_author(name=guild.name)
embed.set_footer( embed.set_footer(
text=f"Case #{get_next_case_number(guild.id):,}", text=f"Case #{Moderation.get_next_case_number(bot=bot, guild_id=guild.id):,}",
icon_url="attachment://arrow.png", icon_url="attachment://arrow.png",
) )

View file

@ -1,6 +1,6 @@
# pylint: disable=cyclic-import # pylint: disable=cyclic-import
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional, Union from typing import Optional, Tuple, Union
from dateutil.relativedelta import relativedelta as rd from dateutil.relativedelta import relativedelta as rd
from discord import File, Guild, Interaction, Member, SelectOption, TextChannel, User from discord import File, Guild, Interaction, Member, SelectOption, TextChannel, User
@ -13,7 +13,7 @@ from ..utilities.config import config
def check_permissions( def check_permissions(
user: User, user: User,
permissions: list, permissions: Tuple[str],
ctx: Union[commands.Context, Interaction] | None = None, ctx: Union[commands.Context, Interaction] | None = None,
guild: Guild | None = None, guild: Guild | None = None,
) -> Union[bool, str]: ) -> Union[bool, str]:
@ -40,7 +40,7 @@ def check_permissions(
async def check_moddable( async def check_moddable(
target: Union[User, Member, TextChannel], interaction: Interaction, permissions: list target: Union[User, Member, TextChannel], interaction: Interaction, permissions: Tuple[str]
) -> bool: ) -> bool:
"""Checks if a moderator can moderate a target.""" """Checks if a moderator can moderate a target."""
is_channel = isinstance(target, TextChannel) is_channel = isinstance(target, TextChannel)
@ -110,20 +110,6 @@ async def check_moddable(
return True return True
def get_next_case_number(guild_id: str, cursor=None) -> int:
"""This function returns the next case number from the MySQL table for a specific guild."""
from .database import connect
if not cursor:
database = connect()
cursor = database.cursor()
cursor.execute(
f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1"
)
result = cursor.fetchone()
return (result[0] + 1) if result else 1
async def log(interaction: Interaction, moderation_id: int, resolved: bool = False) -> None: async def log(interaction: Interaction, moderation_id: int, resolved: bool = False) -> None:
"""This function sends a message to the guild's configured logging channel when an infraction takes place.""" """This function sends a message to the guild's configured logging channel when an infraction takes place."""
from ..models.moderation import Moderation from ..models.moderation import Moderation
@ -134,7 +120,7 @@ async def log(interaction: Interaction, moderation_id: int, resolved: bool = Fal
logging_channel = interaction.guild.get_channel(logging_channel_id) logging_channel = interaction.guild.get_channel(logging_channel_id)
try: try:
moderation = Moderation.from_sql(interaction.client, moderation_id, interaction.guild_id) moderation = Moderation.find_by_id(interaction.client, moderation_id, interaction.guild_id)
embed = await log_factory( embed = await log_factory(
interaction=interaction, moderation=moderation, resolved=resolved interaction=interaction, moderation=moderation, resolved=resolved
) )
@ -159,18 +145,20 @@ async def send_evidenceformat(interaction: Interaction, moderation_id: int) -> N
if send_evidence_bool is False: if send_evidence_bool is False:
return return
moderation = Moderation.from_sql(interaction.client, moderation_id, interaction.guild.id) moderation = Moderation.find_by_id(interaction.client, moderation_id, interaction.guild.id)
content = await evidenceformat_factory(moderation=moderation) content = await evidenceformat_factory(moderation=moderation)
await interaction.followup.send(content=content, ephemeral=True) await interaction.followup.send(content=content, ephemeral=True)
def get_bool_emoji(value: Optional[bool]) -> str: def get_bool_emoji(value: Optional[bool]) -> str:
"""Returns a unicode emoji based on a boolean value.""" """Returns a unicode emoji based on a boolean value."""
if value is True: match value:
return "\N{WHITE HEAVY CHECK MARK}" case True:
if value is False: return "\N{WHITE HEAVY CHECK MARK}"
return "\N{NO ENTRY SIGN}" case False:
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}" return "\N{NO ENTRY SIGN}"
case _:
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
def get_pagesize_str(value: Union[int, None]) -> str: def get_pagesize_str(value: Union[int, None]) -> str: