summaryrefslogtreecommitdiff
path: root/bot.py
blob: 1ef8bec3c6376418f3fac4490d3e3a3beab0e60a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
from os import getenv
import sys

import asyncio
from commands import answer, await_n, bot_commands
import config

import discord
from discord.ext import commands
from random import sample
from collections import defaultdict as dd


class Cupido(commands.Bot):
    """Discord bot managing a lobby, numbered pair voice channels and a DM voting panel.

    The bot keeps track of:
      * a category ("meta channel") holding a lobby voice channel and the
        digit-named pair voice channels,
      * a persisted set of tutor ids (one integer id per line in
        ``config.TUTOR_FILE_PATH``),
      * per-user emoji panels sent by DM, the vote each user cast through
        them, and the score accumulated from those votes.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``commands.Bot`` and set up empty state."""
        super().__init__(*args, **kwargs)
        # Cached channel objects; None / empty means "look it up on the guild".
        self.meta_channel = None
        self.lobby_channel = None
        self.pair_channels = []
        # The single background task started through await_coroutine (or None).
        self.task = None
        # user id -> {emoji string -> member} for the panel sent to that user.
        self.reaction_map = {}
        # user id -> member currently voted for (or None when the vote was undone).
        self.vote_map = {}
        # Populates self.tutors from disk.
        self.read_file()
        # member -> accumulated points, defaulting to 0.
        self.score_map = dd(lambda: 0)
        self.oldgroups = set()

    def read_file(self):
        """Load the tutor ids from ``config.TUTOR_FILE_PATH`` into ``self.tutors``.

        A missing file yields an empty set. Blank lines are skipped so that an
        empty file (which ``write_file`` produces when no tutors are left) or a
        trailing newline does not abort start-up.

        Raises:
            ValueError: if a non-blank line is not an integer.
        """
        try:
            with open(config.TUTOR_FILE_PATH, 'r') as f:
                self.tutors = {int(line) for line in f if line.strip()}
        except FileNotFoundError:
            self.tutors = set()

    def write_file(self):
        """Write ``self.tutors`` to ``config.TUTOR_FILE_PATH``, one id per line."""
        with open(config.TUTOR_FILE_PATH, 'w') as f:
            f.write('\n'.join(str(t) for t in self.tutors))

    async def await_coroutine(self, co):
        """Run ``co`` as the bot's single tracked task and wait for it.

        Returns:
            The coroutine's result, ``'already running'`` when another tracked
            task is still active, or ``'cancelled'`` when the wait ended with
            ``asyncio.CancelledError``.
        """
        if self.task is not None:
            # The coroutine will never be scheduled; close it so Python does
            # not emit a "coroutine ... was never awaited" warning.
            close = getattr(co, 'close', None)
            if close is not None:
                close()
            return 'already running'
        self.task = asyncio.create_task(co)
        try:
            return await self.task
        except asyncio.CancelledError:
            return 'cancelled'
        finally:
            # Always release the slot, whatever the outcome was.
            self.task = None

    async def cancel_coroutine(self):
        """Request cancellation of the tracked task, if there is one."""
        if self.task is not None:
            self.task.cancel()

    async def on_ready(self):
        """Log the identity the bot is logged in as."""
        print(f'the bot {config.NAME} is logged in as "{self.user}"')

    async def create_category(self, ctx):
        """Create the category named ``config.CATEGORY_CHANNEL_NAME`` and report it.

        Returns:
            The newly created category.
        """
        category = await ctx.guild.create_category(config.CATEGORY_CHANNEL_NAME)
        await answer(ctx, f'info: category created "{category}" ({category.id})')
        return category

    async def create_lobby(self, ctx):
        """Create the lobby voice channel inside ``self.meta_channel`` and report it.

        Returns:
            The newly created voice channel.
        """
        lobby = await ctx.guild.create_voice_channel(
            config.LOBBY_CHANNEL_NAME,
            topic=config.LOBBY_CHANNEL_TOPIC,
            category=self.meta_channel,
        )
        await answer(ctx, f'info: voice channel created "{lobby}" ({lobby.id})')
        return lobby

    def get_meta_channel(self, ctx):
        """Return the cached category, else the guild category with the configured name."""
        return self.meta_channel or discord.utils.get(ctx.guild.categories, name=config.CATEGORY_CHANNEL_NAME)

    def get_lobby_channel(self, ctx, meta_channel):
        """Return the cached lobby, else the configured voice channel in ``meta_channel``."""
        return (self.lobby_channel
                or discord.utils.get(
                    ctx.guild.voice_channels,
                    name=config.LOBBY_CHANNEL_NAME,
                    category=meta_channel
                ))

    def get_pair_channels_no_cache(self, ctx, meta_channel):
        """Return the digit-named voice channels of ``meta_channel`` in numeric order.

        The guild is always queried; ``self.pair_channels`` is ignored.
        """
        candidates = (channel for channel in ctx.guild.voice_channels
                      if channel.category == meta_channel
                      and channel.name.isdigit())
        # Sort by (length, text) rather than by text alone so that "10" comes
        # after "2"; this never raises, unlike int() on exotic digit characters.
        return sorted(candidates, key=lambda c: (len(c.name), c.name))

    def get_pair_channels(self, ctx, meta_channel):
        """Return the cached pair channels, falling back to a guild lookup when empty."""
        return self.pair_channels or self.get_pair_channels_no_cache(ctx, meta_channel)

    async def try_delete_channel(self, channel):
        """Delete ``channel``; return False if Discord reports it as not found."""
        try:
            await channel.delete()
            return True
        except discord.errors.NotFound:
            return False

    @staticmethod
    def get_username(user):
        """Return the user's nick (or display name) with markdown escaped."""
        return discord.utils.escape_markdown(user.nick or user.display_name, ignore_links=False)

    @classmethod
    def generate_panel_text(cls, user, members, emojis, reaction_map=None, selected=None):
        """Build the panel description listing every member except ``user``.

        Args:
            user: the panel's recipient; skipped in the listing.
            members: candidates, paired positionally with ``emojis``.
            emojis: one emoji per member.
            reaction_map: optional dict that is filled with ``emoji -> member``
                for every listed member.
            selected: id of the member to mark as chosen, if any.

        Returns:
            ``(text, emojis_chosen)`` where ``emojis_chosen`` holds the emojis
            of the listed members in order.
        """
        text = config.PANEL_TEXT
        emojis_chosen = []
        for member, emoji in zip(members, emojis):
            if member.id == user.id:
                continue
            name = cls.get_username(member)
            text += f'\n {"☑️" if member.id == selected else "*"} {emoji} {name}'
            if reaction_map is not None:
                reaction_map[emoji] = member
            emojis_chosen.append(emoji)
        return text, emojis_chosen

    async def get_dm_channel(self, user):
        """Return the user's DM channel, creating it if needed.

        Returns None (after printing a warning) when Discord answers with an
        HTTP error.
        """
        try:
            return user.dm_channel or await user.create_dm()
        except discord.errors.HTTPException:
            print(f'warning: could not send pm to {user}')
            return None

    async def send_panel(self, ctx, user, members):
        """DM ``user`` a voting panel listing ``members`` and add the reactions.

        The emoji -> member mapping of the panel is stored in
        ``self.reaction_map[user.id]``.

        Returns:
            The sent message, or None when no DM channel could be obtained.
        """
        channel = await self.get_dm_channel(user)
        if channel is None:
            return None
        reaction_map = {}
        # One distinct emoji per member; random.sample raises ValueError when
        # the pool is smaller than the member list.
        emojis = sample(config.EMOJI_POOL, k=len(members))
        text, emojis = self.generate_panel_text(user, members, emojis, reaction_map)
        self.reaction_map[user.id] = reaction_map
        embed = discord.Embed(title=config.PANEL_TITLE, type="rich", description=text, colour=discord.Colour.purple())
        msg = await channel.send(embed=embed)
        await await_n(map(msg.add_reaction, emojis))
        return msg

    async def destroy_pair_channels(self, ctx, meta_channel):
        """Delete every pair channel found on the guild and clear the cache."""
        await await_n(map(self.try_delete_channel, self.get_pair_channels_no_cache(ctx, meta_channel)))
        self.pair_channels = []

    async def create_pair_channels(self, ctx, meta_channel, n):
        """Replace the existing pair channels with ``n`` new ones named "1".."n".

        Returns:
            Whatever ``await_n`` yields for the creation calls.
        """
        await self.destroy_pair_channels(ctx, meta_channel)
        futures = [ctx.guild.create_voice_channel(str(i), category=meta_channel)
                   for i in range(1, n + 1)]
        return await await_n(futures)

    async def get_channels(self, ctx):
        """Return ``(meta_channel, lobby_channel)``, or None after reporting an error."""
        meta_channel = self.get_meta_channel(ctx)
        lobby_channel = self.get_lobby_channel(ctx, meta_channel)
        if meta_channel is None or lobby_channel is None:
            await answer(ctx, 'error: cannot start shuffling, you need to initialize channels')
            return None
        return meta_channel, lobby_channel

    async def on_reaction_add(self, reaction, user):
        """Overrides event handler in Bot"""
        return await self.toggle_vote(reaction, user)

    async def on_reaction_remove(self, reaction, user):
        """Overrides event handler in Bot"""
        return await self.toggle_vote(reaction, user)

    async def toggle_vote(self, reaction, user):
        """Set or clear ``user``'s vote according to the emoji they reacted with.

        Reacting with the emoji of the currently chosen member clears the vote;
        any other known emoji selects that member. Reactions by the message's
        own author, by users without a panel, or with unknown emojis are ignored.
        """
        if reaction.message.author.id == user.id:
            return
        panel = self.reaction_map.get(user.id)
        emoji = str(reaction.emoji)
        if panel is None or emoji not in panel:
            return
        chosen = panel[emoji]
        if self.vote_map.get(user.id) == chosen:
            self.vote_map[user.id] = None
        else:
            self.vote_map[user.id] = chosen
        await self.update_message_panel(reaction.message, self.vote_map.get(user.id), user)

    async def update_message_panel(self, message, vote, user):
        """Re-render the first embed of ``message`` so it marks ``vote`` as selected.

        Returns None when the message has no embeds, otherwise the result of
        editing the message.
        """
        embeds = message.embeds
        if not embeds:
            return None
        vote = None if vote is None else vote.id
        embeds[0].description = self.generate_panel_text(
                user,
                list(self.reaction_map[user.id].values()),
                list(self.reaction_map[user.id].keys()),
                selected=vote)[0]
        return await message.edit(embed=embeds[0])

    def add_tutor(self, tutor):
        """Add ``tutor`` to the tutor set and persist it."""
        self.tutors.add(tutor)
        self.write_file()

    def remove_tutor(self, tutor):
        """Remove ``tutor`` from the tutor set and persist it.

        Raises:
            KeyError: if ``tutor`` is not in the set.
        """
        self.tutors.remove(tutor)
        self.write_file()

    async def update_scores(self, guild):
        """Score every recorded vote, then reset ``self.vote_map``."""
        members = (guild.get_member(i) for i in self.vote_map)
        # get_member yields None for ids it cannot resolve (e.g. the voter is
        # no longer in the guild); such votes cannot be scored and are skipped.
        futures = [self.update_score(member) for member in members if member is not None]
        await await_n(futures)
        self.vote_map = {}

    async def update_score(self, member):
        """Apply ``member``'s vote to their score and DM them the outcome.

        Tutors and members without an active vote are ignored. Voting for a
        tutor adds ``config.SUCCESS_POINTS``, any other vote adds
        ``config.FAILIURE_POINTS``. The score is updated even when the result
        message cannot be delivered because no DM channel is available.
        """
        vote = self.vote_map.get(member.id)
        if member.id in self.tutors or vote is None:
            return
        name = self.get_username(vote)
        if vote.id in self.tutors:
            self.score_map[member] += config.SUCCESS_POINTS
            text = config.SUCCESS_TEXT.format(tutor=name, points=self.score_map[member])
            color = discord.Colour.green()
            title = config.SUCCESS_TITLE
        else:
            self.score_map[member] += config.FAILIURE_POINTS
            text = config.FAILIURE_TEXT.format(user=name, points=self.score_map[member])
            color = discord.Colour.red()
            title = config.FAILIURE_TITLE
        embed = discord.Embed(title=title, type="rich", description=text, colour=color)
        channel = await self.get_dm_channel(member)
        if channel is None:
            # get_dm_channel already printed a warning; nothing to send to.
            return
        await channel.send(embed=embed)


def main():
    """Create the bot, register all commands and run it until it stops.

    The token is read from the environment variable named by
    ``config.TOKEN_ENV_VAR``; when that variable is unset an error is printed
    and the process exits with status 1.
    """
    bot_token = getenv(config.TOKEN_ENV_VAR)
    if bot_token is None:
        print('error: no token was given')
        sys.exit(1)

    client = Cupido(
        activity=discord.Game(name=config.GAME_STATUS),
        command_prefix=config.COMMAND_PREFIX,
        case_insensitive=True,
    )
    # Drop the built-in help command before installing the project's commands.
    client.remove_command('help')
    for command in bot_commands:
        client.add_command(command)
    client.run(bot_token)


if __name__ == '__main__':
    # Keep (re)starting the bot until the user interrupts it or the event
    # loop has been closed, in which case a restart cannot succeed.
    keep_running = True
    while keep_running:
        try:
            main()
        except KeyboardInterrupt:
            keep_running = False
        except Exception as exc:
            print(f'[DEBUG] encountered exception: {type(exc)}: {exc}')
            if exc.args == ('Event loop is closed',):
                print('[DEBUG] quit...')
                keep_running = False
            else:
                print('[DEBUG] restarting...')