Skip to content

Commit 326c5f4

Browse files
committed
make scanning process multithreaded
1 parent d91c972 commit 326c5f4

25 files changed

+416
-130
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from glouton.commands.module.moduleCommand import ModuleCommand
22

3+
34
class ObservationModuleCommand(ModuleCommand):
45
def __init__(self, params):
56
ModuleCommand.__init__(self, params)
67

78
def process(self):
8-
self.params.module.runAfterDownload(self.params.file_name, self.params.full_path, self.params.observation)
9+
self.params.module.runAfterDownload(
10+
self.params.file_name, self.params.full_path, self.params.observation)

glouton/commands/module/observationModuleCommandParams.py

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from glouton.commands.module.moduleCommandParams import ModuleCommandParams
22

3+
34
class ObservationModuleCommandParams(ModuleCommandParams):
45
def __init__(self, file_name=None, full_path=None, observation=None, module=None):
56
ModuleCommandParams.__init__(self, file_name, full_path, module)
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from glouton.commands.module.moduleCommand import ModuleCommand
22

3+
34
class TelemetryModuleCommand(ModuleCommand):
45
def __init__(self, params):
56
ModuleCommand.__init__(self, params)
67

78
def process(self):
8-
self.params.module.runAfterDownload(self.params.file_name, self.params.full_path, self.params.telemetry)
9+
self.params.module.runAfterDownload(
10+
self.params.file_name, self.params.full_path, self.params.telemetry)

glouton/commands/module/telemetryModuleCommandParams.py

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from glouton.commands.module.moduleCommandParams import ModuleCommandParams
22

3+
34
class TelemetryModuleCommandParams(ModuleCommandParams):
45
def __init__(self, file_name=None, full_path=None, telemetry=None, module=None):
56
ModuleCommandParams.__init__(self, file_name, full_path, module)

glouton/infrastructure/satnogClient.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
class SatnogClient:
77
def __init__(self):
8-
self.config = config.read()
8+
self.config = config.read()
99
self.proxies = self._set_proxy()
1010

1111
def _set_proxy(self):

glouton/infrastructure/satnogDbClient.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ def get(self, url, params=None):
1212
return requests.get(url, params=params, proxies=self.proxies)
1313

1414
def get_from_base(self, url, params=None):
15-
return requests.get(self._url + url, params=params, proxies=self.proxies)
15+
return requests.get(self._url + url, params=params, proxies=self.proxies)

glouton/infrastructure/satnogNetworkClient.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ def get(self, url, params=None):
1212
return requests.get(url, params=params, proxies=self.proxies)
1313

1414
def get_from_base(self, url, params=None):
15-
return requests.get(self._url + url, params=params, proxies=self.proxies)
15+
return requests.get(self._url + url, params=params, proxies=self.proxies)

glouton/repositories/demoddata/demoddataRepo.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from glouton.shared import threadHelper
99
from threading import Event
1010

11+
1112
class DemoddataRepo(Downloadable):
1213
def __init__(self, working_dir, modules):
1314
self.__working_dir = working_dir
@@ -25,13 +26,15 @@ def register_command(self, observation, start_date, end_date):
2526

2627
def create_worker(self):
2728
threads = []
28-
downloadWorker = DownloadWorker(self.__demoddata_commands, self.__download_status)
29+
downloadWorker = DownloadWorker(
30+
self.__demoddata_commands, self.__download_status)
2931
threads.append(threadHelper.create_thread(downloadWorker.execute))
3032
if self.__modules is not None:
31-
moduleWorker = ModuleWorker(self.__demoddate_modules_commands, self.__download_status)
33+
moduleWorker = ModuleWorker(
34+
self.__demoddate_modules_commands, self.__download_status)
3235
threads.append(threadHelper.create_thread(moduleWorker.execute))
3336

34-
return threads
37+
return threads
3538

3639
def __create_dir_name(self, target, start_date, end_date):
3740
return target + '__' + start_date.strftime('%m-%d-%YT%H-%M-%S') + '__' + end_date.strftime('%m-%d-%YT%H-%M-%S')
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,60 @@
11
from queue import Queue
2-
from glouton.infrastructure.satnogNetworkClient import SatnogNetworkClient
32
from glouton.shared.logger import logger
3+
from glouton.shared import dateHelper
4+
from glouton.workers.pageScanWorker import PageScanWorker
5+
from glouton.shared import threadHelper
6+
from glouton.infrastructure.satnogNetworkClient import SatnogNetworkClient
47

58

69
class ObservationRepo:
710
def __init__(self, cmd, repos):
811
self.OBSERVATION_URL = 'observations/'
9-
self.__client = SatnogNetworkClient()
1012
self.__repos = repos
1113
self.__cmd = cmd
1214
self.__threads = []
1315

1416
def extract(self):
15-
params = self.__create_request_params()
16-
page = 1
17-
while True:
18-
r = self.__client.get_from_base(
19-
self.OBSERVATION_URL, params)
20-
if r.status_code != 200:
21-
break
22-
23-
logger.Info('scanning page...' + params['page'])
24-
self.__read_page(r.json(), self.__cmd.start_date, self.__cmd.end_date)
25-
page += 1
26-
params['page'] = str(page)
27-
17+
client = SatnogNetworkClient()
18+
diff_days = dateHelper.diff_days(
19+
self.__cmd.start_date, self.__cmd.end_date)
20+
if diff_days < 8:
21+
# no thread needed
22+
url_params = self.__url_param_builder(
23+
self.__cmd.start_date, self.__cmd.end_date)
24+
pageScanner = PageScanWorker(
25+
client, self.__cmd, self.__repos, self.OBSERVATION_URL, url_params, 1)
26+
pageScanner.scan()
27+
else:
28+
threads = []
29+
job = 1
30+
for from_datetime, to_datetime in dateHelper.split_date(self.__cmd.start_date, self.__cmd.end_date, 4):
31+
print(str(from_datetime) + " " + str(to_datetime))
32+
url_params = self.__url_param_builder(
33+
from_datetime, to_datetime)
34+
pageScanner = PageScanWorker(
35+
client, self.__cmd, self.__repos, self.OBSERVATION_URL, url_params, job)
36+
t = threadHelper.create_thread(pageScanner.scan)
37+
threads.append(t)
38+
job += 1
39+
40+
threadHelper.wait(threads)
2841
print('\ndownloading started (Ctrl + C to stop)...\t~( ^o^)~')
2942
self.__create_workers_and_wait()
3043

3144
def __create_workers_and_wait(self):
3245
for repo in self.__repos:
3346
self.__threads.extend(repo.create_worker())
34-
while self.__is_one_thread_alive():
35-
for t in self.__threads:
36-
# let's control to main thread every seconds (in order to be able to capture Ctrl + C if needed)
37-
t.join(1)
38-
39-
def __is_one_thread_alive(self):
40-
for thread in self.__threads:
41-
if thread.is_alive():
42-
return True
43-
44-
return False
45-
46-
def __read_page(self, observations, start_date, end_date):
47-
for observation in observations:
48-
for repo in self.__repos:
49-
repo.register_command(
50-
observation, start_date, end_date)
47+
threadHelper.wait(self.__threads)
5148

52-
def __create_request_params(self):
49+
def __url_param_builder(self, start_date, end_date):
5350
return {'satellite__norad_cat_id': self.__cmd.norad_id,
54-
'ground_station': self.__cmd.ground_station_id,
55-
'start': self.__cmd.start_date.isoformat(),
56-
'end': self.__cmd.end_date.isoformat(),
57-
'vetted_status': self.__cmd.observation_status,
58-
'vetted_user': self.__cmd.user,
59-
'transmitter_uuid': self.__cmd.transmitter_uuid,
60-
'transmitter_mode': self.__cmd.transmitter_mode,
61-
'transmitter_type': self.__cmd.transmitter_type,
62-
'page': '1',
63-
'format': 'json'}
51+
'ground_station': self.__cmd.ground_station_id,
52+
'start': start_date.isoformat(),
53+
'end': end_date.isoformat(),
54+
'vetted_status': self.__cmd.observation_status,
55+
'vetted_user': self.__cmd.user,
56+
'transmitter_uuid': self.__cmd.transmitter_uuid,
57+
'transmitter_mode': self.__cmd.transmitter_mode,
58+
'transmitter_type': self.__cmd.transmitter_type,
59+
'page': '1',
60+
'format': 'json'}

glouton/repositories/payload/payloadRepo.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from glouton.shared import threadHelper
99
from threading import Event
1010

11+
1112
class PayloadRepo(Downloadable):
1213
def __init__(self, working_dir, modules):
1314
self.__working_dir = working_dir
@@ -25,10 +26,12 @@ def register_command(self, observation, start_date, end_date):
2526

2627
def create_worker(self):
2728
threads = []
28-
downloadWorker = DownloadWorker(self.__payload_commands, self.__download_status)
29+
downloadWorker = DownloadWorker(
30+
self.__payload_commands, self.__download_status)
2931
threads.append(threadHelper.create_thread(downloadWorker.execute))
3032
if self.__modules is not None:
31-
moduleWorker = ModuleWorker(self.__payload_modules_commands, self.__download_status)
33+
moduleWorker = ModuleWorker(
34+
self.__payload_modules_commands, self.__download_status)
3235
threads.append(threadHelper.create_thread(moduleWorker.execute))
3336

3437
return threads

glouton/repositories/telemetry/telemetryRepo.py

+31-36
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,55 @@
11
from queue import Queue
2+
from glouton.shared import dateHelper
3+
from glouton.workers.pageScanWorker import PageScanWorker
24
from glouton.infrastructure.satnogDbClient import SatnogDbClient
5+
from glouton.shared import threadHelper
36
from glouton.shared.logger import logger
47

58

69
class TelemetryRepo:
710
def __init__(self, cmd, repos):
811
self.TELEMETRY_URL = 'telemetry/'
9-
self.__client = SatnogDbClient()
1012
self.__repos = repos
1113
self.__cmd = cmd
1214
self.__threads = []
1315

1416
def extract(self):
15-
params = self.__create_request_params()
16-
page = 1
17-
while True:
18-
r = self.__client.get_from_base(
19-
self.TELEMETRY_URL, params)
20-
if r.status_code != 200:
21-
break
22-
23-
logger.Info('scanning page...' + params['page'])
24-
self.__read_page(r.json(), self.__cmd.start_date,
25-
self.__cmd.end_date)
26-
page += 1
27-
28-
params['page'] = str(page)
29-
17+
client = SatnogDbClient()
18+
diff_days = dateHelper.diff_days(
19+
self.__cmd.start_date, self.__cmd.end_date)
20+
if diff_days < 8:
21+
# no thread needed
22+
url_params = self.__url_param_builder(
23+
self.__cmd.start_date, self.__cmd.end_date)
24+
pageScanner = PageScanWorker(
25+
client, self.__cmd, self.__repos, self.TELEMETRY_URL, url_params, 1)
26+
pageScanner.scan()
27+
else:
28+
threads = []
29+
job = 1
30+
for from_datetime, to_datetime in dateHelper.split_date(self.__cmd.start_date, self.__cmd.end_date, 4):
31+
print(str(from_datetime) + " " + str(to_datetime))
32+
url_params = self.__url_param_builder(
33+
from_datetime, to_datetime)
34+
pageScanner = PageScanWorker(
35+
client, self.__cmd, self.__repos, self.TELEMETRY_URL, url_params, job)
36+
t = threadHelper.create_thread(pageScanner.scan)
37+
threads.append(t)
38+
job += 1
39+
40+
threadHelper.wait(threads)
3041
print('\ndownloading started (Ctrl + C to stop)...\t~( ^o^)~')
3142
self.__create_workers_and_wait()
3243

3344
def __create_workers_and_wait(self):
3445
for repo in self.__repos:
3546
self.__threads.extend(repo.create_worker())
36-
while self.__is_one_thread_alive():
37-
for t in self.__threads:
38-
# let's control to main thread every seconds (in order to be able to capture Ctrl + C if needed)
39-
t.join(1)
40-
41-
def __is_one_thread_alive(self):
42-
for thread in self.__threads:
43-
if thread.is_alive():
44-
return True
45-
46-
return False
47-
48-
def __read_page(self, telemetries, start_date, end_date):
49-
for telemetry in telemetries:
50-
for repo in self.__repos:
51-
repo.register_command(
52-
telemetry, start_date, end_date)
47+
threadHelper.wait(self.__threads)
5348

54-
def __create_request_params(self):
49+
def __url_param_builder(self, start_date, end_date):
5550
return {'satellite': self.__cmd.norad_id,
56-
'start': self.__cmd.start_date.isoformat(),
57-
'end': self.__cmd.end_date.isoformat(),
51+
'start': start_date.isoformat(),
52+
'end': end_date.isoformat(),
5853
'observer': self.__cmd.observer,
5954
'transmitter': self.__cmd.transmitter,
6055
'app_source': self.__cmd.app_source,

glouton/repositories/waterfall/waterfallRepo.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from glouton.shared import threadHelper
99
from threading import Event
1010

11+
1112
class WaterfallRepo(Downloadable):
1213
def __init__(self, working_dir, modules):
1314
self.__working_dir = working_dir
@@ -25,10 +26,12 @@ def register_command(self, observation, start_date, end_date):
2526

2627
def create_worker(self):
2728
threads = []
28-
downloadWorker = DownloadWorker(self.__waterfall_commands, self.__download_status)
29+
downloadWorker = DownloadWorker(
30+
self.__waterfall_commands, self.__download_status)
2931
threads.append(threadHelper.create_thread(downloadWorker.execute))
3032
if self.__modules is not None:
31-
moduleWorker = ModuleWorker(self.__waterfall_modules_commands, self.__download_status)
33+
moduleWorker = ModuleWorker(
34+
self.__waterfall_modules_commands, self.__download_status)
3235
threads.append(threadHelper.create_thread(moduleWorker.execute))
3336

3437
return threads

0 commit comments

Comments
 (0)