Compare commits

..

4 commits

4 changed files with 93 additions and 78 deletions

View file

@ -221,7 +221,8 @@ class Aurora(commands.Cog):
if silent is False:
try:
embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -279,7 +280,8 @@ class Aurora(commands.Cog):
if silent is False:
try:
embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -374,7 +376,8 @@ class Aurora(commands.Cog):
if silent is False:
try:
embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -479,7 +482,8 @@ class Aurora(commands.Cog):
if silent is False:
try:
embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -575,7 +579,8 @@ class Aurora(commands.Cog):
if silent is False:
try:
embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -650,7 +655,8 @@ class Aurora(commands.Cog):
if silent is False:
try:
embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -708,7 +714,8 @@ class Aurora(commands.Cog):
if silent is False:
try:
embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -810,7 +817,8 @@ class Aurora(commands.Cog):
try:
embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -853,8 +861,9 @@ class Aurora(commands.Cog):
silent = not await config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
embed = await message_factory(
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -936,7 +945,8 @@ class Aurora(commands.Cog):
if silent is False:
try:
embed = await message_factory(
await self.bot.get_embed_color(interaction.channel),
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
@ -1089,7 +1099,7 @@ class Aurora(commands.Cog):
+ f"moderation_{interaction.guild.id}.json"
)
cases = Moderation.get_all_cases(bot=interaction.client, guild_id=interaction.guild.id)
cases = Moderation.get_latest(bot=interaction.client, guild_id=interaction.guild.id)
with open(filename, "w", encoding="utf-8") as f:
dump(obj=cases, fp=f, indent=2)
@ -1118,7 +1128,7 @@ class Aurora(commands.Cog):
elif moderator:
moderations = Moderation.find_by_moderator(interaction.client, interaction.guild.id, moderator.id)
else:
moderations = Moderation.get_all_cases(interaction.client, interaction.guild.id)
moderations = Moderation.get_latest(interaction.client, interaction.guild.id)
case_quantity = len(moderations)
page_quantity = ceil(case_quantity / pagesize)
@ -1191,7 +1201,7 @@ class Aurora(commands.Cog):
Reason for resolving case"""
permissions = check_permissions(
interaction.client.user,
["embed_links", "moderate_members", "ban_members"],
("embed_links", "moderate_members", "ban_members"),
interaction,
)
if permissions:
@ -1204,7 +1214,7 @@ class Aurora(commands.Cog):
return
try:
moderation = Moderation.from_sql(interaction.client, case, interaction.guild.id)
moderation = Moderation.find_by_id(interaction.client, case, interaction.guild.id)
except ValueError:
await interaction.response.send_message(
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
@ -1288,7 +1298,7 @@ class Aurora(commands.Cog):
)
try:
mod = Moderation.from_sql(interaction.client, case, interaction.guild.id)
mod = Moderation.find_by_id(interaction.client, case, interaction.guild.id)
except ValueError:
await interaction.response.send_message(
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
@ -1381,7 +1391,7 @@ class Aurora(commands.Cog):
return
try:
moderation = Moderation.from_sql(interaction.client, case, interaction.guild.id)
moderation = Moderation.find_by_id(interaction.client, case, interaction.guild.id)
old_moderation = moderation
except ValueError:
await interaction.response.send_message(
@ -1511,7 +1521,8 @@ class Aurora(commands.Cog):
)
embed = await message_factory(
await self.bot.get_embed_color(guild.channels[0]),
bot=self.bot,
color=await self.bot.get_embed_color(guild.channels[0]),
guild=guild,
reason=f"Automatic unban from case #{moderation_id}",
moderation_type="unbanned",

View file

@ -1,6 +1,7 @@
import json
import sqlite3
from datetime import datetime, timedelta
from sqlite3 import Cursor
from time import time
from typing import Dict, Iterable, List, Optional, Tuple, Union
@ -9,7 +10,6 @@ from discord import NotFound
from redbot.core.bot import Red
from ..utilities.logger import logger
from ..utilities.utils import get_next_case_number
from .base import AuroraGuildModel
from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser
@ -63,10 +63,10 @@ class Moderation(AuroraGuildModel):
return await PartialRole.from_id(self.bot, self.guild_id, self.role_id)
return None
def __str__(self):
def __str__(self) -> str:
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
async def resolve(self, resolved_by: int, reason: str):
async def resolve(self, resolved_by: int, reason: str) -> None:
if self.resolved:
raise ValueError("Case is already resolved!")
@ -113,7 +113,7 @@ class Moderation(AuroraGuildModel):
self.update()
def update(self):
def update(self) -> None:
from ..utilities.database import connect
from ..utilities.json import dumps
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
@ -202,15 +202,22 @@ class Moderation(AuroraGuildModel):
return cls.from_dict(bot=bot, data=case)
@classmethod
def execute(cls, bot: Red, guild_id: int, query: str, parameters: tuple | None = None) -> Tuple["Moderation"]:
def execute(cls, bot: Red, guild_id: int, query: str, parameters: tuple | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
from ..utilities.database import connect
if not parameters:
parameters = ()
with connect() as database:
if not cursor:
no_cursor = True
database = connect()
cursor = database.cursor()
else:
no_cursor = False
cursor.execute(query, parameters)
results = cursor.fetchall()
if no_cursor:
cursor.close()
database.close()
if results:
cases = []
@ -222,42 +229,48 @@ class Moderation(AuroraGuildModel):
return ()
@classmethod
def from_sql(cls, bot: Red, moderation_id: int, guild_id: int) -> "Moderation":
return cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
def get_latest(cls, bot: Red, guild_id: int, limit: int | None = None, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
params = []
query = f"SELECT * FROM moderation_{guild_id} ORDER BY moderation_id DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
if types:
query += f" WHERE moderation_type IN ({', '.join(['?' for _ in types])})"
params.extend(types)
query += ";"
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params) if limit else (), cursor=cursor)
@classmethod
def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int) -> "Moderation":
def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
result = cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor)
return (result[0].moderation_id + 1) if result else 1
@classmethod
def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
case = cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,))
case = cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
if case:
return case[0]
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
@classmethod
def find_by_target(cls, bot: Red, guild_id: int, target: int, types: list | None = None) -> Tuple["Moderation"]:
def find_by_target(cls, bot: Red, guild_id: int, target: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;"
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *types) if types else (target,))
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *types) if types else (target,), cursor=cursor)
@classmethod
def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: list | None = None) -> Tuple["Moderation"]:
def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;"
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *types) if types else (moderator,))
@classmethod
def get_all_cases(cls, bot: Red, guild_id: int, types: list | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id}"
if types:
query += f" WHERE moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;"
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(iterable=types) if types else None)
return cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *types) if types else (moderator,), cursor=cursor)
@classmethod
def log(
@ -321,7 +334,7 @@ class Moderation(AuroraGuildModel):
close_db = False
cursor = database.cursor()
moderation_id = get_next_case_number(guild_id=guild_id, cursor=cursor)
moderation_id = cls.get_next_case_number(bot=bot, guild_id=guild_id, cursor=cursor)
case = {
"moderation_id": moderation_id,
@ -371,4 +384,4 @@ class Moderation(AuroraGuildModel):
case["metadata"],
)
return cls.from_sql(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
return cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)

View file

@ -4,15 +4,17 @@ from typing import Union
from discord import Color, Embed, Guild, Interaction, InteractionMessage, Member, Role, User
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import bold, box, error, humanize_timedelta, warning
from ..models.moderation import Moderation
from ..models.partials import PartialUser
from .config import config
from .utils import get_bool_emoji, get_next_case_number, get_pagesize_str
from .utils import get_bool_emoji, get_pagesize_str
async def message_factory(
bot: Red,
color: Color,
guild: Guild,
reason: str,
@ -25,6 +27,7 @@ async def message_factory(
"""This function creates a message from set parameters, meant for contacting the moderated user.
Args:
bot (Red): The bot instance.
color (Color): The color of the embed.
guild (Guild): The guild the moderation occurred in.
reason (str): The reason for the moderation.
@ -88,7 +91,7 @@ async def message_factory(
embed.set_author(name=guild.name)
embed.set_footer(
text=f"Case #{get_next_case_number(guild.id):,}",
text=f"Case #{Moderation.get_next_case_number(bot=bot, guild_id=guild.id):,}",
icon_url="attachment://arrow.png",
)

View file

@ -1,6 +1,6 @@
# pylint: disable=cyclic-import
from datetime import datetime, timedelta
from typing import Optional, Union
from typing import Optional, Tuple, Union
from dateutil.relativedelta import relativedelta as rd
from discord import File, Guild, Interaction, Member, SelectOption, TextChannel, User
@ -13,7 +13,7 @@ from ..utilities.config import config
def check_permissions(
user: User,
permissions: list,
permissions: Tuple[str],
ctx: Union[commands.Context, Interaction] | None = None,
guild: Guild | None = None,
) -> Union[bool, str]:
@ -40,7 +40,7 @@ def check_permissions(
async def check_moddable(
target: Union[User, Member, TextChannel], interaction: Interaction, permissions: list
target: Union[User, Member, TextChannel], interaction: Interaction, permissions: Tuple[str]
) -> bool:
"""Checks if a moderator can moderate a target."""
is_channel = isinstance(target, TextChannel)
@ -110,20 +110,6 @@ async def check_moddable(
return True
def get_next_case_number(guild_id: str, cursor=None) -> int:
"""This function returns the next case number from the MySQL table for a specific guild."""
from .database import connect
if not cursor:
database = connect()
cursor = database.cursor()
cursor.execute(
f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1"
)
result = cursor.fetchone()
return (result[0] + 1) if result else 1
async def log(interaction: Interaction, moderation_id: int, resolved: bool = False) -> None:
"""This function sends a message to the guild's configured logging channel when an infraction takes place."""
from ..models.moderation import Moderation
@ -134,7 +120,7 @@ async def log(interaction: Interaction, moderation_id: int, resolved: bool = Fal
logging_channel = interaction.guild.get_channel(logging_channel_id)
try:
moderation = Moderation.from_sql(interaction.client, moderation_id, interaction.guild_id)
moderation = Moderation.find_by_id(interaction.client, moderation_id, interaction.guild_id)
embed = await log_factory(
interaction=interaction, moderation=moderation, resolved=resolved
)
@ -159,17 +145,19 @@ async def send_evidenceformat(interaction: Interaction, moderation_id: int) -> N
if send_evidence_bool is False:
return
moderation = Moderation.from_sql(interaction.client, moderation_id, interaction.guild.id)
moderation = Moderation.find_by_id(interaction.client, moderation_id, interaction.guild.id)
content = await evidenceformat_factory(moderation=moderation)
await interaction.followup.send(content=content, ephemeral=True)
def get_bool_emoji(value: Optional[bool]) -> str:
"""Returns a unicode emoji based on a boolean value."""
if value is True:
match value:
case True:
return "\N{WHITE HEAVY CHECK MARK}"
if value is False:
case False:
return "\N{NO ENTRY SIGN}"
case _:
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"