#!/usr/bin/env python
#
# vim: set ts=4 sw=4 expandtab :
# Copyright (c) 2017 Timothy Savannah - All Rights Reserved
# This code is licensed under the terms of the APACHE license version 2.0
#
# extractMtree.py - Extracts the mtree from all available packages,
# attempting short-reads where possible to limit bandwidth,
# and assembles a compressed json database for use by whatprovides_upstream
#
# IMPORTANT:
#
# PLEASE - ENSURE THAT YOU HAVE AT LEAST 6 MIRRORS UNCOMMENTED IN
# /etc/pacman.d/mirrorlist .
#
# You can run with fewer, but you will be prompted unless you specify
# an alternate number of threads (see --help)
#
#
# REPO Friendly -
# The MTREE is at the start of the tar archive, and is almost always found
# within the first 200K, so we try to download just the first 200K of each package first.
# That comes to a total of about 1.9G split amongst the mirrors; using 6 mirrors (the default)
# means each mirror is hit for, on average, just 300MB in order to build one of these files.
#
# Of course, a few packages are in different-than-standard tar formats (Why? Someone is using an alternative
# to GNU tar when submitting packages, methinks. Or maybe it has something to do with an optional
# GPG signing that works differently on some signed packages than others...). Either way, for this
# small handful of packages, we have to download the full package archive.
#
# Also, by default, only UPDATES will be processed. The existing database is scanned and imported,
# and its versions are compared against the latest available versions. On a mismatch, the default behaviour
# (can be slightly expanded, see usage) is to download and refresh the entry only if the recorded version is lower.
# This can be changed to refresh whenever the versions simply differ (needed sometimes,
# e.g. if a pkg version was a git commit and then changed to a release version, it could be seen as having a decreased version)
#
# This means only the FIRST generation will average 300MB of load per repo, and subsequent updates (I generally update
# once a week, which equals about 300 package updates)
# yield only 60MB total, ** an average of 6MB per repo! **
#
# This means that the repos should get less traffic from this application than from my normal package update traffic
#
# IMPORTANT NOTE:
#
# Even though, as mentioned, the first-generation load added to the repos is about the size of a -Syu operation from a single client,
# PLEASE
# PLEASE
# PLEASE
# Don't generate your own providesDB unless you absolutely HAVE to. And if you do, PLEASE download my provided
# database and update from that.
# This will shrink your bandwidth requirements, on average, from 300MB per repo to 10MB per repo.
#
# I don't want to have this tool add extra load to the already generous folks who provide
# the mirrors
import copy
import errno
import gzip
import os
import json
import pprint
import random
import re
import subprocess
import sys
import tarfile
import tempfile
import traceback
import time
import gc
from io import BytesIO
try:
import func_timeout
from func_timeout.dafunc import raise_exception
from func_timeout.StoppableThread import StoppableThread
except ImportError:
sys.stderr.write('ERROR: Cannot import python module func_timeout - not installed?\n')
sys.exit(1)
try:
from cmp_version import VersionString
canCompareVersions = True
# "Release" support (i.e. -5) was added in cmp_version 3.0.0, so alert if we are using an older version
import cmp_version as cmp_version_mod
if VersionString(cmp_version_mod.__version__) < VersionString('3.0.0'):
sys.stderr.write('WARNING: cmp_version module is older than 3.0.0 and does not properly compare versions with releases. Please update to a newer version.\n')
except ImportError:
sys.stderr.write('WARNING: Cannot import python module cmp_version - not installed?\nWARNING: Cannot compare versions. Will assume all != versions are >.\n')
canCompareVersions = False
try:
PermissionError
except NameError:
PermissionError = IOError
__version__ = '1.1.0'
__version_tuple__ = (1, 1, 0)
####################
### Constants
################
global LATEST_FILE_FORMAT
LATEST_FILE_FORMAT = '0.2'
SUPPORTED_DATABASE_VERSIONS = ('0.1', '0.2')
global PROVIDES_DB_LOCATION
PROVIDES_DB_LOCATION = "/var/lib/pacman/.providesDB"
# USE_ARCH - Package arch to use. TODO: Allow others
# NOTE: If this arch is not found, "any" will be tried
global USE_ARCH
USE_ARCH = "x86_64"
####################
### Tuneables
################
DEFAULT_SUBPROCESS_BUFSIZE = 1024 * 500 # 500K Bufsize
DEFAULT_SHORT_FETCH_SIZE = 1024 * 200 # Try to fetch first 200K to find MTREE
# MAX_THREADS - Max number of threads
MAX_THREADS = 6
# SHORT_TIMEOUT/LONG_TIMEOUT - Timeouts for short read and full read, in seconds
SHORT_TIMEOUT = 15
LONG_TIMEOUT = ( 60 * 8 )
# Max extra urls added to each thread.
# Normally, a repo is assigned to a thread, but if any are extra
# up to this many will be made available to each thread.
global MAX_EXTRA_URLS
MAX_EXTRA_URLS = 3
MAX_REPOS = MAX_THREADS + MAX_EXTRA_URLS
####################
### Definitions
################
if sys.version_info.major == 2:
ALL_STR_TYPES = ( str, unicode )
ALL_DECODED_STR_TYPES = ( str, unicode )
else:
ALL_STR_TYPES = ( str, bytes )
ALL_DECODED_STR_TYPES = ( str, )
isStrType = lambda arg : issubclass(arg.__class__, ALL_STR_TYPES)
isDecodedStrType = lambda arg : issubclass(arg.__class__, ALL_DECODED_STR_TYPES)
# Try to use shared memory slot, if available
global USE_TEMP_DIR
USE_TEMP_DIR = None
def getUseTempDir():
'''
getUseTempDir - Get the directory that should be used for
short-term temp files.
We prefer to use /dev/shm (directly in memory), if available.
Otherwise, fall back to system temp dir ( /tmp )
@return <str> Temporary directory name to use
'''
global USE_TEMP_DIR
if USE_TEMP_DIR is None:
if os.path.exists('/dev/shm') and os.access('/dev/shm', os.W_OK):
USE_TEMP_DIR = '/dev/shm'
else:
            USE_TEMP_DIR = tempfile.gettempdir()
return USE_TEMP_DIR
class FailedToConvertDatabaseException(ValueError):
'''
FailedToConvertDatabaseException - Exception raised when we try (but fail)
to convert provides database from an old format to current format
'''
pass
def convertOldDatabase(oldVersion, data):
'''
convertOldDatabase - Convert providesDB from an older format to the current format
@param oldVersion <str> - The old database format version
@param data <dict> - The old database dict
@raises - FailedToConvertDatabaseException if failure to convert
'''
global LATEST_FILE_FORMAT
if oldVersion == LATEST_FILE_FORMAT:
return
if oldVersion == '0.1':
for key in list(data.keys()):
if key == '__vers': # Not in version 0.1, but whatever..
continue
oldData = data[key]
            if isStrType(oldData):
# If string, was an error
newData = {
'files' : [], # No files
'version' : '', # Unknown version
'error' : oldData, # Error string
}
data[key] = newData
elif issubclass(oldData.__class__, (list, tuple)): # Should be list, but test tuple too for some reason..
newData = {
'files' : copy.copy(oldData), # Copy the data ( list of files ). Probably not required to copy, but simplifies GC
'version' : '', # Unknown version
'error' : None, # No error
}
data[key] = newData
else:
raise FailedToConvertDatabaseException('Failed to convert old data (version %s) to latest version: %s' %(oldVersion, LATEST_FILE_FORMAT))
else:
raise FailedToConvertDatabaseException('Old database version "%s" is not supported for update.' %(oldVersion, ))
data['__vers'] = LATEST_FILE_FORMAT
# No return - data modified inline
def writeDatabase(results):
'''
writeDatabase - Writes the database to disk
        First, it will try to write to PROVIDES_DB_LOCATION, falling back to a temp file on failure.
@param results <dict> - The dict to write
MUST BE IN CURRENT DATABASE FORMAT!
NOTE: Garbage collector runs at the end of this function.
NOTE: If we fail to write to PROVIDES_DB_LOCATION, we will write to
a tempfile which will be printed to stderr
'''
global PROVIDES_DB_LOCATION
wroteTo = PROVIDES_DB_LOCATION
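    # The on-disk providesDB is simply the results dict, JSON-encoded and then gzip-compressed.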
compressed = gzip.compress( json.dumps(results).encode('utf-8') )
try:
with open(PROVIDES_DB_LOCATION, 'wb') as f:
f.write( compressed )
except Exception as exc:
tempFile = tempfile.NamedTemporaryFile(mode='wb', delete=False)
sys.stderr.write('\nFailed to open "%s" for writing ( %s ). Dumping to tempfile:\n%s\n' %(PROVIDES_DB_LOCATION, str(exc), tempFile.name, ))
tempFile.write( compressed )
tempFile.close()
wroteTo = tempFile.name
# Force this now - it's big!
del compressed
gc.collect()
return wroteTo
def decompressDataSubprocess(data, cmd, bufSize=DEFAULT_SUBPROCESS_BUFSIZE):
'''
decompressDataSubprocess - Decompress given compressed #data, using
a provided decompression command, #cmd
Likely this will use /dev/shm as an intermediary (if available) @see getUseTempDir
        This is used in lieu of the builtin python modules for speed's sake
(several orders of magnitude faster)
@param data <bytes> - Compressed data
        @param cmd list<str> - Decompression command. Will NOT use a shell to launch,
so first element should be a fully-resolved command, followed by
args to decompress, read from stdin, write to stdout
@param bufSize <int> default DEFAULT_SUBPROCESS_BUFSIZE - Number of bytes to use for buffers
@return <bytes> - Decompressed data
NOTE: If FunctionTimedOut is raised while this method is being called,
such as if this is called from a func_timeout function or StoppableThread,
it will clean up and then raise that exception after cleanup.
'''
useTempDir = getUseTempDir()
fte = None
tempFile = None
devnull = open(os.devnull, 'w')
# Short delay to ensure everything is init'd and ready to go
time.sleep(.002)
try:
tempFile = tempfile.NamedTemporaryFile(mode='w+b', buffering=bufSize, dir=useTempDir, prefix='mtree_', delete=True)
tempFile.write(data)
tempFile.flush()
tempFile.seek(0)
pipe = subprocess.Popen(cmd, shell=False, stdin=tempFile, stdout=subprocess.PIPE, stderr=devnull, close_fds=True, bufsize=bufSize)
time.sleep(.01)
result = pipe.stdout.read()
nextResult = True
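        # Drain stdout until an empty read: keep appending output until the
        # decompressor signals EOF, then wait() on the process below.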
while True:
time.sleep(.005)
nextResult = pipe.stdout.read()
if nextResult != b'':
result += nextResult
else:
break
pipe.wait()
except func_timeout.FunctionTimedOut as _fte:
result = None
fte = _fte
if pipe.poll() is None:
try:
pipe.terminate()
except:
pass
time.sleep(.2)
if pipe.poll() is None:
try:
pipe.kill()
except:
pass
pipe.wait()
devnull.close()
if tempFile is not None:
try:
tempFile.close()
except:
pass
if fte:
raise_exception( [fte] )
return result
def decompressZlib(data, bufSize=DEFAULT_SUBPROCESS_BUFSIZE):
'''
decompressZlib - Decompress zlib/gz/DEFLATE data using external executable
@see decompressDataSubprocess
@param data <bytes> - Compressed data
@param bufSize <int> default DEFAULT_SUBPROCESS_BUFSIZE - Number of bytes to use for buffer size
@return data <bytes> - Decompressed data
'''
# -dnc - Decompress to stdout and don't worry about file
return decompressDataSubprocess(data, ['/usr/bin/gzip', '-dnc'], bufSize)
def decompressXz(data, bufSize=DEFAULT_SUBPROCESS_BUFSIZE):
'''
decompressXz - Decompress lzma/xz data using external executable
@see decompressDataSubprocess
@param data <bytes> - Compressed data
@param bufSize <int> default DEFAULT_SUBPROCESS_BUFSIZE - Number of bytes to use for buffer size
@return data <bytes> - Decompressed data
'''
    # -dc - Decompress to stdout
return decompressDataSubprocess(data, ['/usr/bin/xz', '-dc'], bufSize)
def getFileSizeFromTarHeader(header):
'''
getFileSizeFromTarHeader - Try to get the file size from a file's TAR header
@param header <bytes/str> - The TAR header relating to the file of interest
@return <int> - The size, in bytes, of the file stored in the TAR archive
        NOTE: This function is not foolproof. There are lots of extensions that
modify the header and offsets, and this does not try to detect them all.
It uses the most common GNU Tar extension that system "tar" command generates
NOTE: If an exception is raised, this function should do a full fetch and
use the "tar" module to support all extensions. This function is intended
to be used in the "short-read" path.
TODO: Extend to support more files
'''
    # Expected location of the size field in the tar header. Quite variable because of the multiple extensions, etc.
# These are used in the "short read" path. Upon failure, the full tar will be downloaded
# and the "tar" module used (which supports more format versions)
SIZE_IDX_START = 124
SIZE_IDX_END = 124 + 12
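    # In the common ustar/GNU header layout the size field occupies bytes 124-135
    # of the 512-byte header, stored as a NUL/space-terminated ASCII octal string.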
trySection = header[SIZE_IDX_START : SIZE_IDX_END]
# Size is octal
return int(trySection, 8)
def getFilenamesFromMtree(mtreeContents):
'''
getFilenamesFromMtree - Extracts all the "provides" filenames from
the package's .MTREE file.
@param mtreeContents <str> - The .MTREE file extracted from archive
@return list<str> - A list of filenames this package provides.
'''
lines = mtreeContents.split('\n')
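    # A file entry in an Arch .MTREE looks roughly like (illustrative example):
    #   ./usr/bin/foo time=1499932800.0 size=12345 md5digest=... sha256digest=...
    # so the pattern below captures the leading "./<path>" up to the " time" attribute.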
    rePat = re.compile(r'^\.(?P<filename>.+) time')
ret = []
for line in lines:
if not line or line[0] != '.':
continue
matchObj = rePat.match(line)
if not matchObj:
# Uh oh..
continue
ret.append(matchObj.groupdict()['filename'])
return ret
def fetchFromUrl(url, numBytes, isSuperVerbose=False):
'''
fetchFromUrl - Fetches #numBytes bytes of data from a given #url
@param url <str> - Url to fetch
@param numBytes <None/int> - If None, fetch entire file.
Otherwise, fetch first N bytes.
@param isSuperVerbose <bool> default False, if True will print curl progress
This will get messy if numThreads > 1
@return <bytes> - File data
NOTE: This function uses "curl" to best handle ftp vs http vs https
NOTE: If "isSuperVerbose" is set to True, the curl progress will be output.
        This will get really ugly when the number of threads is > 1
'''
useStderr = None
if not isSuperVerbose:
extraArgs = ['--silent']
useStderr = open(os.devnull, 'w')
else:
extraArgs = []
pipe = subprocess.Popen(["/usr/bin/curl", '-k'] + extraArgs + [url], shell=False, stdout=subprocess.PIPE, stderr=useStderr)
if numBytes:
urlContents = pipe.stdout.read(numBytes)
pipe.stdout.close()
else:
urlContents = pipe.stdout.read()
ret = pipe.wait()
if useStderr is not None:
useStderr.close()
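    # Packages built for arch "any" use an "-any" filename instead of "-x86_64",
    # so if the arch-specific URL 404s, retry once with "-any" substituted.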
if b'404 Not Found' in urlContents and '-x86_64' in url:
return fetchFromUrl(url.replace('-x86_64', '-any'), numBytes, isSuperVerbose)
return urlContents
def refreshPacmanDatabase():
'''
refreshPacmanDatabase - Refreshes the pacman package database ( -Sy )
@return <bool> - True if successful, otherwise False
NOTE: You must be root to refresh the database.
        You should be running this script as root anyway, though...
'''
if os.getuid() != 0:
sys.stderr.write('WARNING: Cannot refresh pacman database.\n')
return False
else:
ret = subprocess.Popen(['/usr/bin/pacman', '-Sy'], shell=False).wait()
if ret != 0:
sys.stderr.write('WARNING: pacman -Sy returned non-zero: %d\n' %(ret,))
return False
return True
def getAllPackagesInfo():
'''
getAllPackagesInfo - Get the "info" for all packages.
This includes repo, package name, package version
@return list< tuple<str(repo), str(name), str(version)> > - The collected info
for all packages in the repos
'''
devnull = open(os.devnull, 'w')
pipe = subprocess.Popen(["/usr/bin/pacman", "-Sl"], shell=False, stdout=subprocess.PIPE, stderr=devnull)
contents = pipe.stdout.read()
pipe.wait()
devnull.close()
return [tuple(x.split(' ')[0:3]) for x in contents.decode('utf-8').split('\n') if x and ' ' in x]
def getRepoUrls(maxRepos=MAX_REPOS):
'''
getRepoUrls - Extract the repo urls from /etc/pacman.d/mirrorlist
        @return list<str> - A list of repo url format strings. "$repo" is replaced with "%s",
          "$arch" is replaced with USE_ARCH, and a trailing "/%s" is appended for the package filename.
        TODO: This replacement should happen in another function, in case $repo comes AFTER $arch.
          No repos do this as far as I can tell, since they mirror a fixed layout, but it IS possible.
'''
global USE_ARCH
if not maxRepos:
gatheredEnough = lambda _repos : False
else:
gatheredEnough = lambda _repos : len(_repos) >= maxRepos
nextLine = True
repos = []
repoRE = re.compile('^[ \t]*[sS]erver[ \t]*=[ \t]*(?P<repo_url>[^ \t#]+)[ \t]*([#].*){0,1}$')
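    # Matches mirrorlist entries like (illustrative):
    #   Server = https://mirror.example.org/archlinux/$repo/os/$arch
    # capturing the URL and ignoring any trailing comment.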
with open('/etc/pacman.d/mirrorlist', 'rt') as f:
nextLine = f.readline()
while nextLine != '' and not gatheredEnough(repos):
matchObj = repoRE.match(nextLine.strip())
if matchObj:
groupDict = matchObj.groupdict()
if groupDict['repo_url']:
ret = groupDict['repo_url'].replace('$repo', '%s').replace('$arch', USE_ARCH)
while ret.endswith('/'):
ret = ret[:-1]
ret += '/%s'
repos.append(ret)
nextLine = f.readline()
if not repos:
raise Exception('Failed to find repo URL from /etc/pacman.d/mirrorlist. Are any not commented?')
return repos
def shuffleLst(lst):
'''
shuffleLst - Randomly sort provided list
@param lst list - List to be randomly sorted
@return randomly sorted list
NOTE: #lst is NOT modified
'''
if not lst:
return list()
lstCopy = lst[:]
ret = []
while len(lstCopy) > 1:
ret.append( lstCopy.pop( random.randint(0, len(lstCopy)-1) ) )
ret.append( lstCopy.pop() )
return ret
class RefObj(object):
'''
RefObj - An object that holds a reference to another object
        Call to retrieve the reference, i.e. myObj = myRefObj()
'''
def __init__(self, ref):
'''
__init__ - Create a refObj
@param ref <object> - An object to hold reference to
'''
self.ref = ref
def __call__(self):
'''
__call__ - Return the reference this object holds
@return <object> - The object this RefObj is holding
'''
return self.ref
#REPO_URL = "http://mirrors.acm.wpi.edu/archlinux/%s/os/x86_64/%s"
#REPO_URLS = [ "http://mirrors.acm.wpi.edu/archlinux/%s/os/x86_64/%s" ]
class RetryWithFullTarException(Exception):
'''
RetryWithFullTarException - Exception raised when we failed to use
a short-read of the tar, to indicate to retry with full fetch
and tar module
'''
pass
class RetryWithNextMirrorException(Exception):
'''
RetryWithNextMirrorException - Indicate that there is an issue with this package
on the selected mirror (such as no data returned), and to try the next mirror
'''
pass
def getFileData(filename, decodeWith=None):
'''
getFileData - Read and decode a filename
@param filename <str> - Filename
        @param decodeWith <str/None> - codec to decode results with, or None to leave as bytes
@return <bytes/str> - File data, either decoded with #decodeWith, or bytes if #decodeWith was None
'''
with open(filename, 'rb') as f:
contents = f.read()
if decodeWith:
contents = contents.decode(decodeWith)
return contents
class RunnerWorker(StoppableThread):
'''
RunnerWorker - A StoppableThread set to run a subset of packages.
@see createThreads
'''
def __init__(self, doPackages, resultsRef, failedPackageInfos, repoUrls, shortFetchSize=DEFAULT_SHORT_FETCH_SIZE, timeout=SHORT_TIMEOUT, longTimeout=LONG_TIMEOUT, isVerbose=False, isSuperVerbose=False):
'''
__init__ - Create a "RunnerWorker" object
@see createThreads
@param doPackages list < tuple < str, str, str > > - A list of package infos this thread should process
@param resultsRef RefObj < dict > - Reference to the global results
@param failedPackageInfos list - Global list where failed package infos should be appended
            @param repoUrls list<str> - A list of urls, ready to be used as a format string (contains two %s: the repo name and the package filename).
First is primary url
@param shortFetchSize <int> default DEFAULT_SHORT_FETCH_SIZE - Number of bytes to fetch for a "short fetch"
@param timeout <float> Default SHORT_TIMEOUT , The "short"/standard timeout period per package
@param longTimeout <float> default LONG_TIMEOUT - The "long"/retry timeout period per package
@param isVerbose <bool> default False - Whether to be verbose or not
@param isSuperVerbose <bool> default False - Whether to be "super verbose"
'''
StoppableThread.__init__(self)
self.doPackages = doPackages
self.resultsRef = resultsRef
self.failedPackageInfos = failedPackageInfos
self.repoUrls = repoUrls
self.timeout = timeout
self.longTimeout = longTimeout
self.shortFetchSize = shortFetchSize
self.isVerbose = isVerbose
self.isSuperVerbose = isSuperVerbose
##############################################
######## doOne - Do a single package
########################################
def doOne(self, repoName, packageName, packageVersion, repoUrl, fetchedData=None, useTarMod=False):
'''
doOne - Do a single package. This is an internal function.
Use RunnerWorker.run instead.
@param repoName <str> - Repo name to use
@param packageName <str> - Package name to fetch
@param packageVersion <str> - The package version
@param repoUrl <str> - Repo url to try
@param fetchedData <None/bytes> default None - Data that has already been fetched, or None to do fetch
@param useTarMod <bool> default False - Whether to do a full fetch and use tar module. If False,
will be a "short fetch"
NOTES:
* May call itself with useTarMod=True if originally useTarMod=False but short-read failed
'''
isVerbose = self.isVerbose
shortFetchSize = self.shortFetchSize
resultsRef = self.resultsRef
if isVerbose and useTarMod is True:
print ( "Using full fetch and tar module for %s - %s" %(repoName, packageName) )
results = resultsRef()
if fetchedData is None:
finalUrl = repoUrl %( repoName, packageName + "-" + packageVersion + "-x86_64.pkg.tar.xz" )
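            # e.g. (illustrative) http://mirror.example.org/archlinux/core/os/x86_64/foo-1.2.3-1-x86_64.pkg.tar.xz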
if isVerbose:
print ( "Fetching url: " + finalUrl )
if useTarMod is False:
maxSize = shortFetchSize
else:
maxSize = None
tarContents = fetchFromUrl(finalUrl, maxSize)
else:
finalUrl = '[cached data]'
tarContents = fetchedData
if len(tarContents) == 0:
msg = 'Unable to fetch %s from: %s\n' %(packageName, finalUrl)
raise RetryWithNextMirrorException(msg)
if useTarMod is False:
data = decompressXz(tarContents[:shortFetchSize])
            # Sometimes we don't find it - maybe a format error, maybe we didn't fetch
            # enough (the useTarMod=True retry will do a full fetch)
try:
# Try an rindex, as some tar's have an extra section which also contains filenames
mtreeIdx = data.rindex(b'.MTREE')
except Exception as ex1:
if isVerbose is True or useTarMod is False:
msg = "Could not find .MTREE in %s - %s - %s." %( repoName, packageName, packageVersion )
if useTarMod is False:
msg += ' retrying with full fetch and tar mod.\n\n'
raise RetryWithFullTarException(msg)
else:
if isVerbose is True:
msg += '\n\n'
sys.stderr.write(msg)
raise ex1
headerStart = data[mtreeIdx:]
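            # headerStart begins at the .MTREE member's 512-byte tar header (the name
            # field is at offset 0); the gzip'd mtree payload follows that header.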
try:
mtreeSize = getFileSizeFromTarHeader(headerStart)
compressedData = headerStart[512 : 512 + mtreeSize] # 512 is header size.
except Exception as ex2:
# If we failed with the "short fetch", try again with full fetch and tar module
if useTarMod is False:
return self.doOne(repoName, packageName, packageVersion, repoUrl, fetchedData, useTarMod=True)
raise ex2
else:
            # useTarMod is True
data = decompressXz(tarContents)
if not data:
# Bad repo?
errorMsg = "WARNING: %s/%s on repo at %s did not return any data (mirror out of date? file corrupt?), " %( repoName, packageName, repoUrl)
sys.stderr.write("%s\n\n" %(errorMsg, ))
raise RetryWithNextMirrorException(errorMsg)
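            # Full-fetch path: wrap the decompressed tar in a BytesIO so the tarfile
            # module (which handles more header variants) can extract .MTREE directly.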
bio = BytesIO()
bio.write(data)
bio.seek(0)
tf = tarfile.open(fileobj=bio)
extractedMtreeFile = tf.extractfile('.MTREE')
compressedData = extractedMtreeFile.read()
try:
extractedMtreeFile.close()
except:
pass
mtreeData = decompressZlib(compressedData).decode('utf-8')
files = getFilenamesFromMtree(mtreeData)
results[packageName] = { 'files' : files, 'version' : packageVersion, 'error' : None }
if isVerbose:
sys.stdout.write("Got %d files for %s.\n\n" %(len(files), packageName ))
# END: doOne
###################################################
######## run -
######### Run through a list of packages
######### on a list of repos
#############################################
def run(self):
'''
run - Thread main. Runs through a list of packages on a list of repos.
May be called standalone (i.e. not via thread.start() ) for non-threaded run.
Uses args from init -
doPackages
resultsRef
failedPackageInfos
repoUrls
timeout
longTimeout
isVerbose
'''
# repoUrls - First is primary, others may or may not be used
doPackages = self.doPackages
resultsRef = self.resultsRef
failedPackageInfos = self.failedPackageInfos
repoUrls = self.repoUrls
# TODO: Rename self.timeout to self.shortTimeout
shortTimeout = self.timeout
longTimeout = self.longTimeout
numRepoUrls = len(repoUrls)
isVerbose = self.isVerbose
results = resultsRef()
for repoName, packageName, packageVersion in doPackages:
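            # Each iteration: run a gc pass, then sleep out the remainder of a 1.5
            # second window so package fetches are paced rather than back-to-back.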
startTime = time.time()
gc.collect()
endTime = time.time()
            time.sleep(max(0, 1.5 - (endTime - startTime)))
if isVerbose:
sys.stdout.write("Processing %s - %s: %s" %(repoName, packageName, isVerbose and '\n' or '') )
sys.stdout.flush()
wasSuccessful = False
isPackageMarkedFailed = False
needsFullTar = False
try:
# Iterate through all repo urls here, and break at the end of the loop
# if no exception is raised. If we "continue", we will try using the next repo.
global repoUrlIdx
global useLongTimeout
repoUrlIdx = 0
useLongTimeout = False
def moveToNextRepo():
'''
moveToNextRepo - Move to the next repo in the below loop.
This increases the iterator index and resets the "useLongTimeout" flag to False
'''
global repoUrlIdx
global useLongTimeout
repoUrlIdx += 1
# Only restore the short timeout if we haven't determined
# we need the full tar (full tar = long timeout always)
if needsFullTar is False:
useLongTimeout = False
else:
useLongTimeout = True
while repoUrlIdx < numRepoUrls:
useRepoUrl = repoUrls[repoUrlIdx]
if needsFullTar:
# If we got a RetryWithFullTarException once for this package,
# it means we downloaded the partial and were unable to extract
                        # the mtree. This will be true on all other mirrors as well, so the flag is tracked per package.
doOneKwargs = {'useTarMod' : True}
# Always use long timeout when doing full tar mode
                        useTimeout = longTimeout
else:
doOneKwargs = {'useTarMod' : False}
if useLongTimeout:
useTimeout = longTimeout
else:
useTimeout = shortTimeout
# Try to run a fetch
try:
func_timeout.func_timeout(useTimeout, self.doOne, (repoName, packageName, packageVersion, useRepoUrl), kwargs=doOneKwargs)
except RetryWithFullTarException as retryWithFullTarException1:
# If RetryWithFullTarException is raised, we could not parse the tar file,
# so retry with a full read and long timeout
if isVerbose:
sys.stderr.write( "Got RetryWithFullTarException [iter %d / %d ] on package %s at repo url %s. %s\n" % \
(repoUrlIdx + 1, numRepoUrls, packageName, useRepoUrl, str(retryWithFullTarException1) )
)
if needsFullTar:
                            sys.stderr.write('UNEXPECTED!! Got a "retry with full tar" but already was trying with full tar on package %s at repo url %s.\n\tGoing to move onto next mirror anyway....\n' %(packageName, useRepoUrl, ))
moveToNextRepo()
continue
else:
# Otherwise, DON'T increment the repoUrlIdx, instead just set needsFullTar
# and retry with same mirror. Also set to use longer timeout
needsFullTar = True
useLongTimeout = True
continue
except RetryWithNextMirrorException as retryNextMirrorException1:
# If RetryWithNextMirrorException is raised, we repeat the effort on the next mirror.
# so iterate next in loop
if isVerbose:
sys.stderr.write( "Got RetryWithNextMirrorException [ iter %d / %d ] on package %s at repo url %s. %s\n" % \
(repoUrlIdx + 1, numRepoUrls, packageName, useRepoUrl, str(retryNextMirrorException1) )
)
moveToNextRepo()
continue
except func_timeout.FunctionTimedOut as fte:
# Got a func_timeout, if we did the short timeout, move to long timeout.
# If we did the long timeout, move to next repo.
if isVerbose:
if useLongTimeout:
timeoutTypeStr = "using long timeout (will move onto next repo)"
else:
timeoutTypeStr = "using short timeout (will retry with long timeout)"
sys.stderr.write( "Got func_timeout %s [ iter %d / %d ] on package %s at repo url %s. %s\n" % \
(timeoutTypeStr, repoUrlIdx + 1, numRepoUrls, packageName, useRepoUrl, str(fte) )
)
if not useLongTimeout:
# We failed on short timeout, so switch to long timeout
useLongTimeout = True
# Do not move to next repo
continue
else:
# We failed on long timeout, switch back to short timeout and
# move to next repo
moveToNextRepo()
continue
except KeyboardInterrupt as kie:
# If control+c is hit, raise it to be handled higher in stack
raise kie
except Exception as e:
if isinstance(e, KeyboardInterrupt):