# NOTE(review): the copy of this file that was reviewed had been collapsed onto
# a few lines and several attribute references (for example ``self.bot`` and
# ``guild.id``) and URLs were missing. They have been restored from context;
# every spot that could not be inferred with certainty carries a
# ``NOTE(review)`` comment and should be confirmed against version control.
import logging
import time
from datetime import datetime, timedelta, timezone

import discord
import humanize
import mysql.connector
from pytimeparse2 import disable_dateutil, parse
from redbot.core import app_commands, checks, Config, commands
from redbot.core.app_commands import Choice

# The "/mute" command refuses durations longer than this (its error message
# tells the user to stay under 28 days).
_MAX_TIMEOUT = timedelta(days=28)


class Moderation(commands.Cog):
    """Custom cog moderation cog, meant to copy GalacticBot.
    Developed by SeaswimmerTheFsh."""

    # Config keys that must be set before a MySQL connection is attempted.
    _MYSQL_CONF_KEYS = ['mysql_address', 'mysql_database', 'mysql_username', 'mysql_password']

    def __init__(self, bot):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=481923957134912)
        # A single space is the "unset" sentinel that check_conf() looks for.
        self.config.register_global(
            mysql_address=" ",
            mysql_database=" ",
            mysql_username=" ",
            mysql_password=" ",
            ignore_other_bots=True
        )
        # NOTE(review): presumably makes parse(..., as_timedelta=True) return a
        # plain datetime.timedelta instead of a dateutil object -- confirm
        # against the pytimeparse2 documentation.
        disable_dateutil()

    async def cog_load(self):
        """This method prepares the database schema for all of the guilds the bot is currently in."""
        conf = await self.check_conf(self._MYSQL_CONF_KEYS)
        if conf:
            logging.fatal("Failed to create tables, due to MySQL connection configuration being unset.")
            return
        guilds: list[discord.Guild] = self.bot.guilds
        try:
            for guild in guilds:
                await self.create_guild_table(guild)
        except ConnectionRefusedError:
            # connect() has already logged the failure; nothing more to do.
            return

    @commands.Cog.listener('on_guild_join')
    async def db_generate_guild_join(self, guild: discord.Guild):
        """This method prepares the database schema whenever the bot joins a guild."""
        conf = await self.check_conf(self._MYSQL_CONF_KEYS)
        if conf:
            logging.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
            return
        try:
            await self.create_guild_table(guild)
        except ConnectionRefusedError:
            return

    @commands.Cog.listener('on_audit_log_entry_create')
    async def autologger(self, entry: discord.AuditLogEntry):
        """This method automatically logs moderations done by users manually ("right clicks").

        Kicks, bans, unbans and timeout changes found in the audit log are
        written to the guild's moderation table. Entries created by bots are
        skipped when ``ignore_other_bots`` is enabled; otherwise only entries
        created by this bot itself are skipped.
        """
        if entry.user is None:
            # Without a moderator there is nobody to attribute the case to.
            return
        if await self.config.ignore_other_bots() is True:
            # The target may be a bare object without a ``bot`` attribute.
            if entry.user.bot or getattr(entry.target, "bot", False):
                return
        else:
            if entry.user.id == self.bot.user.id:
                return

        duration = "NULL"
        if entry.reason:
            reason = entry.reason + " (This action was performed without the bot.)"
        else:
            reason = "This action was performed without the bot."

        if entry.action == discord.AuditLogAction.kick:
            moderation_type = 'KICK'
        elif entry.action == discord.AuditLogAction.ban:
            moderation_type = 'BAN'
        elif entry.action == discord.AuditLogAction.unban:
            moderation_type = 'UNBAN'
        elif entry.action == discord.AuditLogAction.member_update:
            # member_update is also emitted for edits unrelated to timeouts
            # (nickname changes, for instance). Only act when the timeout
            # field is actually part of the change set.
            missing = object()
            timed_out_until = getattr(entry.after, 'timed_out_until', missing)
            if timed_out_until is missing:
                return
            if timed_out_until is not None:
                timed_out_until_aware = timed_out_until.replace(tzinfo=timezone.utc)
                duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc)
                # Round to whole minutes so the stored duration is tidy.
                minutes = round(duration_datetime.total_seconds() / 60)
                duration = timedelta(minutes=minutes)
                moderation_type = 'MUTE'
            else:
                moderation_type = 'UNMUTE'
        else:
            return
        await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, duration, reason)

    async def connect(self):
        """Connects to the MySQL database, and returns a connection object.

        Raises:
            LookupError: if any of the MySQL settings is still unset.
            ConnectionRefusedError: if the connector raises ProgrammingError
                while connecting.
        """
        conf = await self.check_conf(self._MYSQL_CONF_KEYS)
        if conf:
            raise LookupError("MySQL connection details not set properly!")
        try:
            connection = mysql.connector.connect(
                host=await self.config.mysql_address(),
                user=await self.config.mysql_username(),
                password=await self.config.mysql_password(),
                database=await self.config.mysql_database()
            )
            return connection
        except mysql.connector.ProgrammingError as e:
            logging.fatal("Unable to access the MySQL database!\nError:\n%s", e.msg)
            raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e

    async def create_guild_table(self, guild: discord.Guild):
        """Create the ``moderation_<guild id>`` table if it does not exist yet.

        A freshly created table is seeded with a placeholder row whose
        ``moderation_id`` is 0, so get_next_case_number() always finds a row.
        """
        database = await self.connect()
        try:
            cursor = database.cursor()
            try:
                # Table names cannot be bound as parameters, hence the f-string.
                cursor.execute(f"SELECT * FROM `moderation_{guild.id}`")
            except mysql.connector.errors.ProgrammingError:
                # The probe failing is treated as "table does not exist".
                query = f"""
                    CREATE TABLE `moderation_{guild.id}` (
                        moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
                        timestamp INT NOT NULL,
                        moderation_type LONGTEXT NOT NULL,
                        target_id LONGTEXT NOT NULL,
                        moderator_id LONGTEXT NOT NULL,
                        duration LONGTEXT,
                        end_timestamp INT,
                        reason LONGTEXT,
                        resolved BOOL NOT NULL,
                        resolve_reason LONGTEXT,
                        expired BOOL NOT NULL
                    )
                """
                cursor.execute(query)
                insert_query = f"""
                    INSERT INTO `moderation_{guild.id}`
                    (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                insert_values = (0, 0, "NULL", 0, 0, "NULL", 0, "NULL", 0, "NULL", 0)
                cursor.execute(insert_query, insert_values)
                database.commit()
                logging.info("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id)
            else:
                logging.info("MySQL Table exists for server %s (%s)", guild.name, guild.id)
        finally:
            # Always release the connection, even if a query above failed.
            database.close()

    async def check_conf(self, config: list):
        """Checks if any required config options are not set.

        Args:
            config: names of global config values to inspect.

        Returns:
            The subset of ``config`` whose stored value is still the
            single-space "unset" sentinel (empty when everything is set).
        """
        not_found_list = []
        for item in config:
            # Look the value up by its *name*; ``self.config.item`` would read
            # a setting literally called "item".
            if await getattr(self.config, item)() == " ":
                not_found_list.append(item)
        return not_found_list

    async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, duration, reason: str):
        """Insert one moderation case into the guild's table.

        Args:
            guild_id: id of the guild, used to pick the table.
            author_id: id of the moderator.
            moderation_type: case type such as 'KICK' or 'MUTE'.
            target_id: id of the moderated user.
            duration: a ``timedelta`` for timed actions, or the string
                "NULL" when the action has no duration.
            reason: free-form reason text.
        """
        timestamp = int(time.time())
        if duration != "NULL":
            # Plain epoch arithmetic avoids a round-trip through naive local time.
            end_timestamp = timestamp + int(duration.total_seconds())
        else:
            end_timestamp = 0

        database = await self.connect()
        try:
            cursor = database.cursor()
            moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor)
            sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
            val = (moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, f"{reason}", 0, "NULL", 0)
            cursor.execute(sql, val)
            database.commit()
        finally:
            database.close()
        logging.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, 0, NULL", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, reason)

    async def get_next_case_number(self, guild_id: str, cursor = None):
        """This method returns the next case number from the MySQL table for a specific guild.

        Args:
            guild_id: id of the guild whose table is queried.
            cursor: an open cursor to reuse. When omitted, a temporary
                connection is opened and closed again before returning.

        Returns:
            The highest stored ``moderation_id`` plus one, or 1 when the
            table has no rows.
        """
        database = None
        if not cursor:
            database = await self.connect()
            cursor = database.cursor()
        try:
            cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
            row = cursor.fetchone()
        finally:
            # Only close what this method opened; a caller-supplied cursor
            # belongs to the caller's connection.
            if database is not None:
                database.close()
        return row[0] + 1 if row else 1

    async def embed_factory(self, embed_type: str, guild: discord.Guild, reason: str, moderation_type: str, response: discord.InteractionMessage, duration: timedelta = None):
        """This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user.

        Valid arguments for 'embed_type':
        - 'message'
        - 'log' - WIP
        - 'case' - WIP

        Raises:
            TypeError: if ``embed_type`` is not one of the implemented values
                (currently only 'message').
        """
        if embed_type == 'message':
            # Users who are no longer in the guild cannot follow a jump link,
            # so those cases only show the plain guild name.
            if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
                guild_name = guild.name
            else:
                guild_name = f"[{guild.name}]({response.jump_url})"

            if moderation_type in ["tempbanned", "muted"] and duration:
                embed_duration = f" for {humanize.precisedelta(duration)}"
            else:
                embed_duration = ""

            if moderation_type == "note":
                embed_desc = "recieved a"
            else:
                embed_desc = "been"

            # NOTE(review): the colour/timestamp arguments were missing from the
            # reviewed copy and are reconstructed -- confirm.
            embed = discord.Embed(
                title=str.title(moderation_type),
                description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.",
                color=await self.bot.get_embed_color(None),
                timestamp=datetime.now()
            )
            embed.add_field(name='Reason', value=f"`{reason}`")
            # Guilds without an icon have ``guild.icon`` set to None.
            embed.set_author(name=guild.name, icon_url=guild.icon.url if guild.icon else None)
            # NOTE(review): the footer icon URL is empty in the reviewed copy;
            # restore the original URL if there was one.
            embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id)}", icon_url="")
            return embed
        raise TypeError("'embed_type' argument is invalid!")

    async def _notify_target(self, interaction: discord.Interaction, target, reason: str, moderation_type: str, duration: timedelta = None):
        """Best-effort DM telling ``target`` about the moderation taken against them."""
        try:
            embed = await self.embed_factory('message', interaction.guild, reason, moderation_type, await interaction.original_response(), duration)
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Deliberately ignored: a failed DM must not abort the moderation
            # action itself.
            pass

    @app_commands.command(name="note")
    async def note(self, interaction: discord.Interaction, target: discord.Member, reason: str):
        """Add a note to a user."""
        await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`")
        await self._notify_target(interaction, target, reason, 'note')
        await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 'NULL', reason)

    @app_commands.command(name="warn")
    async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str):
        """Warn a user."""
        await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")
        await self._notify_target(interaction, target, reason, 'warned')
        await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 'NULL', reason)

    @app_commands.command(name="mute")
    async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str):
        """Mute a user."""
        if target.is_timed_out() is True:
            await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
            return
        try:
            parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
            return
        # Compare timedeltas directly; the previous seconds/1000 versus
        # milliseconds comparison could never trigger.
        if parsed_time > _MAX_TIMEOUT:
            await interaction.response.send_message("Please provide a duration that is less than 28 days.", ephemeral=True)
            return
        await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}")
        await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
        await self._notify_target(interaction, target, reason, 'muted', parsed_time)
        await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, parsed_time, reason)

    @app_commands.command(name="unmute")
    async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None):
        """Unmute a user."""
        if target.is_timed_out() is False:
            await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
            return
        if reason:
            await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}")
        else:
            await target.timeout(None, reason=f"Unmuted by {interaction.user.id}")
            reason = "No reason given."
        await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`")
        await self._notify_target(interaction, target, reason, 'unmuted')
        await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 'NULL', reason)

    @app_commands.command(name="kick")
    async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str):
        """Kick a user."""
        await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`")
        # DM first: once kicked, the user may no longer be reachable.
        await self._notify_target(interaction, target, reason, 'kicked')
        # ``reason`` is keyword-only on Member.kick.
        await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}")
        await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 'NULL', reason)

    @app_commands.command(name="ban")
    @app_commands.choices(delete_messages=[
        Choice(name="None", value=0),
        Choice(name='1 Hour', value=3600),
        Choice(name='12 Hours', value=43200),
        Choice(name='1 Day', value=86400),
        Choice(name='3 Days', value=259200),
        Choice(name='7 Days', value=604800),
    ])
    async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0):
        """Ban a user."""
        # The option arrives as a Choice when picked and as the int default
        # otherwise; Guild.ban needs the plain number of seconds.
        delete_seconds = delete_messages.value if isinstance(delete_messages, Choice) else delete_messages
        try:
            # NOTE(review): the argument was missing from the reviewed copy;
            # ``target`` satisfies the Snowflake parameter -- confirm.
            await interaction.guild.fetch_ban(target)
            await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True)
            return
        except discord.errors.NotFound:
            # No existing ban: carry on.
            pass
        if duration:
            try:
                parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
            except ValueError:
                await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
                return
            await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
            # Pass the duration so the DM states how long the ban lasts.
            await self._notify_target(interaction, target, reason, 'tempbanned', parsed_time)
            await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_seconds)
            await self.mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, parsed_time, reason)
        else:
            await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`")
            await self._notify_target(interaction, target, reason, 'banned')
            await interaction.guild.ban(target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_seconds)
            await self.mysql_log(interaction.guild.id, interaction.user.id, 'BAN', target.id, 'NULL', reason)

    @app_commands.command(name="unban")
    async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None):
        """Unban a user."""
        if reason:
            audit_reason = f"Unbanned by {interaction.user.id} for: {reason}"
        else:
            audit_reason = f"Unbanned by {interaction.user.id}"
            reason = "No reason given."
        try:
            await interaction.guild.unban(target, reason=audit_reason)
        except discord.errors.NotFound:
            # Mirrors the "already banned" guard in the ban command.
            await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True)
            return
        await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")
        await self._notify_target(interaction, target, reason, 'unbanned')
        await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 'NULL', reason)

    # NOTE(review): the group decorator was missing from the reviewed copy; it
    # is required because sub-commands are registered on ``moderationset``.
    @commands.group(autohelp=True)
    @checks.admin()
    async def moderationset(self, ctx: commands.Context):
        """Manage moderation commands."""

    @moderationset.command(name="ignorebots")
    @checks.admin()
    async def moderationset_ignorebots(self, ctx: commands.Context):
        # Toggles the global flag that autologger() consults.
        await self.config.ignore_other_bots.set(not await self.config.ignore_other_bots())
        await ctx.send(f"Ignore bots setting set to {await self.config.ignore_other_bots()}")

    @moderationset.command(name="mysql")
    @checks.is_owner()
    async def moderationset_mysql(self, ctx: commands.Context):
        """Configure MySQL connection details."""
        await ctx.message.add_reaction("✅")
        # NOTE(review): the send target was missing from the reviewed copy; a
        # DM to the invoking owner is assumed -- confirm (could be ctx.send).
        await ctx.author.send("Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60))

    class ConfigButtons(discord.ui.View):
        # View holding the single "Edit" button that opens the MySQL modal.

        def __init__(self, timeout):
            # Forward the timeout; it used to be accepted and then dropped.
            super().__init__(timeout=timeout)
            self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)

        @discord.ui.button(label="Edit", style=discord.ButtonStyle.success)
        async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button):  # pylint: disable=unused-argument
            await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config))

    class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"):
        # Modal with four optional fields; blank fields leave the stored
        # setting untouched.

        def __init__(self, config):
            super().__init__()
            self.config = config

        address = discord.ui.TextInput(
            label="Address",
            placeholder="Input your MySQL address here.",
            style=discord.TextStyle.short,
            required=False,
            max_length=300
        )
        database = discord.ui.TextInput(
            label="Database",
            placeholder="Input the name of your database here.",
            style=discord.TextStyle.short,
            required=False,
            max_length=300
        )
        username = discord.ui.TextInput(
            label="Username",
            placeholder="Input your MySQL username here.",
            style=discord.TextStyle.short,
            required=False,
            max_length=300
        )
        password = discord.ui.TextInput(
            label="Password",
            placeholder="Input your MySQL password here.",
            style=discord.TextStyle.short,
            required=False,
            max_length=300
        )

        async def on_submit(self, interaction: discord.Interaction):
            # Store every non-empty field and reply with an ephemeral summary.
            message = ""
            if self.address.value != "":
                await self.config.mysql_address.set(self.address.value)
                message += f"- Address set to\n - `{self.address.value}`\n"
            if self.database.value != "":
                await self.config.mysql_database.set(self.database.value)
                message += f"- Database set to\n - `{self.database.value}`\n"
            if self.username.value != "":
                await self.config.mysql_username.set(self.username.value)
                message += f"- Username set to\n - `{self.username.value}`\n"
            if self.password.value != "":
                await self.config.mysql_password.set(self.password.value)
                # NOTE(review): a password of 8 characters or fewer is echoed
                # in full here; consider masking it entirely.
                trimmed_password = self.password.value[:8]
                message += f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n"
            if message == "":
                trimmed_password = str(await self.config.mysql_password())[:8]
                send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security"
            else:
                send = f"Configuration changed:\n{message}"
            await interaction.response.send_message(send, ephemeral=True)

    @commands.command(aliases=["tdc"])
    async def timedeltaconvert(self, ctx: commands.Context, *, duration: str):
        """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.

        **Example usage**
        `[p]timedeltaconvert 1 day 15hr 82 minutes 52s`
        **Output**
        `1 day, 16:22:52`"""
        try:
            parsed_time = parse(duration, as_timedelta=True, raise_exception=True)
            await ctx.send(f"`{str(parsed_time)}`")
        except ValueError:
            await ctx.send("Please provide a convertible value!")