from os import getenv
import sys
import asyncio
from commands import answer, await_n, bot_commands
import config
import discord
from discord.ext import commands
from random import sample
from collections import defaultdict as dd


class Cupido(commands.Bot):
    """Discord bot that manages lobby/pair voice channels and a voting game.

    The bot keeps a persistent set of "tutor" user ids (stored in
    ``config.TUTOR_FILE_PATH``), sends voting panels via direct message,
    records votes through reactions and maintains a scoreboard.
    """

    def __init__(self, admin_channel, *args, **kwargs):
        """Initialise bot state.

        Args:
            admin_channel: Identifier of the admin channel; stored as-is on
                ``self.admin_channel``.
            *args: Forwarded to ``commands.Bot``.
            **kwargs: Forwarded to ``commands.Bot``.
        """
        super().__init__(*args, **kwargs)
        # Cached channel objects; filled in elsewhere (not visible in this file).
        self.meta_channel = None
        self.lobby_channel = None
        self.notify_channel = None
        self.pair_channels = []
        # The single long-running task managed by await_coroutine().
        self.task = None
        # user id -> {emoji string -> member} for the panel sent to that user.
        self.reaction_map = {}
        # user id -> member currently voted for (or None for "no vote").
        self.vote_map = {}
        self.read_file()
        self.reset_scoreboard()
        self.oldgroups = set()
        self.admin_channel = admin_channel

    def reset_scoreboard(self):
        """Replace the scoreboard with an empty one (missing entries count as 0)."""
        self.score_map = dd(int)

    def read_file(self):
        """Load the tutor ids from ``config.TUTOR_FILE_PATH`` into ``self.tutors``.

        The file holds one integer id per line. A missing file yields an empty
        set. Blank lines are skipped: ``write_file`` produces an empty file
        when there are no tutors, and ``int('')`` would otherwise raise
        ``ValueError`` on the next start.
        """
        try:
            with open(config.TUTOR_FILE_PATH, 'r') as f:
                self.tutors = {int(line) for line in f if line.strip()}
        except FileNotFoundError:
            self.tutors = set()

    def write_file(self):
        """Persist ``self.tutors`` to ``config.TUTOR_FILE_PATH``, one id per line."""
        with open(config.TUTOR_FILE_PATH, 'w') as f:
            f.write('\n'.join(str(t) for t in self.tutors))

    async def await_coroutine(self, co):
        """Run ``co`` as the bot's single managed task and wait for it.

        Args:
            co: Coroutine to schedule with ``asyncio.create_task``.

        Returns:
            ``'already running'`` if a managed task exists (``co`` is then not
            scheduled), ``'cancelled'`` if the task was cancelled, otherwise
            the task's result.
        """
        if self.task is not None:
            return 'already running'
        self.task = asyncio.create_task(co)
        try:
            result = await self.task
        except asyncio.CancelledError:
            return 'cancelled'
        finally:
            # Always free the slot so a new task can be started afterwards.
            self.task = None
        return result

    async def cancel_coroutine(self):
        """Request cancellation of the managed task, if there is one."""
        if self.task is not None:
            self.task.cancel()

    async def on_ready(self):
        """Log to stdout which user the bot is logged in as."""
        print(f'the bot {config.NAME} is logged in as "{self.user}"')

    async def create_category(self, ctx):
        """Create the category named ``config.CATEGORY_CHANNEL_NAME`` and report it.

        Returns:
            The newly created category.
        """
        category = await ctx.guild.create_category(config.CATEGORY_CHANNEL_NAME)
        await answer(ctx, f'info: category created "{category}" ({category.id})')
        return category

    async def create_voice_channel(self, ctx, name, category=None):
        """Create a voice channel and apply the configured permissions.

        ``config.CHANNEL_PERMISSIONS`` is applied to the guild's default role
        and ``config.BOT_PERMISSIONS`` to the bot user.

        Args:
            ctx: Command context providing the guild.
            name: Name of the new channel.
            category: Optional category to place the channel in.

        Returns:
            The newly created voice channel.
        """
        channel = await ctx.guild.create_voice_channel(
            name,
            category=category,
        )
        await channel.set_permissions(ctx.guild.default_role, **config.CHANNEL_PERMISSIONS)
        await channel.set_permissions(self.user, **config.BOT_PERMISSIONS)
        return channel

    async def create_lobby(self, ctx):
        """Create the lobby voice channel under ``self.meta_channel`` and report it.

        Returns:
            The newly created lobby channel.
        """
        lobby = await self.create_voice_channel(
            ctx,
            config.LOBBY_CHANNEL_NAME,
            category=self.meta_channel,
        )
        await answer(ctx, f'info: voice channel created "{lobby}" ({lobby.id})')
        return lobby

    async def create_notify_channel(self, ctx):
        """Create the notification text channel under ``self.meta_channel``.

        The bot gets write/manage permissions; the default role is denied
        sending, reacting, embedding, attaching and mentioning everyone.

        Returns:
            The newly created text channel.
        """
        channel = await ctx.guild.create_text_channel(
            config.NOTIFY_CHANNEL_NAME,
            category=self.meta_channel,
            topic=config.NOTIFY_CHANNEL_TOPIC,
            overwrites={
                self.user: discord.PermissionOverwrite(
                    add_reactions=True,
                    send_messages=True,
                    mention_everyone=True,
                    manage_roles=True,
                    manage_permissions=True,
                    manage_messages=True,
                    manage_channels=True,
                    read_messages=True,
                    read_message_history=True,
                    embed_links=True,
                ),
                ctx.guild.default_role: discord.PermissionOverwrite(
                    add_reactions=False,
                    send_messages=False,
                    send_tts_messages=False,
                    embed_links=False,
                    attach_files=False,
                    mention_everyone=False,
                ),
            }
        )
        await answer(ctx, f'info: text channel created "{channel}" ({channel.id})')
        return channel

    def get_meta_channel(self, ctx):
        """Return the cached category, or look it up in the guild by name."""
        return self.meta_channel or discord.utils.get(ctx.guild.categories,
                                                      name=config.CATEGORY_CHANNEL_NAME)

    def get_lobby_channel(self, ctx, meta_channel):
        """Return the cached lobby, or look it up by name within ``meta_channel``."""
        return (self.lobby_channel or discord.utils.get(
            ctx.guild.voice_channels,
            name=config.LOBBY_CHANNEL_NAME,
            category=meta_channel
        ))

    def get_notify_channel(self, ctx, meta_channel):
        """Return the cached notify channel, or look it up within ``meta_channel``."""
        return (self.notify_channel or discord.utils.get(
            ctx.guild.text_channels,
            name=config.NOTIFY_CHANNEL_NAME,
            category=meta_channel
        ))

    @staticmethod
    def _pair_channel_order(channel):
        """Sort key placing digit-named channels in numeric order.

        ``str.isdigit()`` (used as the filter) also accepts characters such as
        superscripts that ``int()`` rejects, so those names sort last by name
        instead of raising.
        """
        name = channel.name
        if name.isdecimal():
            return (0, int(name), name)
        return (1, 0, name)

    def get_pair_channels_no_cache(self, ctx, meta_channel):
        """Return the guild's pair channels, ignoring ``self.pair_channels``.

        Pair channels are the voice channels in ``meta_channel`` whose name
        consists of digits. They are ordered numerically, so "2" comes before
        "10" (a plain string sort would put "10" first).
        """
        return sorted(
            (channel for channel in ctx.guild.voice_channels
             if channel.category == meta_channel and channel.name.isdigit()),
            key=self._pair_channel_order,
        )

    def get_pair_channels(self, ctx, meta_channel):
        """Return the cached pair channels, or look them up when the cache is empty."""
        return self.pair_channels or self.get_pair_channels_no_cache(ctx, meta_channel)

    async def try_delete_channel(self, channel):
        """Delete ``channel``.

        Returns:
            ``True`` on success, ``False`` if the channel no longer exists.
        """
        try:
            await channel.delete()
            return True
        except discord.errors.NotFound:
            return False

    @staticmethod
    def get_username(user):
        """Return the user's nickname (or display name) with markdown escaped."""
        return discord.utils.escape_markdown(user.nick or user.display_name, ignore_links=False)

    @classmethod
    def generate_panel_text(cls, user, members, emojis, reaction_map=None, selected=None):
        """Build the voting panel text shown to ``user``.

        Members and emojis are paired positionally; ``user`` is left out of
        the list.

        Args:
            user: The recipient of the panel.
            members: Candidate members.
            emojis: Emojis to pair with ``members``.
            reaction_map: Optional dict that is filled with ``emoji -> member``
                for every listed member.
            selected: Id of the currently selected member, marked with a
                check box instead of ``*``.

        Returns:
            Tuple ``(text, emojis_chosen)`` where ``emojis_chosen`` lists the
            emojis actually used, in order.
        """
        text = config.PANEL_TEXT
        emojis_chosen = []
        for member, emoji in zip(members, emojis):
            if member.id == user.id:
                continue
            name = cls.get_username(member)
            text += f'\n {"☑️" if member.id == selected else "*"} {emoji} {name}'
            if reaction_map is not None:
                reaction_map[emoji] = member
            emojis_chosen.append(emoji)
        return text, emojis_chosen

    async def get_dm_channel(self, user):
        """Return the DM channel for ``user``, creating it if needed.

        Returns:
            The DM channel, or ``None`` (after printing a warning) when
            creating it fails with an HTTP error.
        """
        try:
            return user.dm_channel or await user.create_dm()
        except discord.errors.HTTPException:
            print(f'warning: could not send pm to {user}')
            return None

    async def send_panel(self, ctx, user, members):
        """Send a voting panel to ``user`` and register its reactions.

        Stores the emoji mapping in ``self.reaction_map[user.id]``, adds one
        reaction per candidate and resets the user's vote to ``None``.

        Returns:
            The sent message, or ``None`` if no DM channel is available.
        """
        channel = await self.get_dm_channel(user)
        if channel is None:
            return
        reaction_map = {}
        emojis = sample(config.EMOJI_POOL, k=len(members))
        text, emojis = self.generate_panel_text(user, members, emojis, reaction_map)
        self.reaction_map[user.id] = reaction_map
        embed = discord.Embed(title=config.PANEL_TITLE, type="rich", description=text,
                              colour=discord.Colour.purple())
        msg = await channel.send(embed=embed)
        await await_n(map(msg.add_reaction, emojis))
        self.vote_map[user.id] = None
        return msg

    async def destroy_pair_channels(self, ctx, meta_channel):
        """Delete all pair channels found in the guild and clear the cache."""
        await await_n(map(self.try_delete_channel,
                          self.get_pair_channels_no_cache(ctx, meta_channel)))
        self.pair_channels = []

    async def create_pair_channels(self, ctx, meta_channel, n):
        """Recreate the pair channels, named "1" .. str(n).

        Existing pair channels are destroyed first.

        Returns:
            Whatever ``await_n`` returns for the creation coroutines
            (presumably the list of created channels — confirm in ``commands``).
        """
        await self.destroy_pair_channels(ctx, meta_channel)
        futures = [self.create_voice_channel(ctx, str(i), category=meta_channel)
                   for i in range(1, n + 1)]
        channels = await await_n(futures)
        return channels

    async def get_channels(self, ctx, msg=True):
        """Return ``(meta_channel, lobby_channel)`` or ``None`` if either is missing.

        Args:
            ctx: Command context.
            msg: When true, an error is reported via ``answer`` on failure.
        """
        meta_channel = self.get_meta_channel(ctx)
        lobby_channel = self.get_lobby_channel(ctx, meta_channel)
        if meta_channel is None or lobby_channel is None:
            if msg:
                await answer(ctx, 'error: cannot start shuffling, you need to initialize channels')
            return None
        return meta_channel, lobby_channel

    async def on_reaction_add(self, reaction, user):
        """Overrides event handler in Bot"""
        return await self.toggle_vote(reaction, user)

    async def on_reaction_remove(self, reaction, user):
        """Overrides event handler in Bot"""
        return await self.toggle_vote(reaction, user)

    async def toggle_vote(self, reaction, user):
        """Toggle ``user``'s vote according to the reacted emoji.

        Reactions by the message author itself, by users without a panel, or
        with emojis not on the user's panel are ignored. Reacting with the
        emoji of the current vote clears it; any other panel emoji selects
        that member. The panel message is then refreshed.
        """
        if reaction.message.author.id == user.id:
            return
        emoji = str(reaction.emoji)
        if (user.id not in self.reaction_map) or (emoji not in self.reaction_map[user.id]):
            return
        chosen = self.reaction_map[user.id][emoji]
        if self.vote_map.get(user.id) == chosen:
            self.vote_map[user.id] = None
        else:
            self.vote_map[user.id] = chosen
        await self.update_message_panel(reaction.message, self.vote_map.get(user.id), user)

    async def update_message_panel(self, message, vote, user):
        """Re-render the panel embed of ``message`` with ``vote`` marked.

        Returns:
            ``None`` if the message has no embeds, otherwise the result of
            ``message.edit``.
        """
        embeds = message.embeds
        if not embeds:
            return None
        vote = None if vote is None else vote.id
        panel = self.reaction_map[user.id]
        embeds[0].description = self.generate_panel_text(
            user,
            list(panel.values()),
            list(panel.keys()),
            selected=vote)[0]
        return await message.edit(embed=embeds[0])

    def add_tutor(self, tutor):
        """Add ``tutor`` to the tutor set and persist it."""
        self.tutors.add(tutor)
        self.write_file()

    def remove_tutor(self, tutor):
        """Remove ``tutor`` from the tutor set and persist it.

        Raises:
            KeyError: If ``tutor`` is not in the set.
        """
        self.tutors.remove(tutor)
        self.write_file()

    async def update_scores(self, guild):
        """Score every recorded vote, then clear ``self.vote_map``.

        Users that can no longer be resolved in ``guild`` (``get_member``
        returns ``None``, e.g. because they left) are skipped instead of
        failing the whole round.
        """
        members = (guild.get_member(i) for i in self.vote_map)
        futures = [self.update_score(member) for member in members if member is not None]
        await await_n(futures)
        self.vote_map = {}

    async def update_score(self, member):
        """Apply the points for ``member``'s vote and notify them by DM.

        Tutors and members without a ``vote_map`` entry are ignored. The score
        is updated even if the member cannot be reached by DM.
        """
        if member.id in self.tutors or member.id not in self.vote_map:
            return
        vote = self.vote_map[member.id]
        name = None if vote is None else self.get_username(vote)
        possible_votes = self.reaction_map[member.id].values()
        # Tutors that were on this member's panel.
        tutors = {tutor for tutor in possible_votes if tutor.id in self.tutors}
        if vote is None and tutors:
            # Passed although a tutor was available.
            self.score_map[member] += config.PASS_BAD_POINTS
            text = config.PASS_BAD_TEXT.format(
                points=self.score_map[member],
                tutor=', '.join(self.get_username(tutor) for tutor in tutors))
            color = discord.Colour.red()
            title = config.PASS_BAD_TITLE
        elif vote is None:
            # Passed and no tutor was on the panel.
            self.score_map[member] += config.PASS_GOOD_POINTS
            text = config.PASS_GOOD_TEXT.format(points=self.score_map[member])
            color = discord.Colour.green()
            title = config.PASS_GOOD_TITLE
        elif vote.id in self.tutors:
            self.score_map[member] += config.SUCCESS_POINTS
            text = config.SUCCESS_TEXT.format(tutor=name, points=self.score_map[member])
            color = discord.Colour.green()
            title = config.SUCCESS_TITLE
        else:
            self.score_map[member] += config.FAILIURE_POINTS
            text = config.FAILIURE_TEXT.format(user=name, points=self.score_map[member])
            color = discord.Colour.red()
            title = config.FAILIURE_TITLE
        embed = discord.Embed(title=title, type="rich", description=text, colour=color)
        channel = await self.get_dm_channel(member)
        if channel is None:
            # get_dm_channel already printed a warning; nothing to send to.
            return
        await channel.send(embed=embed)

    async def print_scoreboard(self, ctx, target):
        """Send the scoreboard to ``target``.

        Entries are ordered by descending score; only positive scores are
        listed.
        """
        text = config.SCOREBOARD_TEXT
        scores = sorted(self.score_map.items(), reverse=True, key=lambda i: i[1])
        for rank, (member, score) in enumerate(scores, start=1):
            if score > 0:
                text += f'\n {rank}. {self.get_username(member)} hat {score} Punkt{"" if score == 1 else "e"}'
        embed = discord.Embed(title=config.SCOREBOARD_TITLE, type="rich", description=text,
                              colour=discord.Colour.purple())
        await target.send(embed=embed)


def main():
    """Read token and admin channel from the environment and run the bot.

    Exits with status 1 if either environment variable is missing.
    """
    token = getenv(config.TOKEN_ENV_VAR)
    if token is None:
        print('error: no token was given')
        sys.exit(1)
    admin_channel = getenv(config.ADMIN_CHANNEL_ENV_VAR)
    if admin_channel is None:
        print('error: no admin channel was given')
        sys.exit(1)
    bot = Cupido(int(admin_channel),
                 activity=discord.Game(name=config.GAME_STATUS),
                 command_prefix=config.COMMAND_PREFIX,
                 case_insensitive=True)
    bot.remove_command('help')
    for cmd in bot_commands:
        bot.add_command(cmd)
    bot.run(token)


if __name__ == '__main__':
    # Keep restarting the bot after unexpected errors; stop on Ctrl-C or once
    # the event loop has been closed.
    while True:
        try:
            main()
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(f'[DEBUG] encountered exception: {type(e)}: {e}')
            if e.args == ('Event loop is closed',):
                print('[DEBUG] quit...')
                break
            print('[DEBUG] restarting...')