-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdiscordbot.py
194 lines (163 loc) · 7.98 KB
/
discordbot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
from disco.bot import Plugin
from disco.types.message import MessageEmbed
from disco.util.sanitize import S as sanitize
import io
import magic
import re
import requests
import zipfile
SIZE_LIMIT_MB = 64 * 1024 * 1024
ALLOWED_CHANNELS = ["dev", "help", "secret", "bot_spam"]
COLOR_RED = 0x992D22
COLOR_BLUE = 0x22992D
class XeniaBot(Plugin):
def parse_log_file(self, file_name, file):
"""
Parses a log file, and returns a Discord MessageEmbed describing it.
"""
embed = MessageEmbed()
embed.title = "**{}**\n".format(sanitize(file_name,
escape_codeblocks=True))
embed.description = ''
embed.color = COLOR_BLUE
build_info = {}
message_levels = {
'w': [],
'!': [],
}
seen = set()
lines = 0
for line in file:
# Process up to 500,000 lines
if lines > 500000:
break
# Decode the line if it needs it.
try:
line = line.decode('utf-8')
except AttributeError:
pass
sanitized_line = sanitize(line, escape_codeblocks=True).replace('\r\n', '').replace('\r', '')
if 'date' not in build_info:
# Scan for build info
res = re.search(
# Log prefix (with thread number)
r'^i> (?:[0-9a-fA-f]{8}) Build: '
# Optional PR repo, branch, commit
r'(?:PR#(?P<pr_number>\d+) (?P<pr_repo>.*) (?P<pr_branch>.*)(?:@)(?P<pr_commit>[0-9a-fA-F]{40}|[0-9a-fA-F]{9}) against )?'
# Branch, commit, date
r'(?P<branch>.*)(?: / |@)(?P<commit>[0-9a-fA-F]{40}|[0-9a-fA-F]{9}) on (?P<date>.*)'
# End
r'$',
sanitized_line)
if res:
build_info.update({
"branch": res.group('branch'),
"commit": res.group('commit'),
"date": res.group('date'),
})
if res.group('pr_number') is not None:
build_info.update({
"pr_number": res.group('pr_number'),
"pr_repo": res.group('pr_repo'),
"pr_branch": res.group('pr_branch'),
"pr_commit": res.group('pr_commit'),
})
# See if we can find a game ID.
if 'title_id' not in build_info:
res = re.search(r'^\s*Title ID: (?P<title_id>[0-9a-fA-F]{8})$', sanitized_line)
if res:
build_info.update({"title_id": res.group('title_id')})
if len(sanitized_line) > 1 and (sanitized_line[0] in message_levels):
if sanitized_line not in seen:
seen.add(sanitized_line)
message_levels[sanitized_line[0]].append(sanitized_line)
lines += 1
if 'date' not in build_info:
embed.color = COLOR_RED
embed.description = "\t**Invalid file**. Could not find build information - is this a Xenia logfile?"
return embed
# Setup the description
if 'pr_number' in build_info and 'pr_repo' in build_info and 'pr_branch' in build_info and 'pr_commit' in build_info:
embed.description += "PR# `{pr_number}`\nPR Repository: `{pr_repo}`\nPR Branch: `{pr_branch}`\nPR Commit: `{pr_commit}`\n".format(
**build_info)
if 'branch' in build_info and 'date' in build_info and 'commit' in build_info:
embed.description += "Branch: `{branch}`\nDate: {date}\nCommit: `{commit}`\n".format(
**build_info)
if 'title_id' in build_info:
embed.description += "Title ID: `{title_id}`".format(**build_info)
# Errors
if len(message_levels['!']) > 0:
errors = "```\n"
for line in message_levels['!']:
if len(errors) + len(line) > 997:
errors += '...'
break
errors += "{}\n".format(line)
errors += "```\n"
embed.add_field(name="Errors", value=errors)
# Warnings
if len(message_levels["w"]) > 0:
warnings = "```\n"
for line in message_levels['w']:
if len(warnings) + len(line) > 997:
warnings += '...'
break
warnings += "{}\n".format(line)
warnings += "```\n"
embed.add_field(name="Warnings", value=warnings)
return embed
@Plugin.command('a')
@Plugin.command('analyze')
def on_analyze_command(self, event):
if event.channel.name not in ALLOWED_CHANNELS:
event.msg.reply(
"{}, please run this command in #help or #bot_spam.".format(event.author.mention))
return
if len(event.msg.attachments) < 1:
event.msg.reply("{}, usage: Attach Xenia's logfile with your message (preferably compressed in a .zip).".format(
event.author.mention))
return
# Fire off a typing event.
self.client.api.channels_typing(event.channel.id)
for _, attach in event.msg.attachments.items():
s_file_name = sanitize(attach.filename, escape_codeblocks=True)
if attach.size > SIZE_LIMIT_MB:
event.msg.reply(event.author.mention, embed=MessageEmbed(title=s_file_name, color=COLOR_RED,
description="**File above size limit, not analyzed**. Did you compress it?"))
continue
r = requests.get(attach.url)
if r.status_code != 200:
event.msg.reply(event.author.mention, embed=MessageEmbed(title=s_file_name, color=COLOR_RED,
description="**Failed to fetch file from Discord**, status code {}".format(r.status_code)))
continue
mime = magic.from_buffer(r.content, mime=True)
if mime == 'text/plain':
# Plaintext, straight to the parser!
event.msg.reply(event.author.mention, embed=self.parse_log_file(
attach.filename, io.StringIO(r.text)))
elif mime == 'application/zip':
z = zipfile.ZipFile(io.BytesIO(r.content))
if len(z.namelist()) != 1:
event.msg.reply(event.author.mention, embed=MessageEmbed(
title=s_file_name, color=COLOR_RED, description="**Archives must contain only a single file**."))
continue
# Parse every file in the zip file.
for name in z.namelist():
# Check the guessed type as well. No voodoo embedding zip files inside one another.
mime = magic.from_buffer(z.open(name).read(1024), mime=True)
if mime != 'text/plain':
event.msg.reply(event.author.mention, embed=MessageEmbed(
title=s_file_name, color=COLOR_RED, description="**Contents not plaintext, ignored**."))
continue
event.msg.reply(event.author.mention, embed=self.parse_log_file(
name, z.open(name)))
z.close()
else:
event.msg.reply(event.author.mention, embed=MessageEmbed(
title=s_file_name, color=COLOR_RED, description="**Unsupported file type, not analyzed**."))
continue
@Plugin.command('help')
def on_help_command(self, event):
message = '{}, commands available:\n'.format(event.author.mention)
message += '\tanalyze (a): Analyze an attached logfile (zipped or uncompressed). Must be < 64MB, and must be ran in #help or #bot_spam.\n'
event.msg.reply(message)