from os import getenv import sys import asyncio from commands import answer, await_n, bot_commands import config import discord from discord.ext import commands from random import sample from collections import defaultdict as dd class Cupido(commands.Bot): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.meta_channel = None self.lobby_channel = None self.pair_channels = [] self.task = None self.reaction_map = {} self.vote_map = {} self.read_file() self.score_map = dd(lambda: 0) self.oldgroups = set() def read_file(self): try: with open(config.TUTOR_FILE_PATH, 'r') as f: self.tutors = {int(line) for line in f.read().split('\n')} except FileNotFoundError: self.tutors = set() def write_file(self): with open(config.TUTOR_FILE_PATH, 'w') as f: f.write('\n'.join(str(t) for t in self.tutors)) async def await_coroutine(self, co): if self.task is not None: return 'already running' self.task = asyncio.create_task(co) try: result = await self.task except asyncio.CancelledError: self.task = None return 'cancelled' finally: self.task = None return result async def cancel_coroutine(self): if self.task is not None: self.task.cancel() async def on_ready(self): print(f'the bot {config.NAME} is logged in as "{self.user}"') async def create_category(self, ctx): category = await ctx.guild.create_category(config.CATEGORY_CHANNEL_NAME) await answer(ctx, f'info: category created "{category}" ({category.id})') return category async def create_lobby(self, ctx): lobby = await ctx.guild.create_voice_channel( config.LOBBY_CHANNEL_NAME, topic=config.LOBBY_CHANNEL_TOPIC, category=self.meta_channel, ) await answer(ctx, f'info: voice channel created "{lobby}" ({lobby.id})') return lobby def get_meta_channel(self, ctx): return self.meta_channel or discord.utils.get(ctx.guild.categories, name=config.CATEGORY_CHANNEL_NAME) def get_lobby_channel(self, ctx, meta_channel): return (self.lobby_channel or discord.utils.get( ctx.guild.voice_channels, name=config.LOBBY_CHANNEL_NAME, category=meta_channel )) def get_pair_channels_no_cache(self, ctx, meta_channel): return sorted((channel for channel in ctx.guild.voice_channels if channel.category == meta_channel and channel.name.isdigit()), key=lambda c: c.name) def get_pair_channels(self, ctx, meta_channel): return self.pair_channels or self.get_pair_channels_no_cache(ctx, meta_channel) async def try_delete_channel(self, channel): try: await channel.delete() return True except discord.errors.NotFound: return False @staticmethod def get_username(user): return discord.utils.escape_markdown(user.nick or user.display_name, ignore_links=False) @classmethod def generate_panel_text(cls, user, members, emojis, reaction_map=None, selected=None): text = config.PANEL_TEXT emojis_chosen = [] for member, emoji in zip(members, emojis): if member.id == user.id: continue name = cls.get_username(member) text += f'\n {"☑️" if member.id == selected else "*"} {emoji} {name}' if reaction_map is not None: reaction_map[emoji] = member emojis_chosen.append(emoji) return text, emojis_chosen async def get_dm_channel(self, user): try: return user.dm_channel or await user.create_dm() except discord.errors.HTTPException: print(f'warning: could not send pm to {user}') return async def send_panel(self, ctx, user, members): channel = await self.get_dm_channel(user) if channel is None: return reaction_map = {} emojis = sample(config.EMOJI_POOL, k=len(members)) text, emojis = self.generate_panel_text(user, members, emojis, reaction_map) self.reaction_map[user.id] = reaction_map embed = discord.Embed(title=config.PANEL_TITLE, type="rich", description=text, colour=discord.Colour.purple()) msg = await channel.send(embed=embed) await await_n(map(msg.add_reaction, emojis)) return msg async def destroy_pair_channels(self, ctx, meta_channel): await await_n(map(self.try_delete_channel, self.get_pair_channels_no_cache(ctx, meta_channel))) self.pair_channels = [] async def create_pair_channels(self, ctx, meta_channel, n): await self.destroy_pair_channels(ctx, meta_channel) futures = [] for i in range(1, n + 1): futures.append(ctx.guild.create_voice_channel(str(i), category=meta_channel)) return await await_n(futures) async def get_channels(self, ctx): meta_channel = self.get_meta_channel(ctx) lobby_channel = self.get_lobby_channel(ctx, meta_channel) if meta_channel is None or lobby_channel is None: await answer(ctx, 'error: cannot start shuffling, you need to initialize channels') return None return meta_channel, lobby_channel async def on_reaction_add(self, reaction, user): """Overrides event handler in Bot""" return await self.toggle_vote(reaction, user) async def on_reaction_remove(self, reaction, user): """Overrides event handler in Bot""" return await self.toggle_vote(reaction, user) async def toggle_vote(self, reaction, user): if reaction.message.author.id == user.id: return if (user.id not in self.reaction_map) or (str(reaction.emoji) not in self.reaction_map[user.id]): return chosen = self.reaction_map[user.id][str(reaction.emoji)] if self.vote_map.get(user.id) == chosen: self.vote_map[user.id] = None else: self.vote_map[user.id] = self.reaction_map[user.id][str(reaction.emoji)] await self.update_message_panel(reaction.message, self.vote_map.get(user.id), user) async def update_message_panel(self, message, vote, user): embeds = message.embeds if not embeds: return None vote = None if vote is None else vote.id embeds[0].description = self.generate_panel_text( user, list(self.reaction_map[user.id].values()), list(self.reaction_map[user.id].keys()), selected=vote)[0] return await message.edit(embed=embeds[0]) def add_tutor(self, tutor): self.tutors.add(tutor) self.write_file() def remove_tutor(self, tutor): self.tutors.remove(tutor) self.write_file() async def update_scores(self, guild): futures = [] for member in (guild.get_member(i) for i in self.vote_map.keys()): futures.append(self.update_score(member)) await await_n(futures) self.vote_map = {} async def update_score(self, member): if member.id in self.tutors or member.id not in self.vote_map or self.vote_map[member.id] is None: return vote = self.vote_map[member.id] channel = self.get_dm_channel(member) name = self.get_username(vote) if vote.id in self.tutors: self.score_map[member.id] += config.SUCCESS_POINTS text = config.SUCCESS_TEXT.format(tutor=name, points=self.score_map[member.id]) color = discord.Colour.green() title = config.SUCCESS_TITLE else: self.score_map[member.id] += config.FAILIURE_POINTS text = config.FAILIURE_TEXT.format(user=name, points=self.score_map[member.id]) color = discord.Colour.red() title = config.FAILIURE_TITLE embed = discord.Embed(title=title, type="rich", description=text, colour=color) await (await channel).send(embed=embed) def main(): token = getenv(config.TOKEN_ENV_VAR) if token is None: print('error: no token was given') sys.exit(1) bot = Cupido(activity=discord.Game(name=config.GAME_STATUS), command_prefix=config.COMMAND_PREFIX, case_insensitive=True) bot.remove_command('help') for cmd in bot_commands: bot.add_command(cmd) bot.run(token) if __name__ == '__main__': while True: try: main() except KeyboardInterrupt: break except Exception as e: print(f'[DEBUG] encountered exception: {type(e)}: {e}') if e.args == ('Event loop is closed',): print('[DEBUG] quit...') break print('[DEBUG] restarting...')