"""Discord bot commands for shuffling lobby members into small voice channels."""
import asyncio
import itertools
import random
from typing import Any, List

import config
import discord
from discord.ext import commands

# NOTE: this module defines commands named ``help`` and ``list``; after their
# definitions those names no longer refer to the builtins inside this module,
# so function bodies below deliberately avoid calling ``list(...)``.


async def await_n(it) -> List[Any]:
    """Run all awaitables from *it* concurrently and return their results.

    Results are returned in the same order as the awaitables were supplied.
    If any awaitable raised, its exception is re-raised here (after all of
    them have finished).
    """
    tasks = [asyncio.create_task(aw) for aw in it]
    if not tasks:
        # asyncio.wait() rejects an empty collection.
        return []
    await asyncio.wait(tasks)
    # Read results from the ordered task list rather than from the unordered
    # "done" set returned by asyncio.wait().
    return [task.result() for task in tasks]


async def answer(ctx: commands.Context, msg: str, *args, **kwargs):
    """Send *msg* to the invoking channel, prefixed with a mention of the author.

    Extra positional/keyword arguments are forwarded to ``ctx.send``.
    """
    await ctx.send(f'{ctx.author.mention} {msg}', *args, **kwargs)


@commands.command(help='display this help message', aliases=('hepl', 'h', '?'))
async def help(ctx: commands.Context):
    """Reply with the configured help text and one line per registered command."""
    command_doc = '\n'.join(
        f' * {config.COMMAND_PREFIX}{c.name:15} - {c.help}'
        for c in ctx.bot.commands
    )
    await answer(
        ctx,
        f'''``` {config.HELP_TEXT}\nThese are all available commands:\n{command_doc}```''',
    )


@commands.command(
    help='create a new lobby',
    aliases=('create', 'inti', 'craete', 'cretae', 'c', 'i', '+'),
)
async def init(ctx: commands.Context):
    """Look up the meta and lobby channels, creating whichever is missing.

    Both channels are stored on the bot as ``meta_channel`` / ``lobby_channel``.
    """
    bot = ctx.bot
    bot.meta_channel = bot.get_meta_channel(ctx) or await bot.create_category(ctx)
    bot.lobby_channel = (
        bot.get_lobby_channel(ctx, bot.meta_channel) or await bot.create_lobby(ctx)
    )


@commands.command(
    help=f'destruct all {config.NAME} channels',
    aliases=('kill', 'desctruction', 'genocide', '-'),
)
async def destroy(ctx: commands.Context):
    """Stop any running shuffle, delete the lobby and meta channels, reset bot state."""
    await stop(ctx)
    meta_channel = ctx.bot.get_meta_channel(ctx)
    lobby_channel = ctx.bot.get_lobby_channel(ctx, meta_channel)
    # Either lookup may come back empty; only delete what actually exists.
    await await_n(
        channel.delete() for channel in (lobby_channel, meta_channel) if channel
    )
    ctx.bot.lobby_channel = None
    ctx.bot.meta_channel = None
    ctx.bot.pair_channels = []


def round_up_div(a, b):
    """Return ``a / b`` rounded up to the next integer (ceiling division)."""
    quotient, remainder = divmod(a, b)
    return quotient + 1 if remainder else quotient


def _build_groups(members, tutor_ids, old_pairs, group_count):
    """Shuffle *members* in place and split them into *group_count* groups.

    Group sizes differ by at most one. Tutors (members whose ``id`` is in
    *tutor_ids*) are spread over the groups first; the remaining slots are
    filled with non-tutors, preferring candidates that have not yet shared a
    group (according to *old_pairs*, a set of ``frozenset`` id pairs) with
    anyone already placed in the group.
    """
    base, extra = divmod(len(members), group_count)
    sizes = [base + 1] * extra + [base] * (group_count - extra)

    random.shuffle(members)
    tutors = [m for m in members if m.id in tutor_ids]
    normals = [m for m in members if m.id not in tutor_ids]

    groups = [[] for _ in range(group_count)]
    for size, group in zip(sizes, groups):
        # tutor_count never exceeds len(tutors): ceil(k / n) <= k for n >= 1.
        tutor_count = min(size, round_up_div(len(tutors), group_count))
        for _ in range(tutor_count):
            group.append(tutors.pop())

        for _ in range(size - tutor_count):
            if not normals:
                # Only tutors are left (e.g. everybody is a tutor). The
                # remaining slots equal the remaining members, so a tutor is
                # guaranteed to be available here.
                group.append(tutors.pop())
                continue
            for index, candidate in enumerate(normals):
                already_met = any(
                    frozenset((candidate.id, other.id)) in old_pairs
                    for other in group
                )
                if not already_met:
                    group.append(normals.pop(index))
                    break
            else:
                # Every candidate has met someone in this group; take any.
                group.append(normals.pop())
    return groups


@commands.command(help='start shuffling', aliases=('start', 'run', 'strat', 'rnu'))
async def shuffle(ctx: commands.Context, channel_size=2):
    """Distribute everyone in the lobby over freshly created pair channels.

    Returns ``True`` when members were moved, ``False`` when shuffling was
    not possible (invalid size, missing channels, nobody or too few people
    in the lobby).
    """
    if channel_size < 1:
        await answer(ctx, 'error: channel size must be at least 1 (you jerk)')
        return False

    channels = await ctx.bot.get_channels(ctx)
    if not channels:
        return False
    meta_channel, lobby_channel = channels

    members = await await_n(
        map(ctx.guild.fetch_member, lobby_channel.voice_states.keys())
    )
    if not members:
        await answer(ctx, 'error: nobody wants to get shuffled :(')
        return False
    if len(members) < channel_size:
        await answer(
            ctx,
            f'error: you are too few people ({len(members)}). Group size is {channel_size}',
        )
        return False

    channel_count = len(members) // channel_size
    groups = _build_groups(members, ctx.bot.tutors, ctx.bot.oldgroups, channel_count)

    ctx.bot.pair_channels = await ctx.bot.create_pair_channels(
        ctx, meta_channel, channel_count
    )
    random.shuffle(ctx.bot.pair_channels)

    futures = []
    for target, group in zip(ctx.bot.pair_channels, groups):
        futures.extend(member.move_to(target) for member in group)
        # Remember who has been grouped together for future shuffles.
        for first, second in itertools.combinations(group, 2):
            ctx.bot.oldgroups.add(frozenset((first.id, second.id)))
    # Randomise the order in which the move requests are started.
    random.shuffle(futures)

    for group in groups:
        futures.extend(ctx.bot.send_panel(ctx, member, group) for member in group)
    await await_n(futures)
    return True


@commands.command(
    help='move everyone back to lobby',
    aliases=('quit', 'exit', 'abort', 'back', 'return'),
)
async def stop(ctx: commands.Context, cancel_loop=True):
    """Move all members from the pair channels back to the lobby.

    When *cancel_loop* is true the bot's running coroutine (if any) is
    cancelled first. Afterwards the pair channels are destroyed.
    """
    channels = await ctx.bot.get_channels(ctx)
    if not channels:
        return
    meta_channel, lobby_channel = channels
    pair_channels = ctx.bot.get_pair_channels(ctx, meta_channel)

    if cancel_loop:
        await ctx.bot.cancel_coroutine()

    await await_n(
        member.move_to(lobby_channel)
        for channel in pair_channels
        for member in channel.members
    )
    await ctx.bot.destroy_pair_channels(ctx, meta_channel)


async def loop_cycle(ctx, t, channel_size=3):
    """Run one shuffle/wait/stop cycle.

    Shuffles into groups of *channel_size*, sleeps *t* seconds, updates the
    scores and moves everyone back. Returns ``False`` if shuffling failed,
    ``True`` otherwise.
    """
    if not await shuffle(ctx, channel_size):
        return False
    await asyncio.sleep(t)
    await ctx.bot.update_scores(ctx.guild)
    # The loop itself is driving this cycle, so it must not cancel itself.
    await stop(ctx, cancel_loop=False)
    return True


@commands.command(
    help=(
        f'repeat "shuffle" and "stop" <n> (default: {config.DEFAULT_LOOP_COUNT}) times and '
        f'<t> (default: {config.DEFAULT_LOOP_TIME}) seconds'
    ),
    aliases=('repeat', 'shuffleloop'),
)
async def loop(ctx: commands.Context, n=config.DEFAULT_LOOP_COUNT, t=config.DEFAULT_LOOP_TIME):
    """Repeat the shuffle/stop cycle *n* times, waiting *t* seconds per cycle.

    Stops early when a cycle fails, is cancelled, or another task is
    already running.
    """
    usage_error = (
        'error: expecting positive integer arguments for <n> and <t> '
        '(e.g. "loop 5 60" to repeat 5 times)'
    )
    try:
        n, t = int(n), int(t)
    except ValueError:
        await answer(ctx, usage_error)
        return
    if n < 1 or t < 0:
        # int() accepts negative numbers, which would otherwise silently
        # result in zero iterations or a zero-length wait.
        await answer(ctx, usage_error)
        return

    await answer(ctx, f'repeat shuffling {n} times and each {t} seconds')

    # Status strings that ctx.bot.await_coroutine may hand back instead of
    # the cycle's boolean result.
    failures = {
        'cancelled': 'cancelled loop command',
        'already running': 'cannot start loop, another task is running...',
    }
    for _ in range(n):
        result = await ctx.bot.await_coroutine(loop_cycle(ctx, t))
        message = failures.get(result)
        if message is not None:
            return await answer(ctx, f'error: {message}')
        if not result:
            break


async def _parse_user_id(ctx, raw):
    """Convert *raw* to an integer user id, or report the problem and return ``None``."""
    if raw is None:
        await answer(ctx, 'error: expecting user id')
        return None
    try:
        return int(raw)
    except ValueError:
        await answer(ctx, 'error: invalid user id')
        return None


@commands.command(
    help='add a tutor',
    aliases=('tutor', 'hello')
)
async def add(ctx: commands.Context, id=None):
    """Add the user with the given numeric *id* to the tutor list."""
    user_id = await _parse_user_id(ctx, id)
    if user_id is None:
        return
    ctx.bot.add_tutor(user_id)
    await answer(ctx, f'added user id {user_id} to the tutor list')


@commands.command(
    help='remove a tutor',
    aliases=('rm', 'del', 'delete', 'byebye', 'bb')
)
async def remove(ctx: commands.Context, id=None):
    """Remove the user with the given numeric *id* from the tutor list."""
    user_id = await _parse_user_id(ctx, id)
    if user_id is None:
        return
    ctx.bot.remove_tutor(user_id)
    await answer(ctx, f'removed user id {user_id} from the tutor list')


@commands.command(
    help='list all tutors',
    aliases=('tutors',)
)
async def list(ctx: commands.Context):
    """Reply with the names and ids of all tutors, or a note that there are none."""
    if not ctx.bot.tutors:
        await answer(ctx, 'there is no tutor :/')
        return

    users = []
    for tutor in ctx.bot.tutors:
        try:
            # Try the cache first, fall back to an API request.
            user = ctx.bot.get_user(tutor) or await ctx.bot.fetch_user(tutor)
            users.append((str(user), tutor))
        except Exception:
            # Best effort: an id that cannot be resolved is still listed.
            users.append(('*unknown*', tutor))
    listing = '\n'.join(f' • {name} ({uid})' for name, uid in users)
    await answer(ctx, '```\n' + listing + '\n```')


@commands.command(
    help='get the scoreboard',
    aliases=('scores', 'score', 'points')
)
async def scoreboard(ctx: commands.Context):
    """Send an embed ranking all members with a positive score, highest first."""
    ranking = sorted(
        ctx.bot.score_map.items(), reverse=True, key=lambda entry: entry[1]
    )
    parts = [config.SCOREBOARD_TEXT]
    for rank, (member, score) in enumerate(ranking, start=1):
        if score > 0:
            plural = '' if score == 1 else 'e'
            parts.append(
                f'\n {rank}. {ctx.bot.get_username(member)} hat {score} Punkt{plural}'
            )
    embed = discord.Embed(
        title=config.SCOREBOARD_TITLE,
        type="rich",
        description=''.join(parts),
        colour=discord.Colour.purple(),
    )
    await ctx.send(embed=embed)


# Collect every command defined above. The namespace values are snapshotted
# into a tuple first so the comprehension never iterates a live dict view.
bot_commands = [
    cmd for cmd in tuple(locals().values()) if isinstance(cmd, commands.Command)
]