forked from blizzthewolf/SeaCogs
feat(aurora): renamed moderation cog to aurora, converted to sqlite3
This commit is contained in:
parent
9abbe12270
commit
ca50deedd5
14 changed files with 63 additions and 183 deletions
5
aurora/__init__.py
Normal file
5
aurora/__init__.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
from .aurora import Aurora
|
||||
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(Aurora(bot))
|
1334
aurora/aurora.py
Normal file
1334
aurora/aurora.py
Normal file
File diff suppressed because it is too large
Load diff
0
aurora/importers/__init__.py
Normal file
0
aurora/importers/__init__.py
Normal file
112
aurora/importers/aurora.py
Normal file
112
aurora/importers/aurora.py
Normal file
|
@ -0,0 +1,112 @@
|
|||
import json
|
||||
from datetime import timedelta
|
||||
from typing import Dict
|
||||
|
||||
from discord import ButtonStyle, Interaction, Message, ui
|
||||
from redbot.core import commands
|
||||
|
||||
from ..utilities.database import connect, create_guild_table, mysql_log
|
||||
|
||||
|
||||
class ImportAuroraView(ui.View):
|
||||
def __init__(self, timeout, ctx, message):
|
||||
super().__init__()
|
||||
self.ctx: commands.Context = ctx
|
||||
self.message: Message = message
|
||||
|
||||
@ui.button(label="Yes", style=ButtonStyle.success)
|
||||
async def import_button_y(
|
||||
self, interaction: Interaction, button: ui.Button
|
||||
): # pylint: disable=unused-argument
|
||||
await self.message.delete()
|
||||
await interaction.response.send_message(
|
||||
"Deleting original table...", ephemeral=True
|
||||
)
|
||||
|
||||
database = connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};"
|
||||
cursor.execute(query)
|
||||
|
||||
cursor.close()
|
||||
database.commit()
|
||||
|
||||
await interaction.edit_original_response(content="Creating new table...")
|
||||
|
||||
await create_guild_table(self.ctx.guild)
|
||||
|
||||
await interaction.edit_original_response(content="Importing moderations...")
|
||||
|
||||
file = await self.ctx.message.attachments[0].read()
|
||||
data: [dict] = sorted(json.loads(file), key=lambda x: x["moderation_id"])
|
||||
|
||||
user_mod_types = ["NOTE", "WARN", "MUTE", "UNMUTE", "KICK", "BAN", "UNBAN"]
|
||||
|
||||
channel_mod_types = ["SLOWMODE", "LOCKDOWN"]
|
||||
|
||||
failed_cases = []
|
||||
|
||||
for case in data:
|
||||
if case["moderation_id"] == 0:
|
||||
continue
|
||||
|
||||
if "target_type" not in case or not case["target_type"]:
|
||||
if case["moderation_type"] in user_mod_types:
|
||||
case["target_type"] = "USER"
|
||||
elif case["moderation_type"] in channel_mod_types:
|
||||
case["target_type"] = "CHANNEL"
|
||||
|
||||
if "role_id" not in case or not case["role_id"]:
|
||||
case["role_id"] = 0
|
||||
|
||||
if "changes" not in case or not case["changes"]:
|
||||
case["changes"] = []
|
||||
|
||||
if "metadata" not in case:
|
||||
metadata = {}
|
||||
else:
|
||||
metadata: Dict[str, any] = case["metadata"]
|
||||
if not metadata['imported_from']:
|
||||
metadata.update({
|
||||
'imported_from': 'Aurora'
|
||||
})
|
||||
|
||||
if case["duration"] != "NULL":
|
||||
hours, minutes, seconds = map(int, case["duration"].split(":"))
|
||||
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
else:
|
||||
duration = "NULL"
|
||||
|
||||
await mysql_log(
|
||||
self.ctx.guild.id,
|
||||
case["moderator_id"],
|
||||
case["moderation_type"],
|
||||
case["target_type"],
|
||||
case["target_id"],
|
||||
case["role_id"],
|
||||
duration,
|
||||
case["reason"],
|
||||
timestamp=case["timestamp"],
|
||||
resolved=case["resolved"],
|
||||
resolved_by=case["resolved_by"],
|
||||
resolved_reason=case["resolve_reason"],
|
||||
expired=case["expired"],
|
||||
changes=case["changes"],
|
||||
metadata=metadata,
|
||||
database=database,
|
||||
)
|
||||
|
||||
await interaction.edit_original_response(content="Import complete.")
|
||||
if failed_cases:
|
||||
await interaction.edit_original_response(
|
||||
content=f"Import complete.\n*Failed to import the following cases:*\n```{failed_cases}```"
|
||||
)
|
||||
|
||||
@ui.button(label="No", style=ButtonStyle.danger)
|
||||
async def import_button_n(
|
||||
self, interaction: Interaction, button: ui.Button
|
||||
): # pylint: disable=unused-argument
|
||||
await self.message.edit("Import cancelled.", view=None)
|
||||
await self.message.delete(10)
|
||||
await self.ctx.message.delete(10)
|
145
aurora/importers/galacticbot.py
Normal file
145
aurora/importers/galacticbot.py
Normal file
|
@ -0,0 +1,145 @@
|
|||
import json
|
||||
from datetime import timedelta
|
||||
|
||||
from discord import ButtonStyle, Interaction, Message, ui
|
||||
from redbot.core import commands
|
||||
|
||||
from ..utilities.database import connect, create_guild_table, mysql_log
|
||||
|
||||
|
||||
class ImportGalacticBotView(ui.View):
|
||||
def __init__(self, timeout, ctx, message):
|
||||
super().__init__()
|
||||
self.ctx: commands.Context = ctx
|
||||
self.message: Message = message
|
||||
|
||||
@ui.button(label="Yes", style=ButtonStyle.success)
|
||||
async def import_button_y(self, interaction: Interaction, button: ui.Button): # pylint: disable=unused-argument
|
||||
await self.message.delete()
|
||||
await interaction.response.send_message("Deleting original table...", ephemeral=True)
|
||||
|
||||
database = connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};"
|
||||
cursor.execute(query)
|
||||
|
||||
cursor.close()
|
||||
database.commit()
|
||||
|
||||
await interaction.edit_original_response(content="Creating new table...")
|
||||
|
||||
await create_guild_table(self.ctx.guild)
|
||||
|
||||
await interaction.edit_original_response(content="Importing moderations...")
|
||||
|
||||
accepted_types = [
|
||||
'NOTE',
|
||||
'WARN',
|
||||
'MUTE',
|
||||
'UNMUTE',
|
||||
'KICK',
|
||||
'SOFTBAN',
|
||||
'BAN',
|
||||
'UNBAN',
|
||||
'SLOWMODE',
|
||||
'LOCKDOWN'
|
||||
]
|
||||
|
||||
file = await self.ctx.message.attachments[0].read()
|
||||
data = sorted(json.loads(file), key=lambda x: x['case'])
|
||||
|
||||
failed_cases = []
|
||||
|
||||
for case in data:
|
||||
if case['type'] not in accepted_types:
|
||||
continue
|
||||
|
||||
timestamp = round(case['timestamp'] / 1000)
|
||||
|
||||
try:
|
||||
if case['duration'] is not None and float(case['duration']) != 0:
|
||||
duration = timedelta(seconds=round(float(case['duration']) / 1000))
|
||||
else:
|
||||
duration = 'NULL'
|
||||
except OverflowError:
|
||||
failed_cases.append(case['case'])
|
||||
continue
|
||||
|
||||
metadata = {
|
||||
'imported_from': 'GalacticBot'
|
||||
}
|
||||
|
||||
if case['type'] == 'SLOWMODE':
|
||||
metadata['seconds'] = case['data']['seconds']
|
||||
|
||||
if case['resolved']:
|
||||
resolved = 1
|
||||
resolved_by = None
|
||||
resolved_reason = None
|
||||
resolved_timestamp = None
|
||||
if case['changes']:
|
||||
for change in case['changes']:
|
||||
if change['type'] == 'RESOLVE':
|
||||
resolved_by = change['staff']
|
||||
resolved_reason = change['reason']
|
||||
resolved_timestamp = round(change['timestamp'] / 1000)
|
||||
break
|
||||
if resolved_by is None:
|
||||
resolved_by = '?'
|
||||
if resolved_reason is None:
|
||||
resolved_reason = 'Could not get resolve reason during moderation import.'
|
||||
if resolved_timestamp is None:
|
||||
resolved_timestamp = timestamp
|
||||
changes = [
|
||||
{
|
||||
'type': "ORIGINAL",
|
||||
'reason': case['reason'],
|
||||
'user_id': case['executor'],
|
||||
'timestamp': timestamp
|
||||
},
|
||||
{
|
||||
'type': "RESOLVE",
|
||||
'reason': resolved_reason,
|
||||
'user_id': resolved_by,
|
||||
'timestamp': resolved_timestamp
|
||||
}
|
||||
]
|
||||
else:
|
||||
resolved = 0
|
||||
resolved_by = 'NULL'
|
||||
resolved_reason = 'NULL'
|
||||
changes = []
|
||||
|
||||
if case['reason'] and case['reason'] != "N/A":
|
||||
reason = case['reason']
|
||||
else:
|
||||
reason = "NULL"
|
||||
|
||||
await mysql_log(
|
||||
self.ctx.guild.id,
|
||||
case['executor'],
|
||||
case['type'],
|
||||
case['targetType'],
|
||||
case['target'],
|
||||
0,
|
||||
duration,
|
||||
reason,
|
||||
timestamp=timestamp,
|
||||
resolved=resolved,
|
||||
resolved_by=resolved_by,
|
||||
resolved_reason=resolved_reason,
|
||||
changes=changes,
|
||||
metadata=metadata,
|
||||
database=database
|
||||
)
|
||||
|
||||
await interaction.edit_original_response(content="Import complete.")
|
||||
if failed_cases:
|
||||
await interaction.edit_original_response(content=f"Import complete.\n*Failed to import the following cases:*\n```{failed_cases}```")
|
||||
|
||||
@ui.button(label="No", style=ButtonStyle.danger)
|
||||
async def import_button_n(self, interaction: Interaction, button: ui.Button): # pylint: disable=unused-argument
|
||||
await self.message.edit("Import cancelled.", view=None)
|
||||
await self.message.delete(10)
|
||||
await self.ctx.message.delete(10)
|
11
aurora/info.json
Normal file
11
aurora/info.json
Normal file
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"author" : ["SeaswimmerTheFsh"],
|
||||
"install_msg" : "Thank you for installing Aurora!\nYou can find the source code of this cog [here](https://coastalcommits.com/SeaswimmerTheFsh/SeaCogs).",
|
||||
"name" : "Aurora",
|
||||
"short" : "A full replacement for Red's core Mod cogs.",
|
||||
"description" : "Aurora is designed to be a full replacement for Red's core Mod cogs. It is heavily inspired by GalacticBot, and is designed to be a more user-friendly alternative to Red's core Mod cogs. This cog stores all of its data in an SQLite database.",
|
||||
"end_user_data_statement" : "This cog stores the following information:\n- User IDs of accounts who moderate users or are moderated\n- Guild IDs of guilds with the cog enabled\n- Timestamps of moderations\n- Other information relating to moderations",
|
||||
"requirements": ["humanize", "pytimeparse2"],
|
||||
"hidden": false,
|
||||
"disabled": false
|
||||
}
|
0
aurora/utilities/__init__.py
Normal file
0
aurora/utilities/__init__.py
Normal file
25
aurora/utilities/config.py
Normal file
25
aurora/utilities/config.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
from redbot.core import Config
|
||||
|
||||
config: Config = Config.get_conf(None, identifier=481923957134912, cog_name="Moderation")
|
||||
|
||||
def register_config(config_obj: Config):
|
||||
config_obj.register_guild(
|
||||
use_discord_permissions = True,
|
||||
ignore_other_bots = True,
|
||||
dm_users = True,
|
||||
log_channel = " ",
|
||||
immune_roles = [],
|
||||
history_ephemeral = False,
|
||||
history_inline = False,
|
||||
history_pagesize = 5,
|
||||
history_inline_pagesize = 6,
|
||||
auto_evidenceformat = False,
|
||||
blacklist_roles = []
|
||||
)
|
||||
config_obj.register_user(
|
||||
history_ephemeral = None,
|
||||
history_inline = None,
|
||||
history_pagesize = None,
|
||||
history_inline_pagesize = None,
|
||||
auto_evidenceformat = None
|
||||
)
|
220
aurora/utilities/database.py
Normal file
220
aurora/utilities/database.py
Normal file
|
@ -0,0 +1,220 @@
|
|||
# pylint: disable=cyclic-import
|
||||
|
||||
import json
|
||||
import time
|
||||
import sqlite3
|
||||
from os import sep
|
||||
from datetime import datetime
|
||||
|
||||
from discord import Guild
|
||||
from redbot.core import data_manager
|
||||
|
||||
from .logger import logger
|
||||
from .utils import check_conf, generate_dict, get_next_case_number
|
||||
|
||||
|
||||
def connect() -> sqlite3.Connection:
|
||||
"""Connects to the SQLite database, and returns a connection object."""
|
||||
try:
|
||||
connection = sqlite3.connect(database=data_manager.cog_data_path(raw_name='Moderation') + sep + 'moderation.db')
|
||||
return connection
|
||||
|
||||
except sqlite3.ProgrammingError as e:
|
||||
logger.error("Unable to access the SQLite database!\nError:\n%s", e.msg)
|
||||
raise ConnectionRefusedError(
|
||||
f"Unable to access the SQLite Database!\n{e.msg}"
|
||||
) from e
|
||||
|
||||
|
||||
async def create_guild_table(guild: Guild):
|
||||
database = connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
try:
|
||||
cursor.execute(f"SELECT * FROM `moderation_{guild.id}`")
|
||||
logger.debug("SQLite Table exists for server %s (%s)", guild.name, guild.id)
|
||||
|
||||
except sqlite3.ProgrammingError:
|
||||
query = f"""
|
||||
CREATE TABLE `moderation_{guild.id}` (
|
||||
moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
|
||||
timestamp INT NOT NULL,
|
||||
moderation_type LONGTEXT NOT NULL,
|
||||
target_type LONGTEXT NOT NULL,
|
||||
target_id LONGTEXT NOT NULL,
|
||||
moderator_id LONGTEXT NOT NULL,
|
||||
role_id LONGTEXT,
|
||||
duration LONGTEXT,
|
||||
end_timestamp INT,
|
||||
reason LONGTEXT,
|
||||
resolved BOOL NOT NULL,
|
||||
resolved_by LONGTEXT,
|
||||
resolve_reason LONGTEXT,
|
||||
expired BOOL NOT NULL,
|
||||
changes JSON NOT NULL,
|
||||
metadata JSON NOT NULL
|
||||
)
|
||||
"""
|
||||
cursor.execute(query)
|
||||
|
||||
index_query_1 = "CREATE INDEX idx_target_id ON moderation_%s(target_id(25));"
|
||||
cursor.execute(index_query_1, (guild.id,))
|
||||
|
||||
index_query_2 = (
|
||||
"CREATE INDEX idx_moderator_id ON moderation_%s(moderator_id(25));"
|
||||
)
|
||||
cursor.execute(index_query_2, (guild.id,))
|
||||
|
||||
index_query_3 = (
|
||||
"CREATE INDEX idx_moderation_id ON moderation_%s(moderation_id);"
|
||||
)
|
||||
cursor.execute(index_query_3, (guild.id,))
|
||||
|
||||
insert_query = f"""
|
||||
INSERT INTO `moderation_{guild.id}`
|
||||
(moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
"""
|
||||
insert_values = (
|
||||
0,
|
||||
0,
|
||||
"NULL",
|
||||
"NULL",
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
"NULL",
|
||||
0,
|
||||
"NULL",
|
||||
0,
|
||||
"NULL",
|
||||
"NULL",
|
||||
0,
|
||||
json.dumps([]), # pylint: disable=dangerous-default-value
|
||||
json.dumps({}), # pylint: disable=dangerous-default-value
|
||||
)
|
||||
cursor.execute(insert_query, insert_values)
|
||||
|
||||
database.commit()
|
||||
|
||||
logger.debug(
|
||||
"SQLite Table (moderation_%s) created for %s (%s)",
|
||||
guild.id,
|
||||
guild.name,
|
||||
guild.id,
|
||||
)
|
||||
|
||||
database.close()
|
||||
|
||||
|
||||
async def mysql_log(
|
||||
guild_id: str,
|
||||
author_id: str,
|
||||
moderation_type: str,
|
||||
target_type: str,
|
||||
target_id: int,
|
||||
role_id: int,
|
||||
duration,
|
||||
reason: str,
|
||||
database: sqlite3.Connection = None,
|
||||
timestamp: int = None,
|
||||
resolved: bool = False,
|
||||
resolved_by: str = None,
|
||||
resolved_reason: str = None,
|
||||
expired: bool = None,
|
||||
changes: list = [],
|
||||
metadata: dict = {},
|
||||
) -> int: # pylint: disable=dangerous-default-value
|
||||
if not timestamp:
|
||||
timestamp = int(time.time())
|
||||
|
||||
if duration != "NULL":
|
||||
end_timedelta = datetime.fromtimestamp(timestamp) + duration
|
||||
end_timestamp = int(end_timedelta.timestamp())
|
||||
else:
|
||||
end_timestamp = 0
|
||||
|
||||
if not expired:
|
||||
if int(time.time()) > end_timestamp:
|
||||
expired = 1
|
||||
else:
|
||||
expired = 0
|
||||
|
||||
if resolved_by is None:
|
||||
resolved_by = "NULL"
|
||||
|
||||
if resolved_reason is None:
|
||||
resolved_reason = "NULL"
|
||||
|
||||
if not database:
|
||||
database = connect()
|
||||
close_db = True
|
||||
else:
|
||||
close_db = False
|
||||
cursor = database.cursor()
|
||||
|
||||
moderation_id = await get_next_case_number(guild_id=guild_id, cursor=cursor)
|
||||
|
||||
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
|
||||
val = (
|
||||
moderation_id,
|
||||
timestamp,
|
||||
moderation_type,
|
||||
target_type,
|
||||
target_id,
|
||||
author_id,
|
||||
role_id,
|
||||
duration,
|
||||
end_timestamp,
|
||||
reason,
|
||||
int(resolved),
|
||||
resolved_by,
|
||||
resolved_reason,
|
||||
expired,
|
||||
json.dumps(changes),
|
||||
json.dumps(metadata),
|
||||
)
|
||||
cursor.execute(sql, val)
|
||||
|
||||
cursor.close()
|
||||
database.commit()
|
||||
if close_db:
|
||||
database.close()
|
||||
|
||||
logger.debug(
|
||||
"Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
|
||||
guild_id,
|
||||
moderation_id,
|
||||
timestamp,
|
||||
moderation_type,
|
||||
target_type,
|
||||
target_id,
|
||||
author_id,
|
||||
role_id,
|
||||
duration,
|
||||
end_timestamp,
|
||||
reason,
|
||||
int(resolved),
|
||||
resolved_by,
|
||||
resolved_reason,
|
||||
expired,
|
||||
changes,
|
||||
metadata,
|
||||
)
|
||||
|
||||
return moderation_id
|
||||
|
||||
|
||||
async def fetch_case(moderation_id: int, guild_id: str) -> dict:
|
||||
"""This method fetches a case from the database and returns the case's dictionary."""
|
||||
database = connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
|
||||
cursor.execute(query, (guild_id, moderation_id))
|
||||
result = cursor.fetchone()
|
||||
|
||||
cursor.close()
|
||||
database.close()
|
||||
|
||||
return generate_dict(result)
|
202
aurora/utilities/embed_factory.py
Normal file
202
aurora/utilities/embed_factory.py
Normal file
|
@ -0,0 +1,202 @@
|
|||
# pylint: disable=cyclic-import
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import humanize
|
||||
from discord import Color, Embed, Guild, Interaction, InteractionMessage
|
||||
|
||||
from .utils import fetch_channel_dict, fetch_user_dict, get_next_case_number
|
||||
|
||||
|
||||
async def embed_factory(embed_type: str, color: Color, /, interaction: Interaction = None, case_dict: dict = None, guild: Guild = None, reason: str = None, moderation_type: str = None, response: InteractionMessage = None, duration: timedelta = None, resolved: bool = False):
|
||||
"""This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user.
|
||||
|
||||
Valid arguments for 'embed_type':
|
||||
- 'message'
|
||||
- 'log'
|
||||
- 'case'
|
||||
- 'changes'
|
||||
|
||||
Required arguments for 'message':
|
||||
- guild
|
||||
- reason
|
||||
- moderation_type
|
||||
- response
|
||||
- duration (optional)
|
||||
|
||||
Required arguments for 'log':
|
||||
- interaction
|
||||
- case_dict
|
||||
- resolved (optional)
|
||||
|
||||
Required arguments for 'case', 'changes', and `evidenceformat`:
|
||||
- interaction
|
||||
- case_dict"""
|
||||
codeblock = '```'
|
||||
if embed_type == 'message':
|
||||
|
||||
if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
|
||||
guild_name = guild.name
|
||||
else:
|
||||
guild_name = f"[{guild.name}]({response.jump_url})"
|
||||
|
||||
if moderation_type in ["tempbanned", "muted"] and duration:
|
||||
embed_duration = f" for {humanize.precisedelta(duration)}"
|
||||
else:
|
||||
embed_duration = ""
|
||||
|
||||
if moderation_type == "note":
|
||||
embed_desc = "received a"
|
||||
else:
|
||||
embed_desc = "been"
|
||||
|
||||
embed = Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=color, timestamp=datetime.now())
|
||||
embed.add_field(name='Reason', value=f"`{reason}`")
|
||||
embed.set_author(name=guild.name, icon_url=guild.icon.url)
|
||||
embed.set_footer(text=f"Case #{await get_next_case_number(guild.id):,}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
|
||||
|
||||
return embed
|
||||
|
||||
if embed_type == 'case':
|
||||
if case_dict['target_type'] == 'USER':
|
||||
target_user = await fetch_user_dict(interaction, case_dict['target_id'])
|
||||
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
|
||||
elif case_dict['target_type'] == 'CHANNEL':
|
||||
target_user = await fetch_channel_dict(interaction, case_dict['target_id'])
|
||||
if target_user['mention']:
|
||||
target_name = f"{target_user['mention']}"
|
||||
else:
|
||||
target_name = f"`{target_user['name']}`"
|
||||
|
||||
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
|
||||
|
||||
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=color)
|
||||
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
|
||||
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if bool(case_dict['expired']) is False else str(humanize.precisedelta(td))
|
||||
embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
|
||||
|
||||
embed.description += f"\n**Changes:** {len(case_dict['changes']) - 1}" if case_dict['changes'] else "\n**Changes:** 0"
|
||||
|
||||
if case_dict['metadata']:
|
||||
if case_dict['metadata']['imported_from']:
|
||||
embed.description += f"\n**Imported From:** {case_dict['metadata']['imported_from']}"
|
||||
|
||||
embed.add_field(name='Reason', value=f"{codeblock}{case_dict['reason']}{codeblock}", inline=False)
|
||||
|
||||
if case_dict['resolved'] == 1:
|
||||
resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by'])
|
||||
resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`"
|
||||
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n{codeblock}{case_dict['resolve_reason']}{codeblock}", inline=False)
|
||||
|
||||
return embed
|
||||
|
||||
if embed_type == 'changes':
|
||||
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Changes", color=color)
|
||||
|
||||
memory_dict = {}
|
||||
|
||||
if case_dict['changes']:
|
||||
for change in case_dict['changes']:
|
||||
if change['user_id'] not in memory_dict:
|
||||
memory_dict[str(change['user_id'])] = await fetch_user_dict(interaction, change['user_id'])
|
||||
|
||||
user = memory_dict[str(change['user_id'])]
|
||||
name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}"
|
||||
|
||||
timestamp = f"<t:{change['timestamp']}> | <t:{change['timestamp']}:R>"
|
||||
|
||||
if change['type'] == 'ORIGINAL':
|
||||
embed.add_field(name='Original', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
|
||||
|
||||
elif change['type'] == 'EDIT':
|
||||
embed.add_field(name='Edit', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
|
||||
|
||||
elif change['type'] == 'RESOLVE':
|
||||
embed.add_field(name='Resolve', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
|
||||
|
||||
else:
|
||||
embed.description = "*No changes have been made to this case.* 🙁"
|
||||
|
||||
return embed
|
||||
|
||||
if embed_type == 'evidenceformat':
|
||||
if case_dict['target_type'] == 'USER':
|
||||
target_user = await fetch_user_dict(interaction, case_dict['target_id'])
|
||||
target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}"
|
||||
|
||||
elif case_dict['target_type'] == 'CHANNEL':
|
||||
target_user = await fetch_channel_dict(interaction, case_dict['target_id'])
|
||||
target_name = target_user['name']
|
||||
|
||||
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
|
||||
|
||||
content = f"{codeblock}prolog\nCase: {case_dict['moderation_id']:,} ({str.title(case_dict['moderation_type'])})\nTarget: {target_name} ({target_user['id']})\nModerator: {moderator_name} ({moderator_user['id']})"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
hours, minutes, seconds = map(int, case_dict['duration'].split(':'))
|
||||
td = timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
||||
content += f"\nDuration: {humanize.precisedelta(td)}"
|
||||
|
||||
content += f"\nReason: {case_dict['reason']}{codeblock}"
|
||||
|
||||
return content
|
||||
|
||||
if embed_type == 'log':
|
||||
if resolved:
|
||||
if case_dict['target_type'] == 'USER':
|
||||
target_user = await fetch_user_dict(interaction, case_dict['target_id'])
|
||||
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
|
||||
elif case_dict['target_type'] == 'CHANNEL':
|
||||
target_user = await fetch_channel_dict(interaction, case_dict['target_id'])
|
||||
if target_user['mention']:
|
||||
target_name = f"{target_user['mention']}"
|
||||
else:
|
||||
target_name = f"`{target_user['name']}`"
|
||||
|
||||
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
|
||||
|
||||
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Resolved", color=color)
|
||||
|
||||
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
|
||||
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if case_dict["expired"] == '0' else str(humanize.precisedelta(td))
|
||||
embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
|
||||
|
||||
embed.add_field(name='Reason', value=f"{codeblock}{case_dict['reason']}{codeblock}", inline=False)
|
||||
|
||||
resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by'])
|
||||
resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}"
|
||||
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n{codeblock}{case_dict['resolve_reason']}{codeblock}", inline=False)
|
||||
else:
|
||||
if case_dict['target_type'] == 'USER':
|
||||
target_user = await fetch_user_dict(interaction, case_dict['target_id'])
|
||||
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
|
||||
elif case_dict['target_type'] == 'CHANNEL':
|
||||
target_user = await fetch_channel_dict(interaction, case_dict['target_id'])
|
||||
if target_user['mention']:
|
||||
target_name = target_user['mention']
|
||||
else:
|
||||
target_name = f"`{target_user['name']}`"
|
||||
|
||||
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
|
||||
|
||||
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=color)
|
||||
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
|
||||
embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>"
|
||||
|
||||
embed.add_field(name='Reason', value=f"{codeblock}{case_dict['reason']}{codeblock}", inline=False)
|
||||
return embed
|
||||
|
||||
raise(TypeError("'type' argument is invalid!"))
|
3
aurora/utilities/logger.py
Normal file
3
aurora/utilities/logger.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
import logging
|
||||
|
||||
logger = logging.getLogger("red.sea.moderation")
|
234
aurora/utilities/utils.py
Normal file
234
aurora/utilities/utils.py
Normal file
|
@ -0,0 +1,234 @@
|
|||
# pylint: disable=cyclic-import
|
||||
|
||||
import json
|
||||
from typing import Union
|
||||
|
||||
from discord import Guild, Interaction, Member, User
|
||||
from discord.errors import Forbidden, NotFound
|
||||
from redbot.core import commands
|
||||
|
||||
from .config import config
|
||||
|
||||
|
||||
async def check_conf(config_list: list):
|
||||
"""Checks if any required config options are not set."""
|
||||
not_found_list = []
|
||||
|
||||
for item in config_list:
|
||||
if await config.item() == " ":
|
||||
not_found_list.append(item)
|
||||
|
||||
return not_found_list
|
||||
|
||||
|
||||
def check_permissions(
|
||||
user: User,
|
||||
permissions: list,
|
||||
ctx: Union[commands.Context, Interaction] = None,
|
||||
guild: Guild = None,
|
||||
):
|
||||
"""Checks if a user has a specific permission (or a list of permissions) in a channel."""
|
||||
if ctx:
|
||||
member = ctx.guild.get_member(user.id)
|
||||
resolved_permissions = ctx.channel.permissions_for(member)
|
||||
|
||||
elif guild:
|
||||
member = guild.get_member(user.id)
|
||||
resolved_permissions = member.guild_permissions
|
||||
|
||||
else:
|
||||
raise (KeyError)
|
||||
|
||||
for permission in permissions:
|
||||
if (
|
||||
not getattr(resolved_permissions, permission, False)
|
||||
and not resolved_permissions.administrator is True
|
||||
):
|
||||
return permission
|
||||
|
||||
return False
|
||||
|
||||
|
||||
async def check_moddable(
|
||||
target: Union[User, Member], interaction: Interaction, permissions: list
|
||||
):
|
||||
"""Checks if a moderator can moderate a target."""
|
||||
if check_permissions(interaction.client.user, permissions, guild=interaction.guild):
|
||||
await interaction.response.send_message(
|
||||
f"I do not have the `{permissions}` permission, required for this action.",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
if await config.guild(interaction.guild).use_discord_permissions() is True:
|
||||
if check_permissions(interaction.user, permissions, guild=interaction.guild):
|
||||
await interaction.response.send_message(
|
||||
f"You do not have the `{permissions}` permission, required for this action.",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
if interaction.user.id == target.id:
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate yourself!", ephemeral=True
|
||||
)
|
||||
return False
|
||||
|
||||
if target.bot:
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate bots!", ephemeral=True
|
||||
)
|
||||
return False
|
||||
|
||||
if isinstance(target, Member):
|
||||
if interaction.user.top_role <= target.top_role:
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate members with a higher role than you!",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
if (
|
||||
interaction.guild.get_member(interaction.client.user.id).top_role
|
||||
<= target.top_role
|
||||
):
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate members with a role higher than the bot!",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
immune_roles = await config.guild(target.guild).immune_roles()
|
||||
|
||||
for role in target.roles:
|
||||
if role.id in immune_roles:
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate members with an immune role!",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def get_next_case_number(guild_id: str, cursor=None):
|
||||
"""This function returns the next case number from the MySQL table for a specific guild."""
|
||||
from .database import connect
|
||||
|
||||
if not cursor:
|
||||
database = connect()
|
||||
cursor = database.cursor()
|
||||
cursor.execute(
|
||||
f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1"
|
||||
)
|
||||
return cursor.fetchone()[0] + 1
|
||||
|
||||
|
||||
def generate_dict(result):
|
||||
case: dict = {
|
||||
"moderation_id": result[0],
|
||||
"timestamp": result[1],
|
||||
"moderation_type": result[2],
|
||||
"target_type": result[3],
|
||||
"target_id": result[4],
|
||||
"moderator_id": result[5],
|
||||
"role_id": result[6],
|
||||
"duration": result[7],
|
||||
"end_timestamp": result[8],
|
||||
"reason": result[9],
|
||||
"resolved": result[10],
|
||||
"resolved_by": result[11],
|
||||
"resolve_reason": result[12],
|
||||
"expired": result[13],
|
||||
"changes": json.loads(result[14]),
|
||||
"metadata": json.loads(result[15])
|
||||
}
|
||||
return case
|
||||
|
||||
|
||||
async def fetch_user_dict(interaction: Interaction, user_id: str):
|
||||
"""This function returns a dictionary containing either user information or a standard deleted user template."""
|
||||
if user_id == "?":
|
||||
user_dict = {"id": "?", "name": "Unknown User", "discriminator": "0"}
|
||||
|
||||
else:
|
||||
try:
|
||||
user = interaction.client.get_user(user_id)
|
||||
if user is None:
|
||||
user = await interaction.client.fetch_user(user_id)
|
||||
|
||||
user_dict = {
|
||||
"id": user.id,
|
||||
"name": user.name,
|
||||
"discriminator": user.discriminator,
|
||||
}
|
||||
|
||||
except NotFound:
|
||||
user_dict = {
|
||||
"id": user_id,
|
||||
"name": "Deleted User",
|
||||
"discriminator": "0",
|
||||
}
|
||||
|
||||
return user_dict
|
||||
|
||||
|
||||
async def fetch_channel_dict(interaction: Interaction, channel_id: str):
|
||||
"""This function returns a dictionary containing either channel information or a standard deleted channel template."""
|
||||
try:
|
||||
channel = interaction.guild.get_channel(channel_id)
|
||||
if not channel:
|
||||
channel = await interaction.guild.fetch_channel(channel_id)
|
||||
|
||||
channel_dict = {"id": channel.id, "name": channel.name, "mention": channel.mention}
|
||||
|
||||
except NotFound:
|
||||
channel_dict = {"id": channel_id, "name": "Deleted Channel", "mention": None}
|
||||
|
||||
return channel_dict
|
||||
|
||||
|
||||
async def fetch_role_dict(interaction: Interaction, role_id: str):
|
||||
"""This function returns a dictionary containing either role information or a standard deleted role template."""
|
||||
role = interaction.guild.get_role(role_id)
|
||||
if not role:
|
||||
role_dict = {"id": role_id, "name": "Deleted Role"}
|
||||
|
||||
role_dict = {"id": role.id, "name": role.name}
|
||||
|
||||
return role_dict
|
||||
|
||||
|
||||
async def log(interaction: Interaction, moderation_id: int, resolved: bool = False):
|
||||
"""This function sends a message to the guild's configured logging channel when an infraction takes place."""
|
||||
from .database import fetch_case
|
||||
from .embed_factory import embed_factory
|
||||
|
||||
logging_channel_id = await config.guild(interaction.guild).log_channel()
|
||||
if logging_channel_id != " ":
|
||||
logging_channel = interaction.guild.get_channel(logging_channel_id)
|
||||
|
||||
case = await fetch_case(moderation_id, interaction.guild.id)
|
||||
if case:
|
||||
embed = await embed_factory(
|
||||
"log", await interaction.client.get_embed_color(None), interaction=interaction, case_dict=case, resolved=resolved
|
||||
)
|
||||
try:
|
||||
await logging_channel.send(embed=embed)
|
||||
except Forbidden:
|
||||
return
|
||||
|
||||
async def send_evidenceformat(interaction: Interaction, case_dict: dict):
|
||||
"""This function sends an ephemeral message to the moderator who took the moderation action, with a pre-made codeblock for use in the mod-evidence channel."""
|
||||
from .embed_factory import embed_factory
|
||||
|
||||
send_evidence_bool = (await config.user(interaction.user).auto_evidenceformat()
|
||||
or await config.guild(interaction.guild).auto_evidenceformat()
|
||||
or False)
|
||||
if send_evidence_bool is False:
|
||||
return
|
||||
|
||||
content = await embed_factory(
|
||||
"evidenceformat", await interaction.client.get_embed_color(None), interaction=interaction, case_dict=case_dict
|
||||
)
|
||||
await interaction.followup.send(content=content, ephemeral=True)
|
Loading…
Add table
Add a link
Reference in a new issue