-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlauncher.py
executable file
·135 lines (106 loc) · 4.64 KB
/
launcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
__author__ = "Matteo Golinelli"
__copyright__ = "Copyright (C) 2023 Matteo Golinelli"
__license__ = "MIT"
from time import sleep
import subprocess
import traceback
import argparse
import logging
import random
import shlex
import json
import sys
import os
def main():
    """Launch the crawler on a list of sites, keeping at most MAX crawler
    subprocesses running concurrently.

    Progress (sites that exited with code 0) is persisted to
    logs/tested.json after every success so an interrupted run can resume
    without re-testing sites, unless --testall is given.
    """
    MAX = 5  # Default maximum number of crawler processes to run at once
    crawler = 'nonce-nce.py'
    tested_file = 'logs/tested.json'  # Resume/progress file

    # Sites already tested successfully (crawler exit code 0)
    tested = []

    parser = argparse.ArgumentParser(prog='launcher.py', description='Launcher')
    parser.add_argument('-s', '--sites',
                        help='Sites list', required=True)
    parser.add_argument('-m', '--max', default=MAX, type=int,
                        help=f'Maximum number of sites to test concurrently (default: {MAX})')
    parser.add_argument('-a', '--arguments', default='--max 10 --domains 10 --reproducible',
                        help='Additional arguments to pass to the crawler (use with = sign: -a="--arg1 --arg2")')
    parser.add_argument('-t', '--testall', default=False,
                        help='Test also already tested sites', action='store_true')
    parser.add_argument('-c', '--crawler', default=crawler,
                        help='Alternative crawler script name to launch')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Enable debug mode')
    args = parser.parse_args()

    # args.max always has a value (default MAX); the original `if args.max:`
    # silently ignored an explicit `-m 0`.
    MAX = int(args.max)

    logging.basicConfig()
    logger = logging.getLogger('launcher')
    logger.setLevel(logging.DEBUG if args.debug else logging.INFO)

    # The progress file lives in logs/; create the directory up front so the
    # first save does not fail with FileNotFoundError on a fresh checkout.
    os.makedirs('logs', exist_ok=True)

    def save_tested():
        # Persist progress so an interrupted run can resume where it left off.
        with open(tested_file, 'w') as f:
            json.dump(tested, f)

    # Retrieve already tested sites from the tested.json file
    if not args.testall and os.path.exists(tested_file):
        with open(tested_file, 'r') as f:
            tested = json.load(f)

    if tested:
        random.shuffle(tested)
        logger.info(f'Already tested sites ({len(tested)}): {", ".join(tested[:min(len(tested), 10)])}' +
                    f'... and {len(tested) - min(len(tested), 10)} more')

    # Substrings of sites we never want to crawl
    blacklist = ['google', 'facebook', 'amazon', 'twitter', '.gov', 'acm.com', 'jstor.org', 'arxiv']

    sites = []
    try:
        with open(args.sites, 'r') as f:
            sites = [s.strip() for s in f.readlines()]
        random.shuffle(sites)

        processes = {}  # site -> running subprocess.Popen handle
        for site in sites:
            if any(i in site for i in blacklist):
                continue
            try:
                first = True  # Run the reaping loop at least once per site
                # Block until a slot is free (fewer than MAX processes running)
                while len(processes) >= MAX or first:
                    first = False
                    # Iterate over a snapshot so finished entries can be deleted:
                    # mutating a dict while iterating its live view raises
                    # RuntimeError. This also reaps ALL finished processes per
                    # pass instead of at most one per one-second sleep.
                    for s, p in list(processes.items()):
                        state = p.poll()
                        if state is not None:  # Process has finished
                            del processes[s]
                            logger.info(f'[{len(tested)}/{len(sites)} ({len(tested)/len(sites)*100:.2f}%)] {s} tested, exit-code: {state}.')
                            if state == 0:
                                tested.append(s)
                                save_tested()
                    # Only wait when still saturated; the original slept one
                    # second unconditionally before every launch.
                    if len(processes) >= MAX:
                        sleep(1)

                if site in tested and not args.testall:
                    continue

                # A slot is free: launch a crawler for this site
                if site != '' and site not in tested:
                    cmd = f'python3 {args.crawler} -s {site} {args.arguments}'
                    logger.info(f'Testing {site}')
                    try:
                        p = subprocess.Popen(shlex.split(cmd))
                        processes[site] = p
                        print('\t\t >>>', cmd)
                    except OSError:
                        # Popen raises OSError (e.g. FileNotFoundError) when the
                        # command cannot be started. The original also caught
                        # subprocess.TimeoutExpired/CalledProcessError, which
                        # Popen() never raises — those clauses were dead code.
                        logger.error(f'Could not test site {site}')
                        traceback.print_exc()
            except Exception as e:
                logger.error(f'Error [{site}] {e}')
                traceback.print_exc()
    except KeyboardInterrupt:
        logger.error('Keyboard interrupt')
    except Exception:
        # Top-level boundary: log the full traceback instead of a bare except,
        # which would also have swallowed SystemExit.
        logger.error(traceback.format_exc())
    finally:
        logger.info(f'Tested sites ({len(tested)}): {", ".join(tested[:min(len(tested), 10)])}' +
                    f'... and {len(tested) - min(len(tested), 10)} more')
        save_tested()
# Script entry point: run the launcher only when executed directly,
# not when this module is imported.
if __name__ == '__main__':
    main()