forked from nh-server/Kurisu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
226 lines (185 loc) · 9.57 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
from datetime import datetime
import time
from discord.ext import commands
from discord import utils
import aiosqlite3
import os
class ConnectionHolder:
    """Hold one database connection and hand out cursors from it.

    The instance is itself an async context manager: entering it yields a
    fresh cursor from the held connection, and leaving it commits whatever
    transaction is still open.
    """

    def __init__(self):
        # Stays None until load_db() has been awaited.
        self.dbcon = None

    async def load_db(self, database_name, loop):
        """Open ``database_name``, creating it from ``schema.sql`` if absent.

        ``loop`` is forwarded unchanged to ``aiosqlite3.connect``. When the
        file does not exist yet, the schema script is read from ``schema.sql``
        (relative to the current working directory), executed on the new
        connection and committed.
        """
        if os.path.isfile(database_name):
            self.dbcon = await aiosqlite3.connect(database_name, loop=loop)
            print(f'Loaded {database_name}')
            return

        print(f'Creating database {database_name}')
        # Read the schema before connecting so a missing schema file fails
        # before any database file gets created.
        with open('schema.sql', 'r', encoding='utf-8') as schema_file:
            schema_sql = schema_file.read()
        connection = await aiosqlite3.connect(database_name, loop=loop)
        self.dbcon = connection
        await connection.executescript(schema_sql)
        await connection.commit()
        print(f'{database_name} initialized')

    async def __aenter__(self):
        """Enter the connection's own context and return a new cursor."""
        connection = self.dbcon
        connection.__enter__()
        return await connection.cursor()

    async def __aexit__(self, exc_class, exc, traceback):
        """Leave the connection's context, then commit any open transaction.

        The exception details are passed through to the connection's
        ``__exit__``. The commit is issued whether or not an exception is
        propagating, and the exception itself is never suppressed.
        """
        connection = self.dbcon
        connection.__exit__(exc_class, exc, traceback)
        if not connection.in_transaction:
            return
        await connection.commit()
class DatabaseCog(commands.Cog):
    """
    Base class for cogs that interact with the database.

    Every helper opens ``self.bot.holder`` as an async context manager to
    obtain a cursor and runs parameterized SQL through that cursor.
    """

    def __init__(self, bot):
        self.bot = bot

    # ------------------------------------------------------------------
    # Permanent roles
    # ------------------------------------------------------------------

    async def add_restriction(self, user_id, role):
        """Store ``role.id`` as a permanent role for ``user_id``.

        Returns True when a row was inserted and False when the
        (user, role) pair was already present.
        """
        async with self.bot.holder as cur:
            await cur.execute('SELECT user_id FROM permanent_roles WHERE user_id=? AND role_id=?', (user_id, role.id))
            if await cur.fetchone():
                return False
            await cur.execute('INSERT INTO permanent_roles VALUES(?, ?)', (user_id, role.id))
            return True

    async def remove_restriction(self, user_id, role):
        """Delete the permanent role ``role.id`` of ``user_id``.

        Returns True when a row was deleted and False when there was
        nothing to delete (mirrors ``add_restriction``).
        """
        async with self.bot.holder as cur:
            await cur.execute('SELECT user_id FROM permanent_roles WHERE user_id=? AND role_id=?', (user_id, role.id))
            if not await cur.fetchone():
                return False
            await cur.execute('DELETE FROM permanent_roles WHERE user_id=? AND role_id=?', (user_id, role.id))
            return True

    async def get_restrictions_roles_id(self, user_id):
        """Return the list of permanent role ids stored for ``user_id``."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT role_id FROM permanent_roles WHERE user_id=?', (user_id,))
            return [row[0] for row in await cur.fetchall()]

    # ------------------------------------------------------------------
    # Staff
    # ------------------------------------------------------------------

    async def add_staff(self, user_id, position):
        """Insert ``user_id`` into ``staff`` or update their position."""
        async with self.bot.holder as cur:
            # Check on this cursor instead of calling get_stafftrole(), which
            # would re-enter the holder while it is already open here.
            await cur.execute('SELECT position FROM staff WHERE user_id=?', (user_id,))
            if await cur.fetchone() is None:
                await cur.execute('INSERT INTO staff VALUES(?, ?)', (user_id, position))
            else:
                await cur.execute('UPDATE staff SET position=? WHERE user_id=? ', (position, user_id))

    async def remove_staff(self, user_id):
        """Delete ``user_id`` from ``staff``."""
        async with self.bot.holder as cur:
            await cur.execute('DELETE FROM staff WHERE user_id=?', (user_id,))

    async def get_staff(self):
        """Return the list of every ``user_id`` in ``staff``."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT user_id FROM staff')
            return [row[0] for row in await cur.fetchall()]

    async def get_staff_role(self):
        """Return every full row of ``staff``."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT * FROM staff')
            return await cur.fetchall()

    async def get_stafftrole(self, user_id):
        """Return the ``position`` stored for ``user_id``, or None if absent."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT position FROM staff WHERE user_id=?', (user_id,))
            row = await cur.fetchone()
            return None if row is None else row[0]

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    async def get_helpers(self):
        """Return the list of every ``user_id`` in ``helpers``."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT user_id FROM helpers')
            return [row[0] for row in await cur.fetchall()]

    async def get_helpers_role(self):
        """Return every full row of ``helpers``."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT * FROM helpers')
            return await cur.fetchall()

    async def add_helper(self, user_id, console):
        """Insert ``user_id`` into ``helpers`` or update their console."""
        async with self.bot.holder as cur:
            # Same-cursor existence check; see add_staff for the rationale.
            await cur.execute('SELECT console FROM helpers WHERE user_id=?', (user_id,))
            if await cur.fetchone() is None:
                await cur.execute('INSERT INTO helpers VALUES(?, ?)', (user_id, console))
            else:
                await cur.execute('UPDATE helpers SET console=? WHERE user_id=?', (console, user_id))

    async def remove_helper(self, user_id):
        """Delete ``user_id`` from ``helpers``."""
        async with self.bot.holder as cur:
            await cur.execute('DELETE FROM helpers WHERE user_id=?', (user_id,))

    async def get_console(self, user_id):
        """Return the ``console`` stored for ``user_id``, or None if absent."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT console FROM helpers WHERE user_id=?', (user_id,))
            row = await cur.fetchone()
            return None if row is None else row[0]

    # ------------------------------------------------------------------
    # Warns
    # ------------------------------------------------------------------

    async def add_warn(self, user_id, issuer_id, reason):
        """Insert a warn for ``user_id`` keyed by a time-derived snowflake."""
        async with self.bot.holder as cur:
            # NOTE(review): datetime.now() is naive local time; confirm this is
            # what utils.time_snowflake expects in the discord.py version used.
            snowflake = utils.time_snowflake(datetime.now())
            await cur.execute('INSERT INTO warns VALUES(?, ?, ?, ?)', (snowflake, user_id, issuer_id, reason))

    async def remove_warn_id(self, user_id, index):
        """Delete the ``index``-th warn (1-based) of ``user_id``.

        Warns are numbered in ``warn_id`` order, the same order ``get_warns``
        returns them in. An ``index`` below 1 is ignored: it would become a
        negative OFFSET, which SQLite treats as 0, and so would delete the
        first warn instead.
        """
        if index < 1:
            return
        async with self.bot.holder as cur:
            # ORDER BY makes "the n-th warn" well defined; without it SQLite
            # leaves the row order unspecified.
            await cur.execute('DELETE FROM warns WHERE warn_id in (SELECT warn_id FROM warns WHERE user_id=? ORDER BY warn_id LIMIT ?,1)', (user_id, index-1))

    async def remove_warns(self, user_id):
        """Delete every warn of ``user_id``."""
        async with self.bot.holder as cur:
            await cur.execute('DELETE FROM warns WHERE user_id=?', (user_id,))

    async def get_warns(self, user_id):
        """Return every full ``warns`` row of ``user_id``, ordered by ``warn_id``."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT * FROM warns WHERE user_id=? ORDER BY warn_id', (user_id,))
            return await cur.fetchall()

    # ------------------------------------------------------------------
    # Timed restrictions
    # ------------------------------------------------------------------

    async def add_timed_restriction(self, user_id, end_date, type):
        """Create or extend the timed restriction ``type`` for ``user_id``.

        If one already exists, its timestamp is replaced by ``end_date``, its
        alert flag is reset to 0 and the previous timestamp is returned.
        Otherwise a new row is inserted and None is returned.
        """
        async with self.bot.holder as cur:
            await cur.execute('SELECT timestamp FROM timed_restrictions WHERE user_id=? AND type=?', (user_id, type))
            if (timestamp := await cur.fetchone()) is not None:
                await cur.execute('UPDATE timed_restrictions SET timestamp=?, alert=0 WHERE user_id=? AND type=?', (end_date, user_id, type))
                return timestamp[0]
            await cur.execute('INSERT INTO timed_restrictions VALUES(?, ?, ?, ?)', (user_id, end_date, type, 0))
            return None

    async def remove_timed_restriction(self, user_id, type):
        """Delete the timed restriction ``type`` of ``user_id``."""
        async with self.bot.holder as cur:
            await cur.execute('DELETE FROM timed_restrictions WHERE user_id=? AND type=?', (user_id, type))

    async def get_time_restrictions_by_user_type(self, userid, type):
        """Return the full row for (``userid``, ``type``), or None if absent."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT * from timed_restrictions WHERE user_id=? AND type=?', (userid,type))
            return await cur.fetchone()

    async def get_time_restrictions_by_type(self, type):
        """Return every full ``timed_restrictions`` row with the given ``type``."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT * from timed_restrictions WHERE type=?', (type,))
            return await cur.fetchall()

    async def set_time_restriction_alert(self, user_id, type):
        """Set the alert flag to 1 on the restriction ``type`` of ``user_id``."""
        async with self.bot.holder as cur:
            await cur.execute('UPDATE timed_restrictions SET alert=1 WHERE user_id=? AND type=?', (user_id, type))

    # ------------------------------------------------------------------
    # Softbans
    # ------------------------------------------------------------------

    async def add_softban(self, user_id, issuer_id, reason, timestamp=None):
        """Insert a softban row for ``user_id``.

        When ``timestamp`` is falsy, the current local time formatted as
        ``YYYY-MM-DD HH:MM:SS`` is stored instead.
        """
        if not timestamp:
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        async with self.bot.holder as cur:
            await cur.execute('INSERT INTO softbans VALUES(?, ? , ?, ?)', (user_id, issuer_id, reason, timestamp))

    async def remove_softban(self, user_id):
        """Delete the softban rows of ``user_id``."""
        async with self.bot.holder as cur:
            await cur.execute('DELETE FROM softbans WHERE user_id = ?', (user_id,))

    async def get_softban(self, user_id):
        """Return one full ``softbans`` row of ``user_id``, or None if absent."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT * FROM softbans WHERE user_id=?', (user_id,))
            return await cur.fetchone()

    # ------------------------------------------------------------------
    # Watchlist
    # ------------------------------------------------------------------

    async def add_watch(self, user_id):
        """Insert ``user_id`` into ``watchlist``."""
        async with self.bot.holder as cur:
            await cur.execute('INSERT INTO watchlist VALUES(?)', (user_id,))

    async def remove_watch(self, user_id):
        """Delete ``user_id`` from ``watchlist``."""
        async with self.bot.holder as cur:
            await cur.execute('DELETE FROM watchlist WHERE user_id=?', (user_id,))

    async def is_watched(self, user_id):
        """Return True if ``user_id`` has a row in ``watchlist``."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT user_id FROM watchlist WHERE user_id=?', (user_id,))
            return await cur.fetchone() is not None

    # ------------------------------------------------------------------
    # No-filter channels
    # ------------------------------------------------------------------

    async def add_nofilter(self, channel):
        """Insert ``channel.id`` into ``nofilter``."""
        async with self.bot.holder as cur:
            await cur.execute('INSERT INTO nofilter VALUES(?)', (channel.id,))

    async def remove_nofilter(self, channel):
        """Delete ``channel.id`` from ``nofilter``."""
        async with self.bot.holder as cur:
            await cur.execute('DELETE FROM nofilter WHERE channel_id=?', (channel.id,))

    async def check_nofilter(self, channel):
        """Return True if ``channel.id`` has a row in ``nofilter``."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT channel_id FROM nofilter WHERE channel_id=?', (channel.id,))
            return await cur.fetchone() is not None

    # ------------------------------------------------------------------
    # Friend codes
    # ------------------------------------------------------------------

    async def add_friendcode(self, user_id, fc):
        """Insert the friend code ``fc`` for ``user_id``."""
        async with self.bot.holder as cur:
            await cur.execute('INSERT INTO friend_codes VALUES (?,?)', (user_id, fc))

    async def get_friendcode(self, user_id):
        """Return the full ``friend_codes`` row of ``user_id``, or None if absent."""
        async with self.bot.holder as cur:
            await cur.execute('SELECT * FROM friend_codes WHERE user_id = ?', (user_id,))
            # Fetch the row itself; execute()'s return value is not the result.
            return await cur.fetchone()

    async def delete_friendcode(self, user_id):
        """Delete the friend code rows of ``user_id``."""
        async with self.bot.holder as cur:
            await cur.execute('DELETE FROM friend_codes WHERE user_id = ?', (user_id,))