1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
|
from os import getenv
import sys
import asyncio
from commands import answer, await_n, bot_commands
import config
import discord
from discord.ext import commands
from random import sample
class Cupido(commands.Bot):
    """Bot subclass holding the channel, task, voting and tutor state.

    State kept on the instance:

    * ``meta_channel`` / ``lobby_channel`` / ``pair_channels`` -- cached
      channel objects; the ``get_*`` helpers fall back to a guild lookup
      while these are unset/empty.
    * ``task`` -- the single task currently run through
      :meth:`await_coroutine`, or ``None``.
    * ``reaction_map`` -- ``user id -> {emoji string -> member}`` for the
      panel last sent to that user.
    * ``vote_map`` -- ``user id -> member`` currently selected (or ``None``).
    * ``tutors`` -- set of integer ids persisted in ``config.TUTOR_FILE_PATH``.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``commands.Bot`` and initialise the state."""
        super().__init__(*args, **kwargs)
        self.meta_channel = None
        self.lobby_channel = None
        self.pair_channels = []
        self.task = None
        self.reaction_map = {}
        self.vote_map = {}
        self.read_file()

    # ------------------------------------------------------------------
    # tutor persistence
    # ------------------------------------------------------------------

    def read_file(self):
        """Load ``self.tutors`` from ``config.TUTOR_FILE_PATH``.

        The file holds one integer id per line. A missing file yields an
        empty set. Blank lines are skipped: :meth:`write_file` writes an
        empty file for an empty set, and editors tend to append a trailing
        newline, either of which would otherwise crash on ``int('')``.

        Raises:
            ValueError: if a non-blank line is not an integer.
        """
        try:
            with open(config.TUTOR_FILE_PATH, 'r') as f:
                self.tutors = {
                    int(line)
                    for line in f.read().splitlines()
                    if line.strip()
                }
        except FileNotFoundError:
            self.tutors = set()

    def write_file(self):
        """Write ``self.tutors`` to ``config.TUTOR_FILE_PATH``, one id per line."""
        with open(config.TUTOR_FILE_PATH, 'w') as f:
            f.write('\n'.join(str(t) for t in self.tutors))

    # ------------------------------------------------------------------
    # single background task
    # ------------------------------------------------------------------

    async def await_coroutine(self, co):
        """Run ``co`` as the bot's one managed task and wait for it.

        Returns:
            ``'already running'`` if a task is already registered,
            ``'cancelled'`` if awaiting the task raised
            ``asyncio.CancelledError``, otherwise the task's result.
        """
        if self.task is not None:
            # The coroutine will never be scheduled; close it so Python does
            # not emit a "coroutine ... was never awaited" warning.
            if asyncio.iscoroutine(co):
                co.close()
            return 'already running'
        self.task = asyncio.create_task(co)
        try:
            return await self.task
        except asyncio.CancelledError:
            return 'cancelled'
        finally:
            # Runs on every exit path, so the slot is always freed.
            self.task = None

    async def cancel_coroutine(self):
        """Request cancellation of the managed task, if there is one."""
        if self.task is not None:
            self.task.cancel()

    async def on_ready(self):
        """Print the configured bot name and the logged-in user."""
        print(f'the bot {config.NAME} is logged in as "{self.user}"')

    # ------------------------------------------------------------------
    # channel creation / lookup
    # ------------------------------------------------------------------

    async def create_category(self, ctx):
        """Create the category named ``config.CATEGORY_CHANNEL_NAME``.

        Reports the new category through ``answer`` and returns it.
        """
        category = await ctx.guild.create_category(config.CATEGORY_CHANNEL_NAME)
        await answer(ctx, f'info: category created "{category}" ({category.id})')
        return category

    async def create_lobby(self, ctx):
        """Create the lobby voice channel inside ``self.meta_channel``.

        Reports the new channel through ``answer`` and returns it.
        """
        lobby = await ctx.guild.create_voice_channel(
            config.LOBBY_CHANNEL_NAME,
            topic=config.LOBBY_CHANNEL_TOPIC,
            category=self.meta_channel,
        )
        await answer(ctx, f'info: voice channel created "{lobby}" ({lobby.id})')
        return lobby

    def get_meta_channel(self, ctx):
        """Return the cached category, else look it up by name in the guild."""
        return self.meta_channel or discord.utils.get(
            ctx.guild.categories, name=config.CATEGORY_CHANNEL_NAME)

    def get_lobby_channel(self, ctx, meta_channel):
        """Return the cached lobby, else the named voice channel in ``meta_channel``."""
        return (self.lobby_channel
                or discord.utils.get(
                    ctx.guild.voice_channels,
                    name=config.LOBBY_CHANNEL_NAME,
                    category=meta_channel,
                ))

    @staticmethod
    def _pair_channel_order(channel):
        """Sort key placing numbered channels in numeric order ("2" before "10")."""
        try:
            return (0, int(channel.name), channel.name)
        except ValueError:
            # str.isdigit() accepts characters (e.g. superscript digits) that
            # int() rejects; keep those sortable, after the real numbers.
            return (1, 0, channel.name)

    def get_pair_channels_no_cache(self, ctx, meta_channel):
        """Return the guild's digit-named voice channels in ``meta_channel``.

        The list is ordered by the numeric value of the channel name rather
        than lexicographically, so channel "10" follows channel "9".
        """
        return sorted((channel for channel in ctx.guild.voice_channels
                       if channel.category == meta_channel
                       and channel.name.isdigit()),
                      key=self._pair_channel_order)

    def get_pair_channels(self, ctx, meta_channel):
        """Return the cached pair channels, else a fresh guild lookup."""
        return self.pair_channels or self.get_pair_channels_no_cache(ctx, meta_channel)

    async def try_delete_channel(self, channel):
        """Delete ``channel``; return ``False`` if it no longer exists, else ``True``."""
        try:
            await channel.delete()
            return True
        except discord.errors.NotFound:
            return False

    # ------------------------------------------------------------------
    # voting panel
    # ------------------------------------------------------------------

    @staticmethod
    def generate_panel_text(user, members, emojis, reaction_map=None, selected=None):
        """Build the panel text listing every member except ``user``.

        Args:
            user: the panel's recipient; skipped in the listing.
            members: candidates, paired positionally with ``emojis``.
            emojis: one emoji per entry of ``members``.
            reaction_map: optional dict filled in place with
                ``emoji -> member`` for each listed member.
            selected: id of the member to render with a check mark.

        Returns:
            ``(text, emojis_chosen)`` where ``emojis_chosen`` holds the emojis
            of the listed members, in listing order.
        """
        text = config.PANEL_TEXT
        emojis_chosen = []
        for member, emoji in zip(members, emojis):
            if member.id == user.id:
                continue
            name = discord.utils.escape_markdown(
                member.nick or member.display_name, ignore_links=False)
            text += f'\n {"☑️" if member.id == selected else "*"} {emoji} {name}'
            if reaction_map is not None:
                reaction_map[emoji] = member
            emojis_chosen.append(emoji)
        return text, emojis_chosen

    async def send_panel(self, ctx, user, members):
        """Send ``user`` a direct-message panel with one reaction per member.

        Delivery is best effort: if the DM channel cannot be opened or the
        message cannot be sent, a warning is printed and ``None`` is
        returned. The user's reaction map is only registered once the panel
        message exists.

        Returns:
            The sent message, or ``None`` if it could not be delivered.

        Raises:
            ValueError: from ``random.sample`` if ``members`` outnumbers
                ``config.EMOJI_POOL``.
        """
        try:
            channel = user.dm_channel or await user.create_dm()
        except discord.errors.HTTPException:
            print(f'warning: could not send pm to {user}')
            return None

        reaction_map = {}
        emojis = sample(config.EMOJI_POOL, k=len(members))
        text, emojis = self.generate_panel_text(user, members, emojis, reaction_map)
        embed = discord.Embed(title=config.PANEL_TITLE, type="rich",
                              description=text, colour=discord.Colour.purple())
        try:
            # Sending is where closed DMs actually fail, so it needs the
            # same guard as opening the channel.
            msg = await channel.send(embed=embed)
        except discord.errors.HTTPException:
            print(f'warning: could not send pm to {user}')
            return None

        self.reaction_map[user.id] = reaction_map
        await await_n(map(msg.add_reaction, emojis))
        return msg

    # ------------------------------------------------------------------
    # pair channels
    # ------------------------------------------------------------------

    async def destroy_pair_channels(self, ctx, meta_channel):
        """Delete all numbered voice channels in ``meta_channel`` and clear the cache."""
        await await_n(map(self.try_delete_channel,
                          self.get_pair_channels_no_cache(ctx, meta_channel)))
        self.pair_channels = []

    async def create_pair_channels(self, ctx, meta_channel, n):
        """Replace the pair channels with new ones named ``"1"`` .. ``str(n)``.

        Returns whatever ``await_n`` yields for the creation calls; the
        ``self.pair_channels`` cache is not updated here.
        """
        await self.destroy_pair_channels(ctx, meta_channel)
        futures = [ctx.guild.create_voice_channel(str(i), category=meta_channel)
                   for i in range(1, n + 1)]
        return await await_n(futures)

    async def get_channels(self, ctx):
        """Return ``(meta_channel, lobby_channel)``.

        If either is missing, an error is reported through ``answer`` and
        ``None`` is returned instead.
        """
        meta_channel = self.get_meta_channel(ctx)
        lobby_channel = self.get_lobby_channel(ctx, meta_channel)
        if meta_channel is None or lobby_channel is None:
            await answer(ctx, 'error: cannot start shuffling, you need to initialize channels')
            return None
        return meta_channel, lobby_channel

    # ------------------------------------------------------------------
    # reaction events
    # ------------------------------------------------------------------

    async def on_reaction_add(self, reaction, user):
        """Overrides event handler in Bot; an added reaction toggles the vote."""
        return await self.toggle_vote(reaction, user)

    async def on_reaction_remove(self, reaction, user):
        """Overrides event handler in Bot; a removed reaction toggles the vote."""
        return await self.toggle_vote(reaction, user)

    async def toggle_vote(self, reaction, user):
        """Toggle ``user``'s vote for the member mapped to the reacted emoji.

        Reactions by the message author, by users without a panel, or with
        an emoji that is not on the user's panel are ignored. Reacting for
        the currently selected member clears the vote; any other mapped
        emoji selects that member. The reacted message is then re-rendered.
        """
        if reaction.message.author.id == user.id:
            return
        # NOTE(review): nothing here checks that reaction.message is the
        # panel sent to this user -- confirm whether that matters.
        emoji = str(reaction.emoji)
        choices = self.reaction_map.get(user.id)
        if choices is None or emoji not in choices:
            return
        chosen = choices[emoji]
        if self.vote_map.get(user.id) == chosen:
            self.vote_map[user.id] = None
        else:
            self.vote_map[user.id] = chosen
        await self.update_message_panel(reaction.message, self.vote_map[user.id], user)

    async def update_message_panel(self, message, vote, user):
        """Re-render the first embed of ``message`` with ``vote`` marked selected.

        Returns ``None`` if the message has no embeds, otherwise the result
        of ``message.edit``.
        """
        embeds = message.embeds
        if not embeds:
            return None
        vote = None if vote is None else vote.id
        embeds[0].description = self.generate_panel_text(
            user,
            list(self.reaction_map[user.id].values()),
            list(self.reaction_map[user.id].keys()),
            selected=vote)[0]
        return await message.edit(embed=embeds[0])

    # ------------------------------------------------------------------
    # tutor management
    # ------------------------------------------------------------------

    def add_tutor(self, tutor):
        """Add ``tutor`` to the tutor set and persist the set."""
        self.tutors.add(tutor)
        self.write_file()

    def remove_tutor(self, tutor):
        """Remove ``tutor`` from the tutor set and persist the set.

        Raises:
            KeyError: if ``tutor`` is not in the set.
        """
        self.tutors.remove(tutor)
        self.write_file()
def main():
    """Build the bot from ``config`` and run it until ``bot.run`` returns.

    The token is read from the environment variable named by
    ``config.TOKEN_ENV_VAR``; if it is unset, an error is printed and the
    process exits with status 1. The built-in ``help`` command is removed
    before every command in ``bot_commands`` is registered.
    """
    bot_token = getenv(config.TOKEN_ENV_VAR)
    if bot_token is None:
        print('error: no token was given')
        sys.exit(1)

    bot = Cupido(
        activity=discord.Game(name=config.GAME_STATUS),
        command_prefix=config.COMMAND_PREFIX,
        case_insensitive=True,
    )
    bot.remove_command('help')
    for command in bot_commands:
        bot.add_command(command)

    bot.run(bot_token)
if __name__ == '__main__':
    # Supervisor loop: run main() again whenever it returns or raises,
    # stopping on Ctrl-C or once the event loop has been closed.
    keep_running = True
    while keep_running:
        try:
            main()
        except KeyboardInterrupt:
            keep_running = False
        except Exception as exc:
            print(f'[DEBUG] encountered exception: {type(exc)}: {exc}')
            if exc.args == ('Event loop is closed',):
                # A closed loop cannot be reused, so restarting is pointless.
                print('[DEBUG] quit...')
                keep_running = False
            else:
                print('[DEBUG] restarting...')
|