forked from swirlai/swirl-search
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex_email_elastic.py
93 lines (80 loc) · 3.15 KB
/
index_email_elastic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
'''
@author: Sid Probstein
@contact: sid@swirl.today
'''
import sys
import csv
import argparse
import os
from dateutil import parser
from elasticsearch import Elasticsearch
##################################################
def main(argv):
    """Bulk-index email messages from a CSV file into Elasticsearch/OpenSearch.

    Each CSV row is expected to be (url, raw_message). RFC-822-style header
    lines in the raw message (Date:, Subject:, To:/X-To:, From:/X-From:) are
    parsed into document fields; everything after the first blank line is
    treated as the message body and indexed as 'content'.
    """
    # arguments
    # NOTE: the ArgumentParser must NOT be named `parser` -- that would shadow
    # the module-level `from dateutil import parser` used for date parsing
    # below (the original code did exactly that, causing an AttributeError on
    # any row containing a 'Date:' header).
    arg_parser = argparse.ArgumentParser(description="Bulk index email in CSV format to elasticsearch/opensearch")
    arg_parser.add_argument('filespec', help="path to the csv file to load")
    arg_parser.add_argument('-e', '--elasticsearch', help="the URL to elasticsearch", default='http://localhost:9200/')
    arg_parser.add_argument('-i', '--index', help="the index to receive the email messages", default='email')
    # type=int so the row-count comparison below is int vs. int; previously a
    # user-supplied -m value arrived as a str and `rows > args.max` raised TypeError
    arg_parser.add_argument('-m', '--max', type=int, help="maximum number of rows to index", default=0)
    arg_parser.add_argument('-u', '--username', default='elastic', help="the elastic user, default 'elastic'")
    arg_parser.add_argument('-p', '--password', help="the password for the elastic user")
    args = arg_parser.parse_args()
    if not os.path.exists(args.filespec):
        print(f"Error: file not found: {args.filespec}")
        return
    # email bodies can exceed csv's default 128KB field limit
    csv.field_size_limit(sys.maxsize)
    # BUG FIX: hosts was previously hardcoded to '' so the -e argument was ignored
    es = Elasticsearch(http_auth=(args.username, args.password), hosts=args.elasticsearch)
    print("Indexing...")
    rows = 0
    # with-block guarantees the file is closed even if indexing raises
    with open(args.filespec, 'r') as f:
        csvr = csv.reader(f, quoting=csv.QUOTE_ALL)
        for row in csvr:
            if rows == 0:
                # skip the CSV header row
                rows = 1
                continue
            email = {}
            email['url'] = row[0]
            # process and field the body row[1]
            content = row[1]
            # to do: this might be OS dependent, test on windows might need /r/n or different open incantation
            list_content = content.strip().split('\n')
            body = ""
            in_body = False
            for field in list_content:
                if in_body:
                    # past the header block: accumulate body text
                    body = body + field
                    continue
                if field.startswith('Date:'):
                    s_date = field[field.find(':')+1:].strip()
                    # dateutil's parser module handles the many date formats seen in email headers
                    dt = parser.parse(s_date)
                    email['date_published'] = dt.strftime('%Y-%m-%d %H:%M:%S.%f')
                if field.startswith('Subject:'):
                    email['subject'] = field[field.find(':')+1:].strip()
                if field.startswith('To:'):
                    email['to'] = field[field.find(':')+1:].strip()
                if field.startswith('X-To:'):
                    # overwrite: X-To is preferred over To when both appear
                    email['to'] = field[field.find(':')+1:].strip()
                if field.startswith('From:'):
                    email['author'] = field[field.find(':')+1:].strip()
                if field.startswith('X-From:'):
                    # overwrite: X-From is preferred over From when both appear
                    email['author'] = field[field.find(':')+1:].strip()
                if field == '':
                    # first blank line separates headers from the body
                    in_body = True
            # end for
            # create the content field
            email['content'] = body
            es.index(index=args.index, document=email)
            rows = rows + 1
            if rows % 100 == 0:
                print(f"Indexed {rows} records so far...")
            if args.max > 0:
                if rows > args.max:
                    break
        # end for
#############################################
if __name__ == "__main__":
main(sys.argv)
# end