Skip to content

Commit cb1b367

Browse files
committed
refactor: Take all rpaths as resource names only in transactions
Proof of concept for a single repo-and-branch-scoped transaction. This ties a transaction to a single repo and branch by taking all file and directory names by resource only instead of a full URI. Naturally, this has the subtle side effect that given full URIs are silently understood as nested paths, and uploaded to the transaction branch without loud errors or warnings. A section was added to the transaction docs that details this behavior, but it might be safer to check the input path against existing repos and branches.
1 parent 166d535 commit cb1b367

File tree

5 files changed

+32
-13
lines changed

5 files changed

+32
-13
lines changed

docs/guides/transactions.md

+10-3
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ from lakefs_spec import LakeFSFileSystem
2020
fs = LakeFSFileSystem()
2121

2222
with fs.transaction("repo", "main") as tx:
23-
fs.put_file("train-data.txt", f"repo/{tx.branch.id}/train-data.txt")
23+
fs.put_file("train-data.txt", "train-data.txt")
2424
tx.commit(message="Add training data")
25-
fs.put_file("test-data.txt", f"repo/{tx.branch.id}/test-data.txt")
25+
fs.put_file("test-data.txt", "test-data.txt")
2626
sha = tx.commit(message="Add test data")
2727
tx.tag(sha, name="My train-test split")
2828
```
@@ -35,6 +35,13 @@ The full list of supported lakeFS versioning operations (by default, these opera
3535
* [`rev_parse`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.rev_parse), for parsing revisions like branch/tag names and SHA fragments into full commit SHAs.
3636
* [`tag`](../reference/lakefs_spec/transaction.md#lakefs_spec.transaction.LakeFSTransaction.tag), for creating a tag pointing to a commit.
3737

38+
## Limitations of transactions
39+
40+
Transactions are scoped to a single repository and branch only, equal to those given to the `fs.transaction()` context manager.
41+
When uploading files in a transaction via `fs.put()` or `fs.put_file()`, you **must** give all remote paths as file names.
42+
If you use a fully qualified URI, leading repository and branch names will be interpreted as subdirectories, which will be created on upload.
43+
No warnings or errors will be thrown, so be sure to double-check your paths in all transaction scopes.
44+
3845
## Lifecycle of ephemeral transaction branches
3946

4047
You can control the lifecycle for a transaction branch with the `delete` argument:
@@ -56,7 +63,7 @@ from lakefs_spec import LakeFSFileSystem
5663
fs = LakeFSFileSystem()
5764

5865
with fs.transaction("repo", "main", delete="onsuccess") as tx:
59-
fs.put_file("my-file.txt", f"repo/{tx.branch.id}/my-file.txt")
66+
fs.put_file("my-file.txt", "my-file.txt")
6067
tx.commit(message="Add my-file.txt")
6168
raise ValueError("oops!")
6269
```

src/lakefs_spec/spec.py

+3
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,9 @@ def put_file(
684684
lpath = stringify_path(lpath)
685685
rpath = stringify_path(rpath)
686686

687+
if self._intrans:
688+
rpath = self.transaction.make_uri(rpath)
689+
687690
if precheck and Path(lpath).is_file():
688691
remote_checksum = self.checksum(rpath)
689692
local_checksum = md5_checksum(lpath, blocksize=self.blocksize)

src/lakefs_spec/transaction.py

+10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44

55
import logging
6+
import os
67
import random
78
import string
89
import warnings
@@ -145,6 +146,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):
145146
if self.delete == "always" or (success and self.delete == "onsuccess"):
146147
self._ephemeral_branch.delete()
147148

149+
def make_uri(self, path: str | os.PathLike[str]) -> str:
150+
spath = str(path)
151+
# NB: this fails silently if the input path is already fully qualified.
152+
# However, in general, it's impossible to distinguish between a
153+
# fully qualified path and a normal nested path, so at most, we
154+
# could split off the first segment of the input and check it against existing
155+
# repositories.
156+
return "/".join([self.repository, self.branch.id, spath])
157+
148158
@property
149159
def branch(self):
150160
return self._ephemeral_branch

tests/test_put_file.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def test_no_change_postcommit(
2121
message = f"Add file {random_file.name}"
2222

2323
with fs.transaction(repository, temp_branch) as tx:
24-
fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
24+
fs.put(lpath, random_file.name)
2525
tx.commit(message=message)
2626

2727
commits = list(temp_branch.log(max_amount=2))
@@ -31,7 +31,7 @@ def test_no_change_postcommit(
3131

3232
# put the same file again, this time the diff is empty
3333
with fs.transaction(repository, temp_branch) as tx:
34-
fs.put(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}", precheck=False)
34+
fs.put(lpath, random_file.name, precheck=False)
3535
tx.commit(message=f"Add file {random_file.name}")
3636

3737
# check that no other commit has happened.

tests/test_transactions.py

+7-8
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ def test_transaction_commit(
1717
random_file = random_file_factory.make()
1818

1919
lpath = str(random_file)
20-
rpath = f"{repository.id}/{temp_branch.id}/{random_file.name}"
2120

2221
message = f"Add file {random_file.name}"
2322

2423
with fs.transaction(repository, temp_branch) as tx:
25-
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
24+
fs.put_file(lpath, random_file.name)
2625
assert len(tx.files) == 1
2726
# sha is a placeholder for the actual SHA created on transaction completion.
2827
sha = tx.commit(message=message)
@@ -65,7 +64,7 @@ def test_transaction_merge(
6564
tbname = tx.branch.id
6665
lpath = str(random_file)
6766
# stage a file on the transaction branch...
68-
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
67+
fs.put_file(lpath, random_file.name)
6968
# ... commit it with the above message
7069
tx.commit(message=message)
7170
# ... and merge it into temp_branch.
@@ -90,7 +89,7 @@ def test_transaction_revert(
9089
message = f"Add file {random_file.name}"
9190

9291
with fs.transaction(repository, temp_branch, automerge=True) as tx:
93-
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
92+
fs.put_file(lpath, random_file.name)
9493
tx.commit(message=message)
9594
revert_commit = tx.revert(temp_branch, temp_branch.head)
9695

@@ -113,7 +112,7 @@ def test_transaction_failure(
113112

114113
try:
115114
with fs.transaction(repository, temp_branch) as tx:
116-
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
115+
fs.put_file(lpath, random_file.name)
117116
tx.commit(message=message)
118117
raise RuntimeError("something went wrong")
119118
except RuntimeError:
@@ -159,8 +158,8 @@ def test_warn_uncommitted_changes(
159158
lpath = str(random_file)
160159

161160
with pytest.warns(match="uncommitted changes.*lost"):
162-
with fs.transaction(repository, temp_branch) as tx:
163-
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
161+
with fs.transaction(repository, temp_branch):
162+
fs.put_file(lpath, random_file.name)
164163

165164

166165
def test_warn_uncommitted_changes_on_persisted_branch(
@@ -175,4 +174,4 @@ def test_warn_uncommitted_changes_on_persisted_branch(
175174

176175
with pytest.warns(match="uncommitted changes(?:(?!lost).)*$"):
177176
with fs.transaction(repository, temp_branch, delete="never") as tx:
178-
fs.put_file(lpath, f"{repository.id}/{tx.branch.id}/{random_file.name}")
177+
fs.put_file(lpath, random_file.name)

0 commit comments

Comments
 (0)