import asyncio
import random
from typing import Any, List

from discord.ext import commands

import config


async def await_n(it) -> List[Any]:
    """Run every awaitable in *it* concurrently and collect the results.

    All awaitables are waited for before any result is read, so one failing
    awaitable does not prevent the others from finishing.

    Args:
        it: Iterable of coroutines, tasks or futures.

    Returns:
        The results in the same order as the awaitables appeared in *it*;
        an empty list when *it* is empty.

    Raises:
        Whatever exception the first failing awaitable (in input order)
        raised.
    """
    tasks = [asyncio.ensure_future(awaitable) for awaitable in it]
    if not tasks:
        # asyncio.wait() rejects an empty collection.
        return []
    await asyncio.wait(tasks)
    # Read from ``tasks`` rather than the ``done`` set returned by
    # asyncio.wait(): the set is unordered, the list keeps input order.
    return [task.result() for task in tasks]


async def answer(ctx: commands.Context, msg: str):
    """Send *msg* to the invoking channel, prefixed with the author's mention."""
    await ctx.send(f'{ctx.author.mention} {msg}')


@commands.command(help='display this help message', aliases=('hepl', 'h', '?'))
async def help(ctx: commands.Context):
    """Reply with ``config.HELP_TEXT`` followed by one line per bot command."""
    command_doc = '\n'.join(
        f' * {config.COMMAND_PREFIX.strip()} {c.name:15} - {c.help}'
        for c in ctx.bot.commands)
    await answer(ctx, f'''``` {config.HELP_TEXT}\nThese are all available commands:\n{command_doc}```''')


@commands.command(help='create a new lobby',
                  aliases=('create', 'inti', 'craete', 'cretae', 'c', 'i', '+'))
async def init(ctx: commands.Context):
    """Look up the meta category and lobby channel, creating any that is missing.

    The resulting channels are stored on ``ctx.bot.meta_channel`` and
    ``ctx.bot.lobby_channel``.
    """
    ctx.bot.meta_channel = (ctx.bot.get_meta_channel(ctx)
                            or await ctx.bot.create_category(ctx))
    ctx.bot.lobby_channel = (ctx.bot.get_lobby_channel(ctx, ctx.bot.meta_channel)
                             or await ctx.bot.create_lobby(ctx))


@commands.command(help=f'destruct all {config.NAME} channels',
                  aliases=('kill', 'desctruction', 'genocide', '-'))
async def destroy(ctx: commands.Context):
    """Run ``stop``, delete the lobby and meta channels, and reset bot state."""
    await stop(ctx)
    meta_channel = ctx.bot.get_meta_channel(ctx)
    candidates = (ctx.bot.get_lobby_channel(ctx, meta_channel), meta_channel)
    # Only channels that were actually found can be deleted.
    await await_n([channel.delete() for channel in candidates if channel])
    ctx.bot.lobby_channel = None
    ctx.bot.meta_channel = None
    ctx.bot.pair_channels = []


@commands.command(help='start shuffling', aliases=('start', 'run', 'strat', 'rnu'))
async def shuffle(ctx: commands.Context, channel_size=2):
    """Distribute the lobby members randomly over freshly created pair channels.

    Args:
        ctx: Invocation context.
        channel_size: Maximum number of members per pair channel (at least 1).

    Returns:
        ``True`` when members were shuffled; ``False`` when the request was
        rejected (invalid size, channels unavailable, empty lobby or fewer
        members than ``channel_size``).
    """
    if channel_size < 1:
        await answer(ctx, 'error: channel size must be at least 1 (you jerk)')
        return False

    channels = await ctx.bot.get_channels(ctx)
    if not channels:
        return False
    meta_channel, lobby_channel = channels

    # Work on a copy: members are popped off this list as they are assigned.
    members = lobby_channel.members[:]
    if not members:
        await answer(ctx, 'error: nobody wants to get shuffeled :(')
        return False
    if len(members) < channel_size:
        await answer(ctx, f'error: you are too few people ({len(members)}). Group size is {channel_size}')
        return False

    # round up the quotient between the member count and the channel size
    channel_count = (len(members) + channel_size - 1) // channel_size
    ctx.bot.pair_channels = await ctx.bot.create_pair_channels(ctx, meta_channel, channel_count)

    # One slot per seat: every channel index appears ``channel_size`` times.
    slots = [index
             for index in range(len(ctx.bot.pair_channels))
             for _ in range(channel_size)]
    random.shuffle(slots)

    futures = []
    group_members = [[] for _ in ctx.bot.pair_channels]
    assignments = []
    for slot in slots:
        if not members:
            # There can be more seats than members when the member count is
            # not a multiple of ``channel_size``; the spare seats stay empty.
            break
        member = members.pop()
        futures.append(member.move_to(ctx.bot.pair_channels[slot]))
        group_members[slot].append(member)
        assignments.append((member, slot))

    # Panels are sent only after all groups are complete, so each member is
    # passed the full group.
    for member, slot in assignments:
        await ctx.bot.send_panel(ctx, member, group_members[slot])

    if members:
        await answer(ctx, 'warning: not all members got shuffeled')

    await await_n(futures)
    return True


@commands.command(help='move everyone back to lobby',
                  aliases=('quit', 'exit', 'abort', 'back', 'return'))
async def stop(ctx: commands.Context, cancel_loop=True):
    """Move all members of the pair channels to the lobby and remove the channels.

    Args:
        ctx: Invocation context.
        cancel_loop: When true, ``ctx.bot.cancel_coroutine()`` is awaited
            first; ``loop_cycle`` passes ``False`` here.
    """
    channels = await ctx.bot.get_channels(ctx)
    if not channels:
        return
    meta_channel, lobby_channel = channels
    pair_channels = ctx.bot.get_pair_channels(ctx, meta_channel)

    if cancel_loop:
        await ctx.bot.cancel_coroutine()

    moves = [member.move_to(lobby_channel)
             for channel in pair_channels
             for member in channel.members]
    await await_n(moves)
    await ctx.bot.destroy_pair_channels(ctx, meta_channel)


async def loop_cycle(ctx, t):
    """Shuffle, wait *t* seconds, then move everyone back to the lobby.

    Returns:
        ``False`` when the shuffle was rejected, ``True`` after a full cycle.
    """
    if not await shuffle(ctx):
        return False
    await asyncio.sleep(t)
    # The cycle itself runs inside the bot's tracked coroutine, so it must
    # not be cancelled by the stop step.
    await stop(ctx, cancel_loop=False)
    return True


@commands.command(
    help=f'repeat "shuffle" and "stop" <n> (default: {config.DEFAULT_LOOP_COUNT}) times and '
         f'<t> (default: {config.DEFAULT_LOOP_TIME}) seconds',
    aliases=('repeat', 'shuffleloop'))
async def loop(ctx: commands.Context, n=config.DEFAULT_LOOP_COUNT, t=config.DEFAULT_LOOP_TIME):
    """Run ``loop_cycle`` up to *n* times with *t* seconds per cycle.

    The loop ends early when a cycle returns a falsy result, or with an
    error reply when ``ctx.bot.await_coroutine`` returns ``'cancelled'`` or
    ``'already running'``.

    Args:
        ctx: Invocation context.
        n: Number of cycles; must convert to an integer >= 1.
        t: Seconds between shuffle and stop; must convert to an integer >= 0.
    """
    usage_error = ('error: expecting positive integer arguments for <n> and <t> '
                   '(e.g. "loop 5 60" to repeat 5 times)')
    try:
        n, t = int(n), int(t)
    except ValueError:
        await answer(ctx, usage_error)
        return
    if n < 1 or t < 0:
        await answer(ctx, usage_error)
        return

    await answer(ctx, f'info: repeat shuffling {n} times and each {t} seconds')

    # Status strings that ctx.bot.await_coroutine may return instead of the
    # cycle's own boolean result.
    failure_messages = {
        'cancelled': 'cancelled loop command',
        'already running': 'cannot start loop, another task is running...',
    }
    for _ in range(n):
        result = await ctx.bot.await_coroutine(loop_cycle(ctx, t))
        message = failure_messages.get(result, None)
        if message is not None:
            return await answer(ctx, f'error: {message}')
        if not result:
            break


# Every command object defined above; the namespace values are snapshotted
# before filtering.
bot_commands = [cmd for cmd in list(locals().values()) if isinstance(cmd, commands.Command)]