feat(aurora): v3

This commit is contained in:
cswimr 2025-01-25 20:04:02 +00:00
parent f831bfcdd5
commit ba7a427dad
19 changed files with 3219 additions and 1973 deletions

34
aurora/models/base.py Normal file
View file

@ -0,0 +1,34 @@
from typing import Any, Optional
from discord import Guild
from pydantic import BaseModel, ConfigDict
from redbot.core.bot import Red
class AuroraBaseModel(BaseModel):
"""Base class for all models in Aurora."""
model_config = ConfigDict(ignored_types=(Red,), arbitrary_types_allowed=True)
bot: Red
def dump(self) -> dict:
return self.model_dump(exclude={"bot"})
def to_json(self, indent: int | None = None, file: Any | None = None, **kwargs) -> str:
from ..utilities.json import dump, dumps # pylint: disable=cyclic-import
return dump(self.dump(), file, indent=indent, **kwargs) if file else dumps(self.dump(), indent=indent, **kwargs)
class AuroraGuildModel(AuroraBaseModel):
"""Subclass of AuroraBaseModel that includes a guild_id attribute and a guild attribute."""
model_config = ConfigDict(ignored_types=(Red, Guild), arbitrary_types_allowed=True)
guild_id: int
guild: Optional[Guild] = None
def dump(self) -> dict:
return self.model_dump(exclude={"bot", "guild_id", "guild"})
def __repr__(self) -> str:
return f"<{self.__class__.__name__} guild_id={self.guild_id}>"

79
aurora/models/change.py Normal file
View file

@ -0,0 +1,79 @@
import json
from datetime import datetime, timedelta
from typing import Literal, Optional
from redbot.core.bot import Red
from ..utilities.utils import timedelta_from_string
from .base import AuroraBaseModel
from .partials import PartialUser
class Change(AuroraBaseModel):
type: Literal["ORIGINAL", "RESOLVE", "EDIT"]
timestamp: datetime
user_id: int
reason: Optional[str] = None
duration: Optional[timedelta] = None
end_timestamp: Optional[datetime] = None
@property
def unix_timestamp(self) -> int:
return int(self.timestamp.timestamp())
@property
def unix_end_timestamp(self) -> Optional[int]:
if self.end_timestamp:
return int(self.end_timestamp.timestamp())
return None
def __str__(self):
return f"{self.type} {self.user_id} {self.reason}"
def __repr__(self) -> str:
attrs = [
("type", self.type),
("timestamp", self.timestamp),
("user_id", self.user_id),
("reason", self.reason),
("duration", self.duration),
("end_timestamp", self.end_timestamp),
]
joined = " ".join(f"{key}={value!r}" for key, value in attrs)
return f"<{self.__class__.__name__} {joined}>"
async def get_user(self) -> "PartialUser":
return await PartialUser.from_id(self.bot, self.user_id)
@classmethod
def from_dict(cls, bot: Red, data: dict) -> "Change":
if isinstance(data, str):
data = json.loads(data)
if data.get("duration") and not isinstance(data["duration"], timedelta) and not data["duration"] == "NULL":
duration = timedelta_from_string(data["duration"])
elif data.get("duration") and isinstance(data["duration"], timedelta):
duration = data["duration"]
else:
duration = None
if data.get("end_timestamp") and not isinstance(data["end_timestamp"], datetime):
end_timestamp = datetime.fromtimestamp(data["end_timestamp"])
elif data.get("end_timestamp") and isinstance(data["end_timestamp"], datetime):
end_timestamp = data["end_timestamp"]
else:
end_timestamp = None
if not isinstance(data["timestamp"], datetime):
timestamp = datetime.fromtimestamp(data["timestamp"])
else:
timestamp = data["timestamp"]
try:
data["user_id"] = int(data["user_id"])
except ValueError:
data["user_id"] = 0
data.update({"timestamp": timestamp, "end_timestamp": end_timestamp, "duration": duration})
if "bot" in data:
del data["bot"]
return cls(bot=bot, **data)

576
aurora/models/moderation.py Normal file
View file

@ -0,0 +1,576 @@
import json
import sqlite3
from datetime import datetime, timedelta
from time import time
from typing import Dict, Iterable, List, Optional, Tuple, Union
import discord
from aiosqlite import Connection, Cursor, OperationalError, Row
from aiosqlite import connect as aiosqlite_connect
from redbot.core import data_manager
from redbot.core.bot import Red
from ..utilities.logger import logger
from ..utilities.utils import timedelta_to_string
from .base import AuroraGuildModel
from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser
from .type import Type, type_registry
class Moderation(AuroraGuildModel):
"""This class represents a moderation case in the database.
Attributes:
bot (Red): The bot instance.
guild (discord.Guild): The guild the case belongs to.
moderation_id (int): The ID of the moderation case.
timestamp (datetime): The timestamp of the case.
moderation_type (Type): The type of moderation case.
target_type (str): The type of target. Should be either `user` or `channel`.
target_id (int): The ID of the target.
moderator_id (int): The ID of the moderator who issued the case.
role_id (int): The ID of the role, if applicable.
duration (timedelta): The duration of the case, if applicable.
end_timestamp (datetime): The end timestamp of the case, if applicable.
reason (str): The reason for the case.
resolved (bool): Whether the case is resolved.
resolved_by (int): The ID of the user who resolved the case.
resolve_reason (str): The reason the case was resolved.
expired (bool): Whether the case is expired.
changes (List[Change]): A list of changes to the case.
metadata (Dict): A dictionary of metadata stored with the case.
Properties:
id (int): The ID of the case.
type (Type): The type of the case.
unix_timestamp (int): The timestamp of the case as a Unix timestamp.
Methods:
get_moderator: Gets the moderator who issued the case.
get_target: Gets the target of the case.
get_resolved_by: Gets the user who resolved the case.
get_role: Gets the role, if applicable.
resolve: Resolves the case.
update: Updates the case in the database.
Class Methods:
from_dict: Creates a `Moderation` object from a dictionary.
from_result: Creates a `Moderation` object from a database result.
execute: Executes a query on the database.
get_latest: Gets the latest cases from the database.
get_next_case_number: Gets the next case number to use.
find_by_id: Finds a case by its ID.
find_by_target: Finds cases by the target.
find_by_moderator: Finds cases by the moderator.
log: Logs a moderation case in the database.
Static Methods:
connect: Connects to the SQLite database.
"""
moderation_id: int
timestamp: datetime
moderation_type: Type
target_type: str
target_id: int
moderator_id: int
role_id: Optional[int] = None
duration: Optional[timedelta] = None
end_timestamp: Optional[datetime] = None
reason: Optional[str] = None
resolved: bool
resolved_by: Optional[int] = None
resolve_reason: Optional[str] = None
expired: bool
changes: List["Change"]
metadata: Dict
@property
def id(self) -> int:
return self.moderation_id
@property
def type(self) -> Type:
return self.moderation_type
@property
def unix_timestamp(self) -> int:
return int(self.timestamp.timestamp())
async def get_moderator(self) -> "PartialUser":
return await PartialUser.from_id(self.bot, self.moderator_id)
async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
if self.target_type.lower() == "user":
return await PartialUser.from_id(self.bot, self.target_id)
return await PartialChannel.from_id(self.bot, self.target_id, self.guild)
async def get_resolved_by(self) -> Optional["PartialUser"]:
if self.resolved_by:
return await PartialUser.from_id(self.bot, self.resolved_by)
return None
async def get_role(self) -> Optional["PartialRole"]:
if self.role_id:
return await PartialRole.from_id(self.bot, self.guild, self.role_id)
return None
def __str__(self) -> str:
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
def __int__(self) -> int:
return self.moderation_id
def __repr__(self) -> str:
attrs = [
("guild_id", self.guild_id),
("moderation_id", self.moderation_id),
("timestamp", self.timestamp),
("type", self.type),
("target_type", self.target_type),
("target_id", self.target_id),
("moderator_id", self.moderator_id),
("role_id", self.role_id),
("duration", self.duration),
("end_timestamp", self.end_timestamp),
("reason", self.reason),
("resolved", self.resolved),
("resolved_by", self.resolved_by),
("resolve_reason", self.resolve_reason),
("expired", self.expired),
("changes", self.changes),
("metadata", self.metadata),
]
joined = " ".join(f"{key}={value!r}" for key, value in attrs)
return f"<{self.__class__.__name__} {joined}>"
async def resolve(self, resolved_by: int, reason: str) -> Tuple[bool, str]:
if self.resolved:
raise ValueError("Case is already resolved!")
self.resolved = True
self.resolved_by = resolved_by
self.resolve_reason = reason
success, msg = await self.type.resolve_handler(moderation=self, reason=reason)
if not self.changes:
self.changes.append(
Change.from_dict(
self.bot,
{
"type": "ORIGINAL",
"timestamp": self.timestamp,
"reason": self.reason,
"user_id": self.moderator_id,
"duration": self.duration,
"end_timestamp": self.end_timestamp,
},
)
)
self.changes.append(
Change.from_dict(
self.bot,
{
"type": "RESOLVE",
"timestamp": datetime.now(),
"reason": reason,
"user_id": resolved_by,
},
)
)
await self.update()
return success, msg
async def update(self) -> None:
from ..utilities.json import dumps
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
await self.execute(
query,
(
self.timestamp.timestamp(),
self.moderation_type.key,
self.target_type,
self.moderator_id,
self.role_id,
timedelta_to_string(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
dumps(self.changes),
dumps(self.metadata),
self.moderation_id,
),
)
logger.verbose(
"Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
self.moderation_id,
self.guild_id,
self.timestamp.timestamp(),
self.moderation_type.key,
self.target_type,
self.moderator_id,
self.role_id,
timedelta_to_string(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
dumps(self.changes),
dumps(self.metadata),
)
@classmethod
async def from_dict(cls, bot: Red, data: dict) -> "Moderation":
if data.get("guild_id"):
try:
guild = bot.get_guild(data["guild_id"])
if not guild:
guild = await bot.fetch_guild(data["guild_id"])
except (discord.Forbidden, discord.HTTPException):
guild = None
data.update({"guild": guild})
return cls(bot=bot, **data)
@classmethod
async def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
if result[7] is not None and result[7] != "NULL":
try:
hours, minutes, seconds = map(int, result[7].split(":"))
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
except ValueError as e:
logger.error("Error parsing duration for case %s: %s", result[0], result[7])
raise e
else:
duration = None
if result[14] is not None:
changes = json.loads(result[14])
change_obj_list = []
if changes:
for change in changes:
change_obj_list.append(Change.from_dict(bot=bot, data=change))
if result[15] is not None:
metadata = json.loads(result[15])
else:
metadata = {}
moderation_type = str.lower(result[2])
if moderation_type in type_registry:
moderation_type = type_registry[moderation_type]
else:
logger.error("Unknown moderation type in case %s: %s", result[0], result[2])
case = {
"moderation_id": int(result[0]),
"guild_id": int(guild_id),
"timestamp": datetime.fromtimestamp(result[1]),
"moderation_type": moderation_type,
"target_type": str(result[3]),
"target_id": int(result[4]),
"moderator_id": int(result[5]),
"role_id": int(result[6]) if result[6] is not None else None,
"duration": duration,
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
"reason": result[9],
"resolved": bool(result[10]),
"resolved_by": result[11],
"resolve_reason": result[12],
"expired": bool(result[13]),
"changes": change_obj_list,
"metadata": metadata if metadata else {},
}
return await cls.from_dict(bot=bot, data=case)
@staticmethod
async def connect() -> Connection:
"""Connects to the SQLite database, and returns a connection object."""
try:
connection = await aiosqlite_connect(database=data_manager.cog_data_path(raw_name="Aurora") / "aurora.db")
return connection
except OperationalError as e:
logger.error("Unable to access the SQLite database!\nError:\n%s", e.msg)
raise ConnectionRefusedError(f"Unable to access the SQLite Database!\n{e.msg}") from e
@classmethod
async def execute(cls, query: str, parameters: tuple | None = None, bot: Red | None = None, guild_id: int | None = None, cursor: Cursor | None = None, return_obj: bool = True) -> Union[Tuple["Moderation"], Iterable[Row]]:
"""Executes a query on the database.
Arguments:
query (str): The query to execute.
parameters (tuple): The parameters to pass to the query.
bot (Red): The bot instance.
guild_id (int): The ID of the guild to execute the query on.
cursor (Cursor): The cursor to use for the query.
return_obj (bool): Whether to return the case object(s). Defaults to `True`. If `False`, returns a `Iterable` of `aiosqlite.Row` objects.
Returns: The result of the query, either as a `Tuple` of `Moderation` objects or an `Iterable` of `aiosqlite.Row` objects.
"""
logger.trace('Executing query: "%s" with parameters "%s"', query, parameters)
if not parameters:
parameters = ()
if not cursor:
no_cursor = True
database = await cls.connect()
cursor = await database.cursor()
else:
no_cursor = False
try:
await cursor.execute(query, parameters)
except OperationalError as e:
logger.error('Error executing query: "%s" with parameters "%s"\nError:\n%s', query, parameters, e)
raise OperationalError(f'Error executing query: "{query}" with parameters "{parameters}"') from e
results = await cursor.fetchall()
await database.commit()
if no_cursor:
await cursor.close()
await database.close()
if results and return_obj and bot and guild_id:
cases = []
for result in results:
if result[0] == 0:
continue
case = await cls.from_result(bot=bot, result=result, guild_id=guild_id)
cases.append(case)
return tuple(cases)
return results
@classmethod
async def get_latest(cls, bot: Red, guild_id: int, before: datetime | None = None, after: datetime | None = None, limit: int | None = None, offset: int = 0, types: Iterable[Type] | None = None, expired: bool | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
params = []
query = f"SELECT * FROM moderation_{guild_id}"
conditions = []
if types:
conditions.append(f"moderation_type IN ({', '.join(['?' for _ in types])})")
params.extend([t.key for t in types])
if before:
conditions.append("timestamp < ?")
params.append(int(before.timestamp()))
if after:
conditions.append("timestamp > ?")
params.append(int(after.timestamp()))
if expired is not None:
conditions.append("expired = ?")
params.append(int(expired))
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY moderation_id DESC"
if limit:
query += " LIMIT ? OFFSET ?"
params.extend((limit, offset))
query += ";"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params) if params else (), cursor=cursor)
@classmethod
async def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
result = await cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor, limit=1)
return (result[0].moderation_id + 1) if result else 1
@classmethod
async def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
case = await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
if case:
return case[0]
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
@classmethod
async def find_by_target(cls, bot: Red, guild_id: int, target: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, expired: bool | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
params = [target]
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
for t in types:
params.append(t.key)
if before:
query += " AND timestamp < ?"
params.append(int(before.timestamp()))
if after:
query += " AND timestamp > ?"
params.append(int(after.timestamp()))
if expired is not None:
query += " AND expired = ?"
params.append(int(expired))
query += " ORDER BY moderation_id DESC;"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
@classmethod
async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, expired: bool | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
params = [moderator]
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
for t in types:
params.append(t.key)
if before:
query += " AND timestamp < ?"
params.append(int(before.timestamp()))
if after:
query += " AND timestamp > ?"
params.append(int(after.timestamp()))
if expired is not None:
query += " AND expired = ?"
params.append(int(expired))
query += " ORDER BY moderation_id DESC;"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
@classmethod
async def log(
cls,
bot: Red,
guild_id: int,
moderator_id: int,
moderation_type: Type,
target_type: str,
target_id: int,
role_id: int | None = None,
duration: timedelta | None = None,
reason: str | None = None,
database: sqlite3.Connection | None = None,
timestamp: datetime | None = None,
resolved: bool = False,
resolved_by: int | None = None,
resolved_reason: str | None = None,
expired: bool | None = None,
changes: list | None = None,
metadata: dict | None = None,
return_obj: bool = True,
) -> Union["Moderation", int]:
"""Logs a moderation case in the database.
Args:
bot (Red): The bot instance.
guild_id (int): The ID of the guild to log the case in.
moderator_id (int): The ID of the moderator who issued the case.
moderation_type (Type): The type of moderation case. See `aurora.models.moderation_types` for the built-in options.
target_type (str): The type of target. Should be either `user` or `channel`.
target_id (int): The ID of the target.
role_id (int): The ID of the role, if applicable.
duration (timedelta): The duration of the case, if applicable.
reason (str): The reason for the case.
database (sqlite3.Connection): The database connection to use to log the case. A connection will be automatically created if not provided.
timestamp (datetime): The timestamp of the case. Will be automatically generated if not provided.
resolved (bool): Whether the case is resolved.
resolved_by (int): The ID of the user who resolved the case.
resolved_reason (str): The reason the case was resolved.
expired (bool): Whether the case is expired.
changes (list): A list of changes to log. You usually shouldn't pass this, as it's automatically generated by the `/edit` and `/resolve` commands.
metadata (dict): A dictionary of metadata to store with the case.
return_obj (bool): Whether to return the case object. Defaults to `True`. If `False`, returns the case ID.
Returns:
Union[Moderation, int]: The `Moderation` object if `return_obj` is `True`, otherwise the case ID.
"""
from ..utilities.json import dumps
if not timestamp:
timestamp = datetime.fromtimestamp(time())
elif not isinstance(timestamp, datetime):
timestamp = datetime.fromtimestamp(timestamp)
if duration == "NULL":
duration = None
if duration is not None:
end_timestamp = timestamp + duration
else:
duration = None
end_timestamp = None
if not expired:
if end_timestamp:
expired = bool(timestamp > end_timestamp)
else:
expired = False
if reason == "NULL":
reason = None
if resolved_by in ["NULL", "?"]:
resolved_by = None
if resolved_reason == "NULL":
resolved_reason = None
if role_id == 0:
role_id = None
if not database:
database = await cls.connect()
close_db = True
else:
close_db = False
moderation_id = await cls.get_next_case_number(bot=bot, guild_id=guild_id)
case = {
"moderation_id": moderation_id,
"timestamp": timestamp.timestamp(),
"moderation_type": moderation_type.key,
"target_type": target_type,
"target_id": target_id,
"moderator_id": moderator_id,
"role_id": role_id,
"duration": timedelta_to_string(duration) if duration else None,
"end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
"reason": reason,
"resolved": resolved,
"resolved_by": resolved_by,
"resolve_reason": resolved_reason,
"expired": expired,
"changes": dumps(changes),
"metadata": dumps(metadata),
}
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
await database.execute(sql, tuple(case.values()))
await database.commit()
if close_db:
await database.close()
logger.verbose(
"Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
guild_id,
case["moderation_id"],
case["timestamp"],
case["moderation_type"],
case["target_type"],
case["target_id"],
case["moderator_id"],
case["role_id"],
case["duration"],
case["end_timestamp"],
case["reason"],
case["resolved"],
case["resolved_by"],
case["resolve_reason"],
case["expired"],
case["changes"],
case["metadata"],
)
if return_obj:
return await cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
return moderation_id

View file

@ -0,0 +1,964 @@
# pylint: disable=abstract-method
from datetime import timedelta
from math import ceil
from time import time
from typing import Tuple
from discord import AllowedMentions, File, Interaction, Member, Object, Role, TextChannel, User
from discord.abc import Messageable
from discord.errors import Forbidden, HTTPException, NotFound
from redbot.core import app_commands, commands
from redbot.core.bot import Red
from redbot.core.commands.converter import parse_relativedelta, parse_timedelta
from redbot.core.utils.chat_formatting import bold, error, humanize_timedelta, inline
from ..utilities.config import config
from ..utilities.factory import message_factory, resolve_factory
from ..utilities.logger import logger
from ..utilities.utils import get_footer_image, log, send_evidenceformat, timedelta_from_relativedelta
from .moderation import Moderation
from .type import Type, type_registry
def get_icon(bot: Red) -> File:
cog = bot.get_cog("Aurora")
if cog:
return get_footer_image(cog)
raise ValueError("Aurora cog not found. How was this managed?")
class Note(Type):
key = "note"
string = "note"
verb = "noted"
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member | User, silent: bool, reason: str) -> "Note":
response = await ctx.send(content=f"{target.mention} has {cls.embed_desc}{cls.verb}!\n**Reason** - `{reason}`")
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=None,
duration=None,
reason=reason,
)
await response.edit(content=f"{target.mention} has {cls.embed_desc}{cls.verb}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`")
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
if await config.guild(moderation.guild).dm_users() is True:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
embed = await resolve_factory(moderation=moderation, reason=reason)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException, NotFound):
pass
return True, ""
class Warn(Type):
key = "warn"
string = "warn"
verb = "warned"
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member | User, silent: bool, reason: str) -> "Warn":
response = await ctx.send(content=f"{target.mention} has {cls.embed_desc}{cls.verb}!\n**Reason** - `{reason}`")
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=None,
duration=None,
reason=reason,
)
await response.edit(content=f"{target.mention} has {cls.embed_desc}{cls.verb}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`")
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
if await config.guild(moderation.guild).dm_users() is True:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
embed = await resolve_factory(moderation=moderation, reason=reason)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException, NotFound):
pass
return True, ""
class AddRole(Type):
key = "addrole"
string = "addrole"
verb = "added a role to"
embed_desc = "been given the "
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member, role: Role, silent: bool, duration: str | None = None, reason: str | None = None):
addrole_whitelist = await config.guild(ctx.guild).addrole_whitelist()
if not addrole_whitelist:
await ctx.send(
content=error("There are no whitelisted roles set for this server!"),
ephemeral=True,
)
return
if duration is not None:
try:
parsed_time = parse_relativedelta(argument=duration)
if parsed_time is None:
raise commands.BadArgument()
parsed_time = timedelta_from_relativedelta(relativedelta=parsed_time)
except (commands.BadArgument, ValueError):
await ctx.send(content=error(text="Please provide a valid duration!"), ephemeral=True)
return cls()
else:
parsed_time = None
if role.id not in addrole_whitelist:
await ctx.send(content=error("That role isn't whitelisted!"), ephemeral=True)
return
if role.id in [user_role.id for user_role in target.roles]:
await ctx.send(
content=error(f"{target.mention} already has this role!"),
ephemeral=True,
)
return
response = await ctx.send(content=f"{target.mention} has {cls.embed_desc}{role.mention} role{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time else ''}!\n**Reason** - `{reason}`")
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response,
duration=parsed_time,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
await target.add_roles(
role,
reason=f"Role added by {ctx.author.id}{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time else ''} for: {reason}",
)
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=role.id,
duration=parsed_time,
reason=reason,
)
await response.edit(
content=f"{target.mention} has {cls.embed_desc}{role.mention} role{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time else ''}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`",
)
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation: Moderation, new_moderation: Moderation) -> bool: # pylint: disable=unused-argument
return True
@classmethod
async def expiry_handler(cls, moderation: Moderation) -> int:
try:
target = moderation.guild.get_member(moderation.target_id)
if not target:
try:
target = await moderation.guild.fetch_member(moderation.target_id)
except NotFound:
return 0
await target.remove_roles(Object(moderation.role_id), reason=f"Automatic role removal from case #{moderation.id}")
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await message_factory(
bot=moderation.bot,
color=await moderation.bot.get_embed_color(moderation.guild.channels[0]),
guild=moderation.guild,
reason=f"Automatic role removal from case #{moderation.id}",
moderation_type=type_registry["removerole"],
moderator=None,
duration=None,
response=None,
case=False,
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except HTTPException:
pass
logger.trace(
"Removed role %s from %s (%s)",
moderation.role_id,
target.name,
target.id,
)
return 1
except (
NotFound,
Forbidden,
HTTPException,
) as e:
logger.error(
"Removing the role %s from user %s failed due to: \n%s",
moderation.role_id,
moderation.target_id,
e,
)
return 0
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
try:
target = await moderation.guild.fetch_member(moderation.target_id)
await target.remove_roles(Object(moderation.role_id), reason=reason)
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await resolve_factory(moderation=moderation, reason=reason)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except HTTPException:
pass
logger.trace(
"Removed role %s from %s (%s)",
moderation.role_id,
target.name,
target.id,
)
return True, ""
except (NotFound, Forbidden, HTTPException) as e:
logger.error(
"Failed to remove role %s from user %s (%s)\n%s",
moderation.role_id,
target.name,
target.id,
e,
)
return False, "Failed to remove role from user."
class RemoveRole(Type):
key = "removerole"
string = "removerole"
verb = "removed a role from"
embed_desc = "had the "
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member, role: Role, silent: bool, duration: str | None = None, reason: str | None = None):
addrole_whitelist = await config.guild(ctx.guild).addrole_whitelist()
if not addrole_whitelist:
await ctx.send(
content=error("There are no whitelisted roles set for this server!"),
ephemeral=True,
)
return
if duration is not None:
try:
parsed_time = parse_relativedelta(argument=duration)
if parsed_time is None:
raise commands.BadArgument()
parsed_time = timedelta_from_relativedelta(relativedelta=parsed_time)
except (commands.BadArgument, ValueError):
await ctx.send(content=error(text="Please provide a valid duration!"), ephemeral=True)
return cls()
else:
parsed_time = None
if role.id not in addrole_whitelist:
await ctx.send(content=error("That role isn't whitelisted!"), ephemeral=True)
return
if role.id not in [user_role.id for user_role in target.roles]:
await ctx.send(
content=error(f"{target.mention} does not have this role!"),
ephemeral=True,
)
return
response = await ctx.send(content=f"{target.mention} has {cls.embed_desc}{role.mention} role removed{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time else ''}!\n**Reason** - `{reason}`")
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response,
duration=parsed_time,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
await target.remove_roles(
role,
reason=f"Role removed by {ctx.author.id}{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time else ''} for: {reason}",
)
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=role.id,
duration=parsed_time,
reason=reason,
)
await response.edit(
content=f"{target.mention} has {cls.embed_desc}{role.mention} role removed{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time else ''}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`",
)
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation: Moderation, new_moderation: Moderation) -> bool: # pylint: disable=unused-argument
return True
@classmethod
async def expiry_handler(cls, moderation: Moderation) -> int:
try:
target = moderation.guild.get_member(moderation.target_id)
if not target:
try:
target = await moderation.guild.fetch_member(moderation.target_id)
except NotFound:
return 0
await target.add_roles(Object(moderation.role_id), reason=f"Automatic role addition from case #{moderation.id}")
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await message_factory(
bot=moderation.bot,
color=await moderation.bot.get_embed_color(moderation.guild.channels[0]),
guild=moderation.guild,
reason=f"Automatic role addition from case #{moderation.id}",
moderation_type=type_registry["addrole"],
moderator=None,
duration=None,
response=None,
case=False,
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except HTTPException:
pass
logger.trace(
"Added role %s to %s (%s)",
moderation.role_id,
target.name,
target.id,
)
return 1
except (
NotFound,
Forbidden,
HTTPException,
) as e:
logger.error(
"Adding the role %s to user %s failed due to: \n%s",
moderation.role_id,
moderation.target_id,
e,
)
return 0
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
try:
target = await moderation.get_target()
await target.add_roles(Object(moderation.role_id), reason=reason)
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await resolve_factory(moderation=moderation, reason=reason)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except HTTPException:
pass
logger.trace(
"Added role %s to %s (%s)",
moderation.role_id,
target.name,
target.id,
)
return True, ""
except (NotFound, Forbidden, HTTPException) as e:
logger.error(
"Failed to add role %s to user %s (%s)\n%s",
moderation.role_id,
target.name,
target.id,
e,
)
return False, "Failed to add role to user."
class Mute(Type):
key = "mute"
string = "mute"
verb = "muted"
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member, silent: bool, duration: str, reason: str = None):
if target.is_timed_out() is True:
await ctx.send(
error(f"{target.mention} is already muted!"),
allowed_mentions=AllowedMentions(users=False),
ephemeral=True,
)
return
try:
parsed_time = parse_timedelta(duration, maximum=timedelta(days=28))
if parsed_time is None:
await ctx.send(error("Please provide a valid duration!"), ephemeral=True)
return
except commands.BadArgument:
await ctx.send(error("Please provide a duration that is less than 28 days."), ephemeral=True)
return
await target.timeout(parsed_time, reason=f"Muted by {ctx.author.id} for: {reason}")
response = await ctx.send(content=f"{target.mention} has been muted for {humanize_timedelta(timedelta=parsed_time)}!\n**Reason** - `{reason}`")
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response,
duration=parsed_time,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=None,
duration=parsed_time,
reason=reason,
)
await response.edit(content=f"{target.mention} has been muted for {humanize_timedelta(timedelta=parsed_time)}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`")
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
try:
target = await moderation.guild.fetch_member(moderation.target_id)
except (Forbidden, HTTPException, NotFound):
return False, "User is not in the server, so I cannot unmute them."
if target.is_timed_out() is False:
return True, ""
await target.timeout(None, reason=reason)
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await resolve_factory(moderation=moderation, reason=reason)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException):
pass
return True, ""
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation: Moderation, new_moderation: Moderation) -> bool: # pylint: disable=unused-argument
if (time() - new_moderation.unix_timestamp) + new_moderation.duration.total_seconds() > 2419200:
await interaction.response.send_message(content=error("Please provide a duration that is less than 28 days from the initial moderation."), ephemeral=True)
return False
try:
member = await interaction.guild.fetch_member(new_moderation.target_id)
await member.timeout(
new_moderation.duration,
reason=f"Case #{new_moderation.id:,} edited by {interaction.user.id}",
)
except NotFound:
pass
return True
class Unmute(Type):
key = "unmute"
string = "unmute"
verb = "unmuted"
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member, silent: bool, reason: str = None):
if target.is_timed_out() is False:
await ctx.send(
content=error(f"{target.mention} is not muted!"),
allowed_mentions=AllowedMentions(users=False),
ephemeral=True,
)
return
if reason:
await target.timeout(None, reason=f"{cls.verb.title()} by {ctx.author.id} for: {reason}")
else:
await target.timeout(None, reason=f"{cls.verb.title()} by {ctx.author.id}")
reason = "No reason given."
response_message = await ctx.send(content=f"{target.mention} has been {cls.verb}!\n**Reason** - `{reason}`")
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response_message,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=None,
duration=None,
reason=reason,
)
await response_message.edit(content=f"{target.mention} has been {cls.verb}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`")
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
class Kick(Type):
key = "kick"
string = "kick"
verb = "kicked"
removes_from_guild = True
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member | User, silent: bool, reason: str = None) -> "Kick":
"""Kick a user."""
bot = ctx.bot
response_message = await ctx.send(f"{target.mention} has been {cls.verb}!\n{bold('Reason:')} {inline(reason)}")
if silent is False:
try:
embed = await message_factory(bot=bot, color=await ctx.embed_color(), guild=ctx.guild, reason=reason, moderation_type=cls(), moderator=ctx.author, duration=None, response=response_message)
await target.send(embed=embed, file=get_icon(bot))
except HTTPException:
pass
await target.kick(reason=f"{str.title(cls.verb)} by {ctx.author.id} for: {reason}")
moderation = await Moderation.log(bot=bot, guild_id=ctx.guild.id, moderator_id=ctx.author.id, moderation_type=cls(), target_type="user", target_id=target.id, role_id=None, duration=None, reason=reason)
await response_message.edit(content=f"{target.mention} has been {cls.verb}! (Case {inline(f'#{moderation.id}')})\n{bold('Reason:')} {inline(reason)}")
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
if await config.guild(moderation.guild).dm_users() is True:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
embed = await resolve_factory(moderation=moderation, reason=reason)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException, NotFound):
pass
return True, ""
class Ban(Type):
key = "ban"
string = "ban"
verb = "banned"
removes_from_guild = True
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member | User, silent: bool, reason: str = None, delete_messages: app_commands.Choice | None = None) -> "Ban":
"""Ban a user."""
bot = ctx.bot
try:
await ctx.guild.fetch_ban(target)
await ctx.send(content=error(f"{target.mention} is already {cls.verb}!"), ephemeral=True)
return
except NotFound:
pass
if delete_messages is None:
delete_messages_seconds = 0
else:
delete_messages_seconds = delete_messages.value
response_message = await ctx.send(f"{target.mention} has been {cls.verb}!\n{bold('Reason:')} {inline(reason)}")
if silent is False:
try:
embed = await message_factory(bot=bot, color=await ctx.embed_color(), guild=ctx.guild, reason=reason, moderation_type=cls(), moderator=ctx.author, duration=None, response=response_message)
await target.send(embed=embed, file=get_icon(bot))
except HTTPException:
pass
await ctx.guild.ban(target, reason=f"{str.title(cls.verb)} by {ctx.author.id} for: {reason}", delete_message_seconds=delete_messages_seconds)
moderation = await Moderation.log(bot=bot, guild_id=ctx.guild.id, moderator_id=ctx.author.id, moderation_type=cls(), target_type="user", target_id=target.id, role_id=None, duration=None, reason=reason)
await response_message.edit(content=f"{target.mention} has been {cls.verb}! (Case {inline(f'#{moderation.id}')})\n{bold('Reason:')} {inline(reason)}")
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
except (HTTPException, NotFound):
return False, "Fetching the target failed, so I cannot unban them."
try:
await moderation.guild.unban(user=target, reason=reason)
except (NotFound, Forbidden, HTTPException) as e:
if e == NotFound:
return True, ""
return False, "I do not have permission to unban this user."
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await resolve_factory(moderation=moderation, reason=reason)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException):
pass
return True, ""
class Tempban(Ban):
key = "tempban"
string = "tempban"
verb = "tempbanned"
removes_from_guild = True
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member | User, silent: bool, duration: str, reason: str = None, delete_messages: app_commands.Choice | None = None) -> "Ban": # pylint: disable=arguments-renamed
"""Ban a user."""
bot = ctx.bot
try:
await ctx.guild.fetch_ban(target)
await ctx.send(content=error(f"{target.mention} is already {Ban.verb}!"), ephemeral=True)
return
except NotFound:
pass
if delete_messages is None:
delete_messages_seconds = 0
else:
delete_messages_seconds = delete_messages.value
try:
parsed_time = parse_relativedelta(argument=duration)
if parsed_time is None:
raise commands.BadArgument()
parsed_time = timedelta_from_relativedelta(relativedelta=parsed_time)
except (commands.BadArgument, ValueError):
await ctx.send(content=error(text="Please provide a valid duration!"), ephemeral=True)
return cls()
response_message = await ctx.send(content=f"{target.mention} has been {cls.verb} for {humanize_timedelta(timedelta=parsed_time)}!\n{bold(text='Reason:')} {inline(text=reason)}")
if silent is False:
try:
embed = await message_factory(bot=bot, color=await ctx.embed_color(), guild=ctx.guild, reason=reason, moderation_type=cls(), moderator=ctx.author, duration=parsed_time, response=response_message)
await target.send(embed=embed, file=get_icon(bot))
except HTTPException:
pass
await ctx.guild.ban(target, reason=f"{str.title(cls.verb)} by {ctx.author.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages_seconds)
moderation = await Moderation.log(bot=bot, guild_id=ctx.guild.id, moderator_id=ctx.author.id, moderation_type=cls(), target_type="user", target_id=target.id, role_id=None, duration=parsed_time, reason=reason)
await response_message.edit(content=f"{target.mention} has been {cls.verb} for {humanize_timedelta(timedelta=parsed_time)}! (Case {inline(text=f'#{moderation.id}')})\n{bold(text='Reason:')} {inline(reason)}")
await log(ctx, moderation.id)
await send_evidenceformat(ctx, moderation.id)
return cls()
@classmethod
async def expiry_handler(cls, moderation: Moderation) -> int:
reason = f"Automatic {Unban.string} from case #{moderation.id}"
try:
target = moderation.bot.get_user(moderation.target_id)
if not target:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
except NotFound:
return 0
await moderation.guild.unban(user=target, reason=reason)
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await message_factory(
bot=moderation.bot,
color=await moderation.bot.get_embed_color(moderation.guild.channels[0]),
guild=moderation.guild,
reason=reason,
moderation_type=type_registry["unban"],
moderator=None,
duration=None,
response=None,
case=False,
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except HTTPException:
pass
logger.trace(
"%s %s (%s) from %s (%s)",
Unban.verb.title(),
target.name,
target.id,
moderation.guild.name,
moderation.guild.id,
)
return 1
except (NotFound, Forbidden, HTTPException) as e:
logger.error(
"Failed to %s %s (%s) from %s (%s)\n%s",
Unban.string,
target.name,
target.id,
moderation.guild.name,
moderation.guild.id,
e,
)
return 0
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation: Moderation, new_moderation: Moderation) -> bool: # pylint: disable=unused-argument
return True
class Softban(Type):
key = "softban"
string = "softban"
verb = "softbanned"
removes_from_guild = True
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member | User, silent: bool, reason: str = None, delete_messages: app_commands.Choice | None = None) -> "Softban":
"""Softban a user."""
bot = ctx.bot
try:
await ctx.guild.fetch_ban(target)
await ctx.send(content=error(f"{target.mention} is already {Ban.verb}!"), ephemeral=True)
except NotFound:
pass
if delete_messages is None:
delete_messages_seconds = 0
else:
delete_messages_seconds = delete_messages.value
response_message = await ctx.send(f"{target.mention} has been {cls.verb}!\n{bold('Reason:')} {inline(reason)}")
if silent is False:
try:
embed = await message_factory(bot=bot, color=await ctx.embed_color(), guild=ctx.guild, reason=reason, moderation_type=cls(), moderator=ctx.author, duration=None, response=response_message)
await target.send(embed=embed, file=get_icon(bot))
except HTTPException:
pass
await ctx.guild.ban(target, reason=f"{str.title(cls.verb)} by {ctx.author.id} for: {reason}", delete_message_seconds=delete_messages_seconds)
await ctx.guild.unban(target, reason=f"{str.title(cls.verb)} by {ctx.author.id} for: {reason}")
moderation = await Moderation.log(bot=bot, guild_id=ctx.guild.id, moderator_id=ctx.author.id, moderation_type=cls(), target_type="user", target_id=target.id, role_id=None, duration=None, reason=reason)
await response_message.edit(content=f"{target.mention} has been {cls.verb}! (Case {inline(f'#{moderation.id}')})\n{bold('Reason:')} {inline(reason)}")
await log(ctx, moderation.id)
await send_evidenceformat(ctx, moderation.id)
return cls()
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
if await config.guild(moderation.guild).dm_users() is True:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
embed = await resolve_factory(moderation=moderation, reason=reason)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException, NotFound):
pass
return True, ""
class Unban(Type):
key = "unban"
string = "unban"
verb = "unbanned"
removes_from_guild = True
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Member | User, silent: bool, reason: str = None) -> "Unban":
"""Unban a user."""
bot = ctx.bot
try:
await ctx.guild.fetch_ban(target)
except NotFound:
await ctx.send(content=error(f"{target.mention} is not {Ban.verb}!"), ephemeral=True)
return
response_message = await ctx.send(f"{target.mention} has been {cls.verb}!\n{bold('Reason:')} {inline(reason)}")
if silent is False:
try:
embed = await message_factory(bot=bot, color=await ctx.embed_color(), guild=ctx.guild, reason=reason, moderation_type=cls(), moderator=ctx.author, duration=None, response=response_message)
await target.send(embed=embed, file=get_icon(bot))
except HTTPException:
pass
await ctx.guild.unban(target, reason=f"{str.title(cls.verb)} by {ctx.author.id} for: {reason}")
moderation = await Moderation.log(bot=bot, guild_id=ctx.guild.id, moderator_id=ctx.author.id, moderation_type=cls(), target_type="user", target_id=target.id, role_id=None, duration=None, reason=reason)
await response_message.edit(content=f"{target.mention} has been {cls.verb}! (Case {inline(f'#{moderation.id}')})\n{bold('Reason:')} {inline(reason)}")
await log(ctx, moderation.id)
await send_evidenceformat(ctx, moderation.id)
return cls()
class Slowmode(Type):
key = "slowmode"
string = "slowmode"
verb = "set the slowmode in"
channel = True
def void(self) -> None:
return None
@classmethod
async def handler(cls, ctx: commands.Context, target: Messageable, silent: bool, duration: str, reason: str) -> "Slowmode": # pylint: disable=unused-argument
"""Set the slowmode in a channel."""
bot = ctx.bot
try:
parsed_time = parse_relativedelta(argument=duration)
if parsed_time is None:
raise commands.BadArgument()
parsed_time = timedelta_from_relativedelta(relativedelta=parsed_time)
except (commands.BadArgument, ValueError):
await ctx.send(content=error(text="Please provide a valid duration!"), ephemeral=True)
return cls()
if ceil(parsed_time.total_seconds()) > 21600:
await ctx.send(content=error(text="The slowmode duration cannot exceed 6 hours!"), ephemeral=True)
return cls()
if isinstance(target, TextChannel):
await target.edit(slowmode_delay=ceil(parsed_time.total_seconds()))
moderation = await Moderation.log(bot=bot, guild_id=ctx.guild.id, moderator_id=ctx.author.id, moderation_type=cls(), target_type="channel", target_id=target.id, role_id=None, duration=parsed_time, reason=None)
await ctx.send(content=f"{ctx.author.mention} has {cls.verb} {target.mention} to {humanize_timedelta(timedelta=parsed_time)}!\n{bold(text='Reason:')} {inline(text=reason)}")
await log(ctx=ctx, moderation_id=moderation.id)
return cls()
class Lockdown(Type):
key = "lockdown"
string = "lockdown"
verb = "locked down"
channel = True
def void(self) -> None:
return None

90
aurora/models/partials.py Normal file
View file

@ -0,0 +1,90 @@
from discord import ChannelType, Forbidden, Guild, HTTPException, InvalidData, NotFound, Role, User
from discord.abc import Messageable
from redbot.core.bot import Red
from .base import AuroraBaseModel, AuroraGuildModel
class PartialUser(AuroraBaseModel):
id: int
username: str
discriminator: int
_obj: User | None
@property
def name(self):
return f"{self.username}#{self.discriminator}" if self.discriminator != 0 else self.username
def __str__(self):
return self.name
def __repr__(self):
return f"<{self.__class__.__name__} id={self.id}>"
@classmethod
async def from_id(cls, bot: Red, user_id: int) -> "PartialUser":
user = bot.get_user(user_id)
if not user:
try:
user = await bot.fetch_user(user_id)
return cls(bot=bot, id=user.id, username=user.name, discriminator=user.discriminator, _obj=user)
except NotFound:
return cls(bot=bot, id=user_id, username="Deleted User", discriminator=0, _obj=None)
return cls(bot=bot, id=user.id, username=user.name, discriminator=user.discriminator, _obj=user)
class PartialChannel(AuroraGuildModel):
id: int
name: str
type: ChannelType
_obj: Messageable | None
@property
def mention(self):
if self.name in ["Deleted Channel", "Forbidden Channel"]:
return self.name
return f"<#{self.id}>"
def __str__(self):
return self.mention
def __repr__(self):
return f"<{self.__class__.__name__} id={self.id} guild_id={self.guild_id}>"
@classmethod
async def from_id(cls, bot: Red, channel_id: int, guild: Guild) -> "PartialChannel":
channel = bot.get_channel(channel_id)
if not channel:
try:
channel = await bot.fetch_channel(channel_id)
return cls(bot=bot, guild_id=channel.guild.id, guild=guild, id=channel.id, name=channel.name, type=channel.type, _obj=channel)
except (NotFound, InvalidData, HTTPException, Forbidden) as e:
if e == Forbidden:
return cls(bot=bot, guild_id=0, id=channel_id, name="Forbidden Channel")
return cls(bot=bot, guild_id=0, id=channel_id, name="Deleted Channel", type=ChannelType.text, _obj=None)
return cls(bot=bot, guild_id=channel.guild.id, guild=guild, id=channel.id, name=channel.name, type=channel.type, _obj=channel)
class PartialRole(AuroraGuildModel):
id: int
name: str
_obj: Role | None
@property
def mention(self):
if self.name in ["Deleted Role", "Forbidden Role"]:
return self.name
return f"<@&{self.id}>"
def __str__(self):
return self.mention
def __repr__(self) -> str:
return f"<{self.__class__.__name__} id={self.id} guild_id={self.guild_id}>"
@classmethod
async def from_id(cls, bot: Red, guild: Guild, role_id: int) -> "PartialRole":
role = guild.get_role(role_id)
if not role:
return cls(bot=bot, guild_id=guild.id, id=role_id, name="Deleted Role", _obj=None)
return cls(bot=bot, guild_id=guild.id, id=role.id, name=role.name, _obj=role)

98
aurora/models/type.py Normal file
View file

@ -0,0 +1,98 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Tuple
from class_registry import ClassRegistry
from class_registry.base import AutoRegister
from discord import Interaction, Member, User
from discord.abc import Messageable
from redbot.core import commands
type_registry: Dict["str", "Type"] = ClassRegistry(attr_name="key", unique=True)
class Type(AutoRegister(type_registry), ABC):
"""This is a base class for moderation types.
Attributes:
key (str): The key to use for this type. This should be unique, as this is how the type is registered internally. Changing this key will break existing cases with this type.
string (str): The string to display for this type.
verb (str): The verb to use for this type.
embed_desc (str): The string to use for embed descriptions.
channel (bool): Whether this type targets channels or users. If this is `true` in a subclass, its overridden handler methods should be typed with `discord.abc.Messageable` instead of `discord.Member | discord.User`.
removes_from_guild (bool): Whether this type's handler removes the target from the guild, or if the moderation is expected to occur whenever the user is not in the guild. This does not actually remove the target from the guild, the handler method is responsible for that.
Properties:
name (str): The string to display for this type. This is the same as the `string` attribute.
"""
key = "type"
string = "type"
verb = "typed"
embed_desc = "been "
channel = False
removes_from_guild = False
@abstractmethod
def void(self) -> Any:
"""This method should be overridden by any child classes. This is a placeholder to allow for automatic class registration."""
raise NotImplementedError
@property
def name(self) -> str:
"""Alias for the `string` attribute."""
return self.string
def __str__(self) -> str:
return self.string
def __repr__(self) -> str:
attrs = [
("key", self.key),
("channel", self.channel),
]
joined = " ".join(f"{key}={value!r}" for key, value in attrs)
return f"<{self.__class__.__name__} {joined}>"
@classmethod
async def handler(cls, ctx: commands.Context, target: Member | User | Messageable, silent: bool, **kwargs) -> "Type": # pylint: disable=unused-argument
"""This method should be overridden by any child classes, but should retain the same starting keyword arguments.
Arguments:
ctx (commands.Context): The context of the command.
target (discord.Member | discord.User | discord.abc.Messageable): The target of the moderation.
silent (bool): Whether details about the moderation should be DM'ed to the target of the moderation.
"""
raise NotImplementedError
@classmethod
async def resolve_handler(cls, moderation, reason: str) -> Tuple[bool, str]: # pylint: disable=unused-argument
"""This method should be overridden by any resolvable child classes, but should retain the same keyword arguments.
If your moderation type should not be resolvable, do not override this.
Arguments:
moderation (aurora.models.Moderation): The moderation to resolve.
reason (str): The reason for resolving the moderation.
"""
raise NotImplementedError
@classmethod
async def expiry_handler(cls, moderation) -> int: # pylint: disable=unused-argument
"""This method should be overridden by any expirable child classes, but should retain the same keyword arguments and return an integer.
If your moderation type should not expire, do not override this, but also do not set an `end_timestamp` when you log your moderation.
Arguments:
moderation (aurora.models.Moderation): The moderation that is expiring.
"""
raise NotImplementedError
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation, new_moderation) -> bool: # pylint: disable=unused-argument
"""This method should be overridden by any child classes with editable durations, but should retain the same keyword arguments and should return True if the duration was successfully modified, or False if it was not.
If your moderation type's duration should not be editable, do not override this.
Arguments:
interaction (discord.Interaction): The interaction that triggered the duration edit.
old_moderation (aurora.models.Moderation): The old moderation, from before the `/edit` command was invoked.
new_moderation (aurora.models.Moderation): The current state of the moderation.
"""
raise NotImplementedError