1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
from os import getenv
import sys
import asyncio
from commands import answer, await_n, bot_commands
import config
import discord
from discord.ext import commands
class Cupido(commands.Bot):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.meta_channel = None
self.lobby_channel = None
self.pair_channels = []
self.task = None
async def await_coroutine(self, co):
if self.task is not None:
return 'already running'
self.task = asyncio.create_task(co)
try:
result = await self.task
except asyncio.CancelledError:
self.task = None
return 'cancelled'
finally:
self.task = None
return result
async def cancel_coroutine(self):
if self.task is not None:
self.task.cancel()
async def on_ready(self):
print(f'the bot {config.NAME} is logged in as "{self.user}"')
async def create_category(self, ctx):
category = await ctx.guild.create_category(config.CATEGORY_CHANNEL_NAME)
await answer(ctx, f'info: category created "{category}" ({category.id})')
return category
async def create_lobby(self, ctx):
lobby = await ctx.guild.create_voice_channel(
config.LOBBY_CHANNEL_NAME,
topic=config.LOBBY_CHANNEL_TOPIC,
category=self.meta_channel,
)
await answer(ctx, f'info: voice channel created "{lobby}" ({lobby.id})')
return lobby
def get_meta_channel(self, ctx):
return self.meta_channel or discord.utils.get(ctx.guild.categories, name=config.CATEGORY_CHANNEL_NAME)
def get_lobby_channel(self, ctx, meta_channel):
return (self.lobby_channel
or discord.utils.get(
ctx.guild.voice_channels,
name=config.LOBBY_CHANNEL_NAME,
category=meta_channel
))
def get_pair_channels_no_cache(self, ctx, meta_channel):
return sorted((channel for channel in ctx.guild.voice_channels
if channel.category == meta_channel
and channel.name.isdigit()),
key=lambda c: c.name)
def get_pair_channels(self, ctx, meta_channel):
return self.pair_channels or self.get_pair_channels_no_cache(ctx, meta_channel)
async def try_delete_channel(self, channel):
try:
await channel.delete()
return True
except discord.errors.NotFound:
return False
async def send_panel(self, ctx, user, members):
channel = user.dm_channel or await user.create_dm()
text = config.PANEL_TEXT
n = 0
for member in members:
if member == user:
continue
name = discord.utils.escape_markdown(discord.utils.escape_mentions(member.nick), ignore_links=False)
text += f'\n * {config.EMOJI_POOL[n]} {name}'
n += 1
embed = discord.Embed(title=config.PANEL_TITLE, type="rich", description=text, colour=discord.Colour.purple())
await channel.send(embed=embed)
async def destroy_pair_channels(self, ctx, meta_channel):
await await_n(map(self.try_delete_channel, self.get_pair_channels_no_cache(ctx, meta_channel)))
self.pair_channels = []
async def create_pair_channels(self, ctx, meta_channel, n):
await self.destroy_pair_channels(ctx, meta_channel)
futures = []
for i in range(1, n + 1):
futures.append(ctx.guild.create_voice_channel(str(i), category=meta_channel))
return await await_n(futures)
async def get_channels(self, ctx):
meta_channel = self.get_meta_channel(ctx)
lobby_channel = self.get_lobby_channel(ctx, meta_channel)
if meta_channel is None or lobby_channel is None:
await answer(ctx, 'error: cannot start shuffling, you need to initialize channels')
return None
return meta_channel, lobby_channel
def main():
token = getenv(config.TOKEN_ENV_VAR)
if token is None:
print('error: no token was given')
sys.exit(1)
bot = Cupido(activity=discord.Game(name=config.GAME_STATUS), command_prefix=config.COMMAND_PREFIX, case_insensitive=True)
bot.remove_command('help')
for cmd in bot_commands:
bot.add_command(cmd)
bot.run(token)
if __name__ == '__main__':
while True:
try:
main()
except KeyboardInterrupt:
break
except Exception as e:
print(f'[DEBUG] encountered exception: {type(e)}: {e}')
if e.args == ('Event loop is closed',):
print('[DEBUG] quit...')
break
print('[DEBUG] restarting...')
|