Skip to content

Commit

Permalink
Debug non blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
forsyth2 committed Oct 31, 2024
1 parent 082afc7 commit b2635f4
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 10 deletions.
61 changes: 61 additions & 0 deletions tests/scripts/test_non_blocking.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# First, we have to set up Globus according to https://github.com/E3SM-Project/zstash/discussions/329

Check failure on line 1 in tests/scripts/test_non_blocking.sh

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

tests/scripts/test_non_blocking.sh#L1

Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.
# Log in to Globus
# Authenticate LCRC Improv DTN
# Authenticate NERSC Perlmutter
source /lcrc/soft/climate/e3sm-unified/load_latest_e3sm_unified_chrysalis.sh
cd /home/ac.forsyth2/ez
mkdir zstash_dirs
cd zstash_dirs/
mkdir zstash_demo; echo 'file0 stuff' > zstash_demo/file0.txt
zstash create --hpss=globus://15288284-7006-4041-ba1a-6b52501e49f1/~/manual_run zstash_demo
# globus_sdk.services.transfer.errors.TransferAPIError: ('POST', 'https://transfer.api.globus.org/v0.10/endpoint/15288284-7006-4041-ba1a-6b52501e49f1/autoactivate?if_expires_in=600', None, 400, 'ClientError.AuthenticationFailed', 'No credentials supplied', 'msYY54WXq')
rm ~/.globus-native-apps.cfg
zstash create --hpss=globus://15288284-7006-4041-ba1a-6b52501e49f1/~/manual_run zstash_demo
# Auth Code prompt appears twice


cd /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions
du -sh v2.NARRM.historical_0151/
# That's 22 GB. Let's try to compress it with zstash.
cd v2.NARRM.historical_0151/tests

# From https://docs.e3sm.org/zstash/_build/html/main/usage.html:
# `--maxsize MAXSIZE` specifies the maximum size (in GB) for tar files. The default is 256 GB. Zstash will create tar files that are smaller than MAXSIZE except when individual input files exceed MAXSIZE (as individual files are never split up between different tar files).
# `--non-blocking` Zstash will submit a Globus transfer and immediately create a subsequent tarball. That is, Zstash will not wait until the transfer completes to start creating a subsequent tarball. On machines where it takes more time to create a tarball than transfer it, each Globus transfer will have one file. On machines where it takes less time to create a tarball than transfer it, the first transfer will have one file, but the number of tarballs in subsequent transfers will grow finding dynamically the most optimal number of tarballs per transfer. NOTE: zstash is currently always non-blocking.

# Make maxsize 1 GB. This will create a new tar after every 1 GB of data.
zstash create -v --hpss=globus://nersc/home/f/forsyth/test_290_v1 --maxsize 1 .

# DEBUG: Closing tar archive 000000.tar
# INFO: Creating new tar archive 000001.tar

# In a different window:
ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash
# 000000.tar 000001.tar 000002.tar 000003.tar 000004.tar 000005.tar index.db

# So, we can clearly see the tars are being created immediately.
# On the Globus website, test_290_v1 000000 transfer is complete.
# And test_290_v1 000001 transfer is in progress.

# This is the `--non-blocking` behavior, even though we did not specify it.

# Now, with changes in this PR:
conda activate zstash_dev_issue_290
cd /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests
rm -rf zstash
zstash create -v --hpss=globus://nersc/home/f/forsyth/test_290_v2 --maxsize 1 --non-blocking .
# DEBUG: Closing tar archive 000000.tar
# INFO: Creating new tar archive 000001.tar
# In a different window:
ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash
# 000000.tar 000001.tar index.db
# On the Globus website, test_290_v2 000000 transfer is complete.
# And test_290_v2 000001 transfer is in progress.
ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash
# 000000.tar 000001.tar 000002.tar 000003.tar 000004.tar 000005.tar index.db
ls /lcrc/group/e3sm/ac.forsyth2/E3SMv2_test/zstash_extractions/v2.NARRM.historical_0151/tests/zstash
# 000000.tar 000002.tar 000004.tar 000006.tar 000008.tar 00000a.tar
# 000001.tar 000003.tar 000005.tar 000007.tar 000009.tar index.db
# Command completed on the command line
# But on Globus website:
# Completed -- test_290_v2 000000, test_290_v2 000001, test_290_v2 index
6 changes: 5 additions & 1 deletion zstash/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def create():
failures: List[str] = create_database(cache, args)

# Transfer to HPSS. Always keep a local copy.
hpss_put(hpss, get_db_filename(cache), cache, keep=True)
hpss_put(
hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking
)

globus_finalize(non_blocking=args.non_blocking)

Expand Down Expand Up @@ -254,6 +256,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
)
except FileNotFoundError:
raise Exception("Archive creation failed due to broken symlink.")
Expand All @@ -268,6 +271,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
)

# Close database
Expand Down
9 changes: 7 additions & 2 deletions zstash/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def file_exists(name: str) -> bool:


def globus_transfer(
remote_ep: str, remote_path: str, name: str, transfer_type: str
remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
): # noqa: C901
global transfer_client
global local_endpoint
Expand Down Expand Up @@ -247,7 +247,10 @@ def globus_transfer(
sys.exit(1)

if transfer_type == "get" and task_id:
globus_wait(task_id)
# non_blocking => do not wait for the last transfer to finish before creating a new tar
# not non_blocking => blocking => wait for the last transfer to finish before creating a new tar
if not non_blocking:
globus_wait(task_id)


def globus_wait(task_id: str):
Expand Down Expand Up @@ -319,6 +322,8 @@ def globus_finalize(non_blocking: bool = False):
logger.error("Exception: {}".format(e))
sys.exit(1)

# non_blocking => do not wait for the last transfer to finish before creating a new tar
# not non_blocking => blocking => wait for the last transfer to finish before creating a new tar
if not non_blocking:
if task_id:
globus_wait(task_id)
Expand Down
9 changes: 6 additions & 3 deletions zstash/hpss.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def hpss_transfer(
transfer_type: str,
cache: str,
keep: bool = False,
non_blocking: bool = False,
):
if hpss == "none":
logger.info("{}: HPSS is unavailable".format(transfer_type))
Expand Down Expand Up @@ -87,7 +88,7 @@ def hpss_transfer(

if scheme == "globus":
# Transfer file using the Globus Transfer Service
globus_transfer(endpoint, url_path, name, transfer_type)
globus_transfer(endpoint, url_path, name, transfer_type, non_blocking)
else:
# Transfer file using `hsi`
command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name)
Expand All @@ -104,11 +105,13 @@ def hpss_transfer(
os.remove(file_path)


def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True):
def hpss_put(
hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False
):
"""
Put a file to the HPSS archive.
"""
hpss_transfer(hpss, file_path, "put", cache, keep)
hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking)


def hpss_get(hpss: str, file_path: str, cache: str):
Expand Down
3 changes: 2 additions & 1 deletion zstash/hpss_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def add_files(
keep: bool,
follow_symlinks: bool,
skip_tars_md5: bool = False,
non_blocking: bool = False,
) -> List[str]:

# Now, perform the actual archiving
Expand Down Expand Up @@ -156,7 +157,7 @@ def add_files(
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_put(hpss, os.path.join(cache, tfname), cache, keep)
hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking)

# Update database with files that have been archived
# Add a row to the "files" table,
Expand Down
22 changes: 19 additions & 3 deletions zstash/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def update():
hpss = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_put(hpss, get_db_filename(cache), cache, keep=True)
hpss_put(
hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking
)

globus_finalize(non_blocking=args.non_blocking)

Expand Down Expand Up @@ -242,14 +244,28 @@ def update_database( # noqa: C901
try:
# Add files
failures = add_files(
cur, con, itar, newfiles, cache, keep, args.follow_symlinks
cur,
con,
itar,
newfiles,
cache,
keep,
args.follow_symlinks,
non_blocking=args.non_blocking,
)
except FileNotFoundError:
raise Exception("Archive update failed due to broken symlink.")
else:
# Add files
failures = add_files(
cur, con, itar, newfiles, cache, keep, args.follow_symlinks
cur,
con,
itar,
newfiles,
cache,
keep,
args.follow_symlinks,
non_blocking=args.non_blocking,
)

# Close database
Expand Down

0 comments on commit b2635f4

Please sign in to comment.