forked from FRCDiscord/Dozer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_utils.py
executable file
·361 lines (298 loc) · 13.2 KB
/
_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
"""Utilities for Dozer."""
import asyncio
import inspect
import logging
import typing
from collections.abc import Mapping
from typing import Dict
import discord
from discord.ext import commands
from dozer import db
from dozer.context import DozerContext
__all__ = ['bot_has_permissions', 'command', 'group', 'Cog', 'Reactor', 'Paginator', 'paginate', 'chunk', 'dev_check',
'DynamicPrefixEntry']
DOZER_LOGGER = logging.getLogger("dozer")
class CommandMixin:
"""Example usage processing"""
# Keyword-arg dictionary passed to __init__ when copying/updating commands when Cog instances are created
# inherited from discord.ext.command.Command
__original_kwargs__: typing.Dict[str, typing.Any]
_required_permissions = None
def __init__(self, func, **kwargs):
super().__init__(func, **kwargs)
self.example_usage = kwargs.pop('example_usage', '')
if hasattr(func, '__required_permissions__'):
# This doesn't need to go into __original_kwargs__ because it'll be read from func each time
self._required_permissions = func.__required_permissions__
@property
def required_permissions(self):
"""Required permissions handler"""
if self._required_permissions is None:
self._required_permissions = discord.Permissions()
return self._required_permissions
@property
def example_usage(self):
"""Example usage property"""
return self._example_usage
@example_usage.setter
def example_usage(self, usage):
"""Sets example usage"""
self._example_usage = self.__original_kwargs__['example_usage'] = inspect.cleandoc(usage)
class Command(CommandMixin, commands.Command):
"""Represents a command"""
class Group(CommandMixin, commands.Group):
"""Class for command groups"""
def command(self, *args, **kwargs):
"""Initiates a command"""
kwargs.setdefault('cls', Command)
return super(Group, self).command(*args, **kwargs)
def group(self, *args, **kwargs):
"""Initiates a command group"""
kwargs.setdefault('cls', Group)
return super(Group, self).command(*args, **kwargs)
def command(**kwargs):
"""Represents bot commands"""
kwargs.setdefault('cls', Command)
return commands.command(**kwargs)
def group(**kwargs):
"""Links command groups"""
kwargs.setdefault('cls', Group)
return commands.group(**kwargs)
class Cog(commands.Cog):
"""Initiates cogs."""
def __init__(self, bot: commands.Bot):
super().__init__()
self.bot = bot
def dev_check():
"""Function decorator to check that the calling user is a developer"""
async def predicate(ctx: DozerContext):
if ctx.author.id not in ctx.bot.config['developers']:
raise commands.NotOwner('you are not a developer!')
return True
return commands.check(predicate)
class Reactor:
"""
A simple way to respond to Discord reactions.
Usage:
from ._utils import Reactor
# in a command
initial_reactions = [...] # Initial reactions (str or Emoji) to add
reactor = Reactor(ctx, initial_reactions) # Timeout is optional, and defaults to 1 minute
async for reaction in reactor:
# reaction is the str/Emoji that was added.
# reaction will not necessarily be in initial_reactions.
if reaction == this_emoji:
reactor.do(reactor.message.edit(content='This!')) # Any coroutine
elif reaction == that_emoji:
reactor.do(reactor.message.edit(content='That!'))
elif reaction == stop_emoji:
reactor.stop() # The next time around, the message will be deleted and the async-for will end
# If no action is set (e.g. unknown emoji), nothing happens
"""
_stop_reaction = object()
def __init__(self, ctx: DozerContext, initial_reactions, *, auto_remove: bool = True, timeout: int = 60):
"""
ctx: command context
initial_reactions: iterable of emoji to react with on start
auto_remove: if True, reactions are removed once processed
timeout: time, in seconds, to wait before stopping automatically. Set to None to wait forever.
"""
self.dest = ctx.channel
self.bot = ctx.bot
self.caller = ctx.author
self.me = ctx.guild.get_member(self.bot.user.id)
self._reactions = tuple(initial_reactions)
self._remove_reactions = auto_remove and ctx.channel.permissions_for(
self.me).manage_messages # Check for required permissions
self.timeout = timeout
self._action = None
self.message = None
async def __aiter__(self):
self.message = await self.dest.send(embed=self.pages[self.page])
for emoji in self._reactions:
await self.message.add_reaction(emoji)
while True:
try:
reaction, reacting_member = await self.bot.wait_for('reaction_add', check=self._check_reaction,
timeout=self.timeout)
except asyncio.TimeoutError:
break
yield reaction.emoji
# Caller calls methods to set self._action; end of async for block, control resumes here
if self._remove_reactions:
await self.message.remove_reaction(reaction.emoji, reacting_member)
if self._action is self._stop_reaction:
break
elif self._action is None:
pass
else:
await self._action
for emoji in reversed(self._reactions):
await self.message.remove_reaction(emoji, self.me)
def do(self, action):
"""If there's an action reaction, do the action."""
self._action = action
def stop(self):
"""Listener for stop reactions."""
self._action = self._stop_reaction
def _check_reaction(self, reaction: discord.Reaction, member: discord.Member):
return reaction.message.id == self.message.id and member.id == self.caller.id
class Paginator(Reactor):
"""
Extends functionality of Reactor for pagination.
Left- and right- arrow reactions are used to move between pages.
:stop: will stop the pagination.
Other reactions are given to the caller like normal.
Usage:
from ._utils import Reactor
# in a command
initial_reactions = [...] # Initial reactions (str or Emoji) to add (in addition to normal pagination reactions)
pages = [...] # Embeds to use for each page
paginator = Paginator(ctx, initial_reactions, pages)
async for reaction in paginator:
# See Reactor for how to handle reactions
# Paginator reactions will not be yielded here - only unknowns
"""
pagination_reactions = (
'\N{BLACK LEFT-POINTING DOUBLE TRIANGLE WITH VERTICAL BAR}', # :track_previous:
'\N{BLACK LEFT-POINTING TRIANGLE}', # :arrow_backward:
'\N{BLACK RIGHT-POINTING TRIANGLE}', # :arrow_forward:
'\N{BLACK RIGHT-POINTING DOUBLE TRIANGLE WITH VERTICAL BAR}', # :track_next:
'\N{BLACK SQUARE FOR STOP}' # :stop_button:
)
def __init__(self, ctx: DozerContext, initial_reactions, pages, *, start: int = 0, auto_remove: bool = True,
timeout: int = 60):
all_reactions = list(initial_reactions)
ind = all_reactions.index(Ellipsis)
all_reactions[ind:ind + 1] = self.pagination_reactions
super().__init__(ctx, all_reactions, auto_remove=auto_remove, timeout=timeout)
if pages and isinstance(pages[-1], Mapping):
named_pages = pages.pop()
self.pages = dict(enumerate(pages), **named_pages)
else:
self.pages = pages
self.len_pages = len(pages)
self.page = start
self.message = None
self.reactor = None
async def __aiter__(self):
self.reactor = super().__aiter__()
async for reaction in self.reactor:
try:
ind = self.pagination_reactions.index(reaction)
except ValueError: # Not in list - send to caller
yield reaction
else:
if ind == 0:
self.go_to_page(0)
elif ind == 1:
self.prev() # pylint: disable=not-callable
elif ind == 2:
self.next() # pylint: disable=not-callable
elif ind == 3:
self.go_to_page(-1)
else: # Only valid option left is 4
self.stop()
def go_to_page(self, page: int):
"""Goes to a specific help page"""
if isinstance(page, int):
page = page % self.len_pages
if page < 0:
page += self.len_pages
self.page = page
if self.message is not None:
self.do(self.message.edit(embed=self.pages[self.page]))
def next(self, amt: int = 1):
"""Goes to the next help page"""
if isinstance(self.page, int):
self.go_to_page(self.page + amt)
else:
self.go_to_page(amt - 1)
def prev(self, amt: int = 1):
"""Goes to the previous help page"""
if isinstance(self.page, int):
self.go_to_page(self.page - amt)
else:
self.go_to_page(-amt)
async def paginate(ctx: DozerContext, pages, *, start: int = 0, auto_remove: bool = True, timeout: int = 60):
"""
Simple pagination based on Paginator. Pagination is handled normally and other reactions are ignored.
"""
paginator = Paginator(ctx, ..., pages, start=start, auto_remove=auto_remove, timeout=timeout)
async for reaction in paginator:
pass # The normal pagination reactions are handled - just drop anything else
def chunk(iterable, size: int):
"""
Break an iterable into chunks of a fixed size. Returns an iterable of iterables.
Almost-inverse of itertools.chain.from_iterable - passing the output of this into that function will reconstruct the original iterable.
If the last chunk is not the full length, it will be returned but not padded.
"""
contents = list(iterable)
for i in range(0, len(contents), size):
yield contents[i:i + size]
def bot_has_permissions(**required):
"""Decorator to check if bot has certain permissions when added to a command"""
def predicate(ctx: DozerContext):
"""Function to tell the bot if it has the right permissions"""
given = ctx.channel.permissions_for((ctx.guild or ctx.channel).me)
missing = [name for name, value in required.items() if getattr(given, name) != value]
if missing:
raise commands.BotMissingPermissions(missing)
else:
return True
def decorator(func):
"""Defines the bot_has_permissions decorator"""
if isinstance(func, Command):
func.checks.append(predicate)
func.required_permissions.update(**required)
else:
if hasattr(func, '__commands_checks__'):
func.__commands_checks__.append(predicate)
else:
func.__commands_checks__ = [predicate]
func.__required_permissions__ = discord.Permissions()
func.__required_permissions__.update(**required)
return func
return decorator
class PrefixHandler:
"""Handles dynamic prefixes"""
def __init__(self, default_prefix: str):
self.default_prefix = default_prefix
self.prefix_cache: Dict[int, DynamicPrefixEntry] = {}
def handler(self, bot, message: discord.Message):
"""Process the dynamic prefix for each message"""
dynamic = self.prefix_cache.get(message.guild.id) if message.guild else self.default_prefix
# <@!> is a nickname mention which discord.py doesn't make by default
return [f"<@!{bot.user.id}> ", bot.user.mention, dynamic if dynamic else self.default_prefix]
async def refresh(self):
"""Refreshes the prefix cache"""
prefixes = await DynamicPrefixEntry.get_by() # no filters, get all
for prefix in prefixes:
self.prefix_cache[prefix.guild_id] = prefix.prefix
DOZER_LOGGER.info(f"{len(prefixes)} prefixes loaded from database")
class DynamicPrefixEntry(db.DatabaseTable):
"""Holds the custom prefixes for guilds"""
__tablename__ = 'dynamic_prefixes'
__uniques__ = ['guild_id']
@classmethod
async def initial_create(cls):
"""Create the table in the database"""
async with db.Pool.acquire() as conn:
await conn.execute(f"""
CREATE TABLE {cls.__tablename__} (
guild_id bigint NOT NULL,
prefix text NOT NULL,
PRIMARY KEY (guild_id)
)""")
def __init__(self, guild_id: int, prefix: str):
super().__init__()
self.guild_id = guild_id
self.prefix = prefix
@classmethod
async def get_by(cls, **kwargs):
results = await super().get_by(**kwargs)
result_list = []
for result in results:
obj = DynamicPrefixEntry(guild_id=result.get("guild_id"), prefix=result.get("prefix"))
result_list.append(obj)
return result_list