"""Discord commands for creating a lobby and shuffling voice-channel members.

Every command defined here is collected into ``bot_commands`` at the bottom
of the module.  Most commands silently do nothing unless they are invoked in
the bot's admin channel (see ``is_admin_channel``).

NOTE: the commands ``help`` and ``list`` intentionally keep their names
(discord.py derives the command name from the function name), which shadows
the builtins of the same name inside this module.  Do not call the builtin
``list()`` or ``help()`` from code in this file.
"""
import asyncio
from typing import Any, List
import random

import config
import discord
from discord.ext import commands


async def try_await(future):
    """Await *future* and return its result; print and swallow any ``Exception``.

    Returns ``None`` when the awaited object raised.
    """
    try:
        return await future
    except Exception as e:
        # Deliberate best-effort: one failing call must not abort a whole batch.
        print(f'error occured: {e}')
        return None


async def await_n(it) -> List[Any]:
    """Run all awaitables from *it* concurrently and return their results.

    Each awaitable is wrapped in ``try_await``, so a failing one contributes
    ``None`` instead of raising.  Results are returned in the same order as
    the awaitables were supplied.
    """
    tasks = [asyncio.create_task(try_await(aw)) for aw in it]
    if not tasks:
        # asyncio.wait() rejects an empty collection.
        return []
    await asyncio.wait(tasks)
    # Read results from the ordered task list rather than from the unordered
    # ``done`` set returned by asyncio.wait(), so the output order is stable.
    return [task.result() for task in tasks]


async def answer(ctx: commands.Context, msg: str, *args, **kwargs):
    """Send *msg* to the context, prefixed with a mention of the author.

    Extra positional and keyword arguments are forwarded to ``ctx.send``.
    """
    await ctx.send(f'{ctx.author.mention} {msg}', *args, **kwargs)


def is_admin_channel(ctx: commands.Context):
    """Return whether the message was sent in the bot's admin channel."""
    return ctx.message.channel.id == ctx.bot.admin_channel


async def get_members(guild, channel):
    """Return the members that have a voice state in *channel*.

    Members known to ``guild.get_member`` are used directly; the others are
    fetched concurrently.  Members whose fetch failed are left out.
    """
    cached = []
    pending = []
    for member_id in channel.voice_states:
        member = guild.get_member(member_id)
        if member is None:
            pending.append(guild.fetch_member(member_id))
        else:
            cached.append(member)
    fetched = await await_n(pending)
    # await_n yields None for a failed fetch; callers access ``member.id`` and
    # ``member.move_to`` and would crash on such an entry.
    return cached + [member for member in fetched if member is not None]


@commands.command(help='display this help message', aliases=('hepl', 'h', '?'))
async def help(ctx: commands.Context):
    """Reply with an embed listing every registered command and its help text."""
    command_doc = '\n'.join(
        f' • {config.COMMAND_PREFIX}{c.name:15} - {c.help}' for c in ctx.bot.commands)
    doc = f'{config.HELP_TEXT}\nThese are all available commands:\n{command_doc}'
    embed = discord.Embed(title="Hilfe", type="rich", description=doc, colour=discord.Colour.purple())
    await answer(ctx, '', embed=embed)


@commands.command(help='create a new lobby', aliases=('create', 'inti', 'craete', 'cretae', 'c', 'i', '+'))
async def init(ctx: commands.Context):
    """Look up the meta, lobby and notify channels, creating the missing ones."""
    if not is_admin_channel(ctx):
        return
    bot = ctx.bot
    bot.meta_channel = bot.get_meta_channel(ctx) or await bot.create_category(ctx)
    bot.lobby_channel = bot.get_lobby_channel(ctx, bot.meta_channel) or await bot.create_lobby(ctx)
    bot.notify_channel = (bot.get_notify_channel(ctx, bot.meta_channel)
                          or await bot.create_notify_channel(ctx))


@commands.command(help=f'destruct all {config.NAME} channels', aliases=('kill', 'desctruction', 'genocide', '-'))
async def destroy(ctx: commands.Context):
    """Stop any running shuffle, delete the bot's channels and reset its state."""
    if not is_admin_channel(ctx):
        return
    await stop(ctx)
    bot = ctx.bot
    meta_channel = bot.get_meta_channel(ctx)
    candidates = (bot.get_lobby_channel(ctx, meta_channel),
                  bot.get_notify_channel(ctx, meta_channel),
                  meta_channel)
    # Channels that do not exist are skipped.
    await await_n([channel.delete() for channel in candidates if channel])
    bot.lobby_channel = None
    bot.meta_channel = None
    bot.notify_channel = None
    bot.pair_channels = []


def round_up_div(a, b):
    """Return ``a / b`` rounded up to the next integer."""
    quotient, remainder = divmod(a, b)
    return quotient + 1 if remainder else quotient


def _build_groups(members, counts, tutor_ids, oldgroups):
    """Distribute *members* into groups with the sizes given by *counts*.

    Tutors (members whose ``id`` is in *tutor_ids*) are spread as evenly as
    possible over the groups.  The remaining seats are filled with the other
    members, preferring members that have not yet shared a group (according to
    *oldgroups*, a collection of ``frozenset((id_a, id_b))`` pairs) with anyone
    already seated in the group.

    Expects ``sum(counts) == len(members)`` with *counts* non-increasing and
    differing by at most one, which is how ``shuffle`` builds them.
    """
    tutors = []
    normals = []
    for member in members:
        (tutors if member.id in tutor_ids else normals).append(member)

    groups = [[] for _ in counts]
    for index, (count, group) in enumerate(zip(counts, groups)):
        # Share the tutors that are still unassigned among the groups that are
        # still unfilled.  Dividing by the total number of groups would leave
        # tutors over and run out of normal members for the last groups.
        remaining_groups = len(groups) - index
        tutor_count = min(count, round_up_div(len(tutors), remaining_groups))
        for _ in range(tutor_count):
            group.append(tutors.pop())
        for _ in range(count - tutor_count):
            for i, member in enumerate(normals):
                already_met = any(
                    frozenset((member.id, other.id)) in oldgroups for other in group)
                if already_met:
                    continue
                group.append(normals.pop(i))
                break
            else:
                # Everybody left has already met someone in this group.
                group.append(normals.pop())
    return groups


@commands.command(help='start shuffling', aliases=('start', 'run', 'strat', 'rnu'))
async def shuffle(ctx: commands.Context, channel_size=2, frompairchannels=False, do_score=True):
    """Split the waiting members into random groups and move them to pair channels.

    Members are taken from the lobby channel, or from the current pair
    channels when *frompairchannels* is true (the old pair channels are then
    renamed first and deleted at the end).  When *do_score* is true a panel is
    sent for every member via ``ctx.bot.send_panel``.

    Returns ``True`` on success, ``False`` when shuffling was not possible and
    ``None`` when invoked outside the admin channel.
    """
    if not is_admin_channel(ctx):
        return
    if channel_size < 1:
        await answer(ctx, 'error: channel size must be at least 1 (you jerk)')
        return False
    channels = await ctx.bot.get_channels(ctx)
    if not channels:
        return False
    meta_channel, lobby_channel = channels

    if frompairchannels:
        oldpairs = ctx.bot.pair_channels
        await await_n(pc.edit(name=pc.name+" Old") for pc in oldpairs)
        per_channel = await await_n(get_members(ctx.guild, pc) for pc in oldpairs)
        # A failed lookup shows up as None (see await_n); skip it instead of
        # crashing while concatenating.
        members = [member for chunk in per_channel if chunk for member in chunk]
        ctx.bot.pair_channels = []
    else:
        members = await get_members(ctx.guild, lobby_channel)

    if not members:
        await answer(ctx, 'error: nobody wants to get shuffeled :(')
        return False
    if len(members) < channel_size:
        await answer(ctx, f'error: you are too few people ({len(members)}). Group size is {channel_size}')
        return False

    # Spread the members round-robin so group sizes differ by at most one.
    channel_count = len(members) // channel_size
    counts = [0] * channel_count
    for i, _ in enumerate(members):
        counts[i % len(counts)] += 1

    random.shuffle(members)
    groups = _build_groups(members, counts, ctx.bot.tutors, ctx.bot.oldgroups)

    ctx.bot.pair_channels = await ctx.bot.create_pair_channels(ctx, meta_channel, channel_count)
    random.shuffle(ctx.bot.pair_channels)
    futures = []
    for n, group in enumerate(groups):
        for member in group:
            futures.append(member.move_to(ctx.bot.pair_channels[n]))
        # Remember who has met whom for later shuffles.
        for member1 in group:
            for member2 in group:
                if member1 != member2:
                    ctx.bot.oldgroups.add(frozenset((member1.id, member2.id)))
    random.shuffle(futures)
    await await_n(futures)

    if do_score:
        await await_n([ctx.bot.send_panel(ctx, member, group)
                       for group in groups for member in group])
    if frompairchannels:
        await await_n(ctx.bot.try_delete_channel(c) for c in oldpairs)
    return True


@commands.command(help='move everyone back to lobby', aliases=('quit', 'exit', 'abort', 'back', 'return'))
async def stop(ctx: commands.Context, cancel_loop=True):
    """Move everybody from the pair channels to the lobby and remove the pair channels.

    When *cancel_loop* is true, ``ctx.bot.cancel_coroutine()`` is awaited first.
    """
    if not is_admin_channel(ctx):
        return
    channels = await ctx.bot.get_channels(ctx, msg=False)
    if not channels:
        return
    meta_channel, lobby_channel = channels
    pair_channels = ctx.bot.get_pair_channels(ctx, meta_channel)
    if cancel_loop:
        await ctx.bot.cancel_coroutine()
    await await_n([member.move_to(lobby_channel)
                   for channel in pair_channels for member in channel.members])
    await ctx.bot.destroy_pair_channels(ctx, meta_channel)


async def loop_cycle(ctx, t, nt, g, isfirst, islast, do_score):
    """Run one shuffle round lasting *t* seconds.

    Shuffles with group size *g*, then waits.  If *nt* is smaller than *t*, a
    message announcing the remaining ``t - nt`` seconds is sent to the notify
    channel after *nt* seconds.  Afterwards scores are updated when *do_score*
    is true, and on the last round everyone is moved back to the lobby.

    Returns ``False`` when the shuffle did not succeed, ``True`` otherwise.
    """
    if not await shuffle(ctx, g, frompairchannels=not isfirst, do_score=do_score):
        return False
    if nt >= t:
        await asyncio.sleep(t)
    else:
        await asyncio.sleep(nt)
        notify_channel = ctx.bot.get_notify_channel(ctx, ctx.bot.get_meta_channel(ctx))
        # The notify channel may be missing; the round should still run its
        # full time instead of dying on an AttributeError.
        if notify_channel is not None:
            embed = discord.Embed(title=f"Noch {t-nt} Sekunden", type="rich", description="",
                                  colour=discord.Colour.blue())
            await notify_channel.send(embed=embed)
        await asyncio.sleep(t - nt)
    if do_score:
        await ctx.bot.update_scores(ctx.guild)
    if islast:
        await stop(ctx, cancel_loop=False)
    return True


@commands.command(
    help=f'repeat "shuffle" and "stop" (default: {config.DEFAULT_LOOP_COUNT}) times and '
         f' (default: {config.DEFAULT_LOOP_TIME}) seconds',
    aliases=('repeat', 'shuffleloop'))
async def loop(ctx: commands.Context, n=config.DEFAULT_LOOP_COUNT, t=config.DEFAULT_LOOP_TIME,
               nt=config.DEFAULT_NOTIFY_TIME, g=3, do_score=True):
    """Run *n* shuffle rounds of *t* seconds each through ``ctx.bot.await_coroutine``.

    Stops early when a round reports failure, or when ``await_coroutine``
    returns ``'cancelled'`` or ``'already running'`` (reported to the author).
    """
    if not is_admin_channel(ctx):
        return
    try:
        n, t = int(n), int(t)
    except ValueError:
        await answer(ctx, 'error: expecting positive integer arguments for and (e.g. "loop 5 60" to repeat 5 times)')
        return
    await answer(ctx, f'repeat shuffling {n} times and each {t} seconds')
    error_messages = {
        'cancelled': 'cancelled loop command',
        'already running': 'cannot start loop, another task is running...',
    }
    for i in range(n):
        result = await ctx.bot.await_coroutine(loop_cycle(ctx, t, nt, g, i == 0, i == n-1, do_score))
        message = error_messages.get(result, None)
        if message is not None:
            return await answer(ctx, f'error: {message}')
        if not result:
            break


@commands.command(
    help='add a tutor',
    aliases=('tutor', 'hello')
)
async def add(ctx: commands.Context, id=None):
    """Add the user with the given numeric *id* to the tutor list."""
    if not is_admin_channel(ctx):
        return
    if id is None:
        return await answer(ctx, 'error: expecting user id')
    try:
        id = int(id)
    except ValueError:
        return await answer(ctx, 'error: invalid user id')
    ctx.bot.add_tutor(id)
    await answer(ctx, f'added user id {id} to the tutor list')


@commands.command(
    help='remove a tutor',
    aliases=('rm', 'del', 'delete', 'byebye', 'bb')
)
async def remove(ctx: commands.Context, id=None):
    """Remove the user with the given numeric *id* from the tutor list."""
    if not is_admin_channel(ctx):
        return
    if id is None:
        return await answer(ctx, 'error: expecting user id')
    try:
        id = int(id)
    except ValueError:
        return await answer(ctx, 'error: invalid user id')
    ctx.bot.remove_tutor(id)
    await answer(ctx, f'romved user id {id} from the tutor list')


@commands.command(
    help='list all tutors',
    aliases=('tutors',)
)
async def list(ctx: commands.Context):
    """Reply with the name and id of every tutor.

    A tutor whose user cannot be resolved is listed as ``*unknown*``.
    """
    if not is_admin_channel(ctx):
        return
    if not ctx.bot.tutors:
        await answer(ctx, 'there is no tutor :/')
        return
    users = []
    for tutor in ctx.bot.tutors:
        try:
            user = ctx.bot.get_user(tutor) or await ctx.bot.fetch_user(tutor)
            users.append((str(user), tutor))
        except Exception:
            # Best effort: still show the id when the lookup fails.
            users.append(('*unknown*', tutor))
    await answer(ctx, '```\n' + '\n'.join(f' • {name} ({tutor_id})' for name, tutor_id in users) + '\n```')


@commands.command(
    help='get the scoreboard',
    aliases=('scores', 'score', 'points')
)
async def scoreboard(ctx: commands.Context):
    """Print the scoreboard to the invoking context (not limited to the admin channel)."""
    await ctx.bot.print_scoreboard(ctx, ctx)


@commands.command(
    help='notify the scoreboard',
    aliases=('notify',)
)
async def notifyscoreboard(ctx: commands.Context):
    """Print the scoreboard to the notify channel."""
    if is_admin_channel(ctx):
        notify_channel = ctx.bot.get_notify_channel(ctx, ctx.bot.get_meta_channel(ctx))
        await ctx.bot.print_scoreboard(ctx, notify_channel)


@commands.command(
    help='reset the scoreboard',
    aliases=(),
)
async def reset(ctx: commands.Context):
    """Reset the scoreboard and confirm to the author."""
    if is_admin_channel(ctx):
        ctx.bot.reset_scoreboard()
        await answer(ctx, 'resetted the scoreboard')


@commands.command(
    help='move members from channel1 to channel2',
    aliases=('mvoe', 'omve', 'moev'),
)
async def move(ctx: commands.Context, id1=0, id2=0):
    """Move every member of the channel with id *id1* to the channel with id *id2*."""
    if not is_admin_channel(ctx):
        return
    if not id1 or not id2:
        return await answer(ctx, 'you must provide 2 ids')
    resolved = []
    for channel_id in (id1, id2):
        resolved.append(ctx.guild.get_channel(channel_id) or await ctx.guild.fetch_channel(channel_id))
    source, target = resolved
    await await_n([member.move_to(target) for member in await get_members(ctx.guild, source)])
    await answer(ctx, 'moved everyone 😊')


# Every command object defined above; at module level locals() is the module
# namespace.
bot_commands = [cmd for cmd in locals().values() if isinstance(cmd, commands.Command)]