feat: delete orphaned files #1958
Conversation
pyiceberg/table/inspect.py
Outdated
def all_manifests(self) -> "pa.Table": | ||
def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table": | ||
import pyarrow as pa | ||
|
||
snapshots = self.tbl.snapshots() | ||
snapshots = snapshots or self.tbl.snapshots() | ||
if not snapshots: |
Another case of me treating snapshots and snapshot_ids the same... happy to enforce this being snapshot_ids instead
Let's save that for another PR. I don't think we can just change this API, since folks might be using it. We could allow for a `Union[list[Snapshot], Iterable[int]]`?
Modified this, let me know what you think
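For illustration, accepting either form might look like the sketch below (hedged: `Table.snapshot_by_id` is assumed for resolving bare ids, and the names are illustrative rather than the PR's final code):

```python
from typing import Iterable, List, Optional, Union

def all_manifests(
    self, snapshots: Optional[Union[List[Snapshot], Iterable[int]]] = None
) -> "pa.Table":
    import pyarrow as pa

    # Resolve bare snapshot ids to Snapshot objects; pass Snapshot instances through.
    source = snapshots if snapshots is not None else self.tbl.snapshots()
    resolved = [s if isinstance(s, Snapshot) else self.tbl.snapshot_by_id(s) for s in source]
    if not resolved:
        ...  # fall through to the existing empty-table handling
```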
pyiceberg/table/__init__.py
Outdated
```python
if orphaned_files:
    deletes = executor.map(self.io.delete, orphaned_files)
```
Unsure if this should be a new executor, but it looks like it's a singleton so it shouldn't matter.
This looks fine, we can just re-use the executor 👍
When one of the deletes throws an error (maybe some other process had already cleaned up the file), the whole execution would terminate. Should we add a `try` block to swallow any related exception? Would be good to also add a test for this 👍
that has been done and a test was added
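For reference, later revisions in this thread use a guarded `_delete` helper; a minimal sketch of that shape, assuming `self.io` is the table's `FileIO` and `executor` is the shared pool:

```python
import logging

logger = logging.getLogger(__name__)

def _delete(file_path: str) -> None:
    # Swallow per-file errors so one already-deleted file doesn't abort the batch.
    try:
        self.io.delete(file_path)
    except Exception as e:
        logger.warning(f"Failed to delete {file_path}: {e}")

deletes = executor.map(_delete, orphaned_files)
list(deletes)  # executor.map is lazy; exhaust it so the deletes actually run
```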
Thanks for working on this @jayceslesar, sorry for the late review.
I think this is a great start, I left some comments, let me know what you think!
pyiceberg/table/__init__.py
Outdated
```python
location = self.location()

all_known_files = []
```
Why not make this a set right away?
```diff
-all_known_files = []
+all_known_files = set()
```
done
```python
from pyiceberg.io.pyarrow import _fs_from_file_path

location = self.location()
```
Nit: should we move this variable assignment down to where we start using it?
done (also this was refactored up a little bit)
pyiceberg/table/__init__.py
Outdated
```python
files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids)
all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist())
```
How about just returning a set of paths? This way we can nicely union all of them into a set:
```diff
-files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids)
-all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist())
+files_by_snapshots: Iterator[Set[str]] = executor.map(lambda snapshot_id: set(self.inspect.files(snapshot_id), snapshot_ids)["file_path"].to_pylist())
+datafile_paths = reduce(set.union, files_by_snapshots)
+all_known_files.extend(datafile_paths)
```
There will probably be quite a bit of overlap between the snapshots in terms of data files
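Note the suggestion above carries a misplaced parenthesis (the `set(...)` call closes around `snapshot_ids`); a corrected reading of the same idea might look like this sketch:

```python
from functools import reduce
from typing import Iterator, Set

files_by_snapshots: Iterator[Set[str]] = executor.map(
    lambda snapshot_id: set(self.inspect.files(snapshot_id)["file_path"].to_pylist()),
    snapshot_ids,
)
# Union the per-snapshot path sets; overlapping data files deduplicate for free.
datafile_paths: Set[str] = reduce(set.union, files_by_snapshots, set())
all_known_files.update(datafile_paths)
```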
pyiceberg/table/__init__.py
Outdated
```diff
@@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame:
         return pl.scan_iceberg(self)

+    def delete_orphaned_files(self) -> None:
```
I think it would be good to add some options that we also have on the Java side, at a minimum:

- `older_than`: Remove orphan files created before this timestamp (defaults to 3 days). It can be that some process is writing to the table and has some files staged to be added to the metadata tree. If we don't take this into account, these files might be removed in the window between writing and committing.
- `dry_run`: When true, don't actually remove files (defaults to false).

I think it would also be nice to return the set of files removed:

```diff
-def delete_orphaned_files(self) -> None:
+def delete_orphaned_files(self) -> Set[str]:
```
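Putting those options together, the requested surface might look roughly like this (a sketch under the comment's assumptions, not the PR's final signature):

```python
from datetime import timedelta
from typing import Set

def delete_orphaned_files(
    self,
    older_than: timedelta = timedelta(days=3),
    dry_run: bool = False,
) -> Set[str]:
    """Delete files under the table location that no snapshot references.

    Returns the set of paths that were (or, with dry_run=True, would be) deleted.
    """
    ...
```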
Added!
Is there a reason that `older_than` is not a table property?
Thanks for the PR @jayceslesar, using `InspectTable` to get orphaned files to submit to the executor pool is a nice idea! Just some concerns / suggestions / debugging help 😄
pyiceberg/table/inspect.py
Outdated
```python
files_by_snapshots: Iterator[Set[str]] = executor.map(
    lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist())
)
datafile_paths: set[str] = reduce(set.union, files_by_snapshots, set())
```
Won't this always be empty? I don't see any `Iterable` submitted to the executor pool above.
fixed, lost this in a little refactor
pyiceberg/table/inspect.py
Outdated
```python
from pyiceberg.io.pyarrow import _fs_from_file_path

all_known_files = set()
```
We also want to have manifest list files here (I don't see them now). Otherwise, they'll be removed by the procedure and the table will be "corrupted".
(Related: when looking at Java tests, I noticed apache/iceberg#12957)
The same goes for the current metadata JSON file, and I think to match Java behaviour we want to include all files in the metadata log of the current metadata file too.
I think there are more files we might be missing - I think tests would be nice to make sure we're not missing something! (Perhaps inspiration can be taken from the Java ones)
I see! I just pushed a change that will capture those, as well as the statistics file paths
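Taken together, the "known files" set has to cover more than data files. A sketch of collecting the extra paths mentioned in this thread (attribute names follow pyiceberg's metadata model and are assumptions, not the PR's exact code):

```python
all_known_files = set()

metadata = self.tbl.metadata
# Current metadata JSON plus every entry in the metadata log.
all_known_files.add(self.tbl.metadata_location)
all_known_files.update(entry.metadata_file for entry in metadata.metadata_log)
# One manifest list per snapshot.
all_known_files.update(snapshot.manifest_list for snapshot in metadata.snapshots)
# Statistics files.
all_known_files.update(stat.statistics_path for stat in metadata.statistics)
```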
pyiceberg/table/inspect.py
Outdated
```python
as_of = datetime.now(timezone.utc) - older_than if older_than else None
all_files = [f for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of))]

orphaned_files = set(all_files).difference(all_known_files)
```
I think we need to be careful here. `all_files` is a list of these `FileInfo` objects I think, but `all_known_files` is a set of `str`s. So the set difference here won't do anything, because a `FileInfo` object won't be in a `str` set.
ah good catch, this happened in a little refactor, just need to call `f.path`
fixed
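In other words, the comparison works once plain paths are extracted up front; a minimal sketch using pyarrow's `FileInfo.path`:

```python
all_files = [
    f.path  # compare path strings, not FileInfo objects
    for f in fs.get_file_info(selector)
    if f.type == FileType.File and (as_of is None or f.mtime < as_of)
]
orphaned_files = set(all_files).difference(all_known_files)
```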
pyiceberg/table/inspect.py
Outdated
```python
from pyiceberg.io.pyarrow import _fs_from_file_path

all_known_files = set()
```
Part of me wonders whether we could expose this as a method: a public, documented `inspect` utility that returns all files referenced by a table. Curious what others think about whether this would be useful; I'm not fully convinced myself. (We could also then restructure orphaned file detection to use that.)
I think it would likely make things simpler; `inspect` could use a little beefing up IMO. I came across #1626, which is a good start.
Yeah, I am going to play around with this. It makes testing a lot easier
Okay, let me know what you think about the change I just pushed -- see `all_known_files`. @Fokko as well -- this should make testing a lot easier (if I have both of your blessings here I will add tests for this function) and allow us to modify smarter going forward
Thanks for the PR! I added a few comments. ptal :)
```python
# exhaust
list(deletes)
logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!")
```
nit: log an else case
```python
deletes = executor.map(_delete, orphaned_files)
# exhaust
list(deletes)
logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!")
```
nit: this might not necessarily always be true, especially when `_delete` errors are suppressed. What if we count the number of successful deletes here? Maybe `_delete` could return `True`/`False` for whether the delete was successful.
The Spark procedure outputs `orphan_file_location`, which lists all the files set to be deleted. This is pretty useful for logging: https://iceberg.apache.org/docs/nightly/spark-procedures/#output_7
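A sketch of the counting idea (hypothetical `_delete` returning a bool; `executor`, `orphaned_files`, and `location` as in the surrounding code):

```python
def _delete(file_path: str) -> bool:
    # Report success so the caller can log an accurate count.
    try:
        self.io.delete(file_path)
        return True
    except Exception as e:
        logger.warning(f"Failed to delete {file_path}: {e}")
        return False

deleted = sum(executor.map(_delete, orphaned_files))
logger.info(f"Deleted {deleted} of {len(orphaned_files)} orphaned files at {location}!")
```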
```python
def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
    """Get all the orphaned files in the table.
```
nit: add a sentence explaining what orphaned files mean, maybe copy/paste from https://iceberg.apache.org/docs/nightly/spark-procedures/#remove_orphan_files
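Something along these lines, paraphrasing the linked Spark procedure docs (a sketch, not the PR's final wording):

```python
def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
    """Get all the orphaned files in the table.

    Orphaned files are files under the table location that are not referenced
    by any table metadata (data files, manifests, manifest lists, metadata
    JSON, or statistics files), e.g. leftovers from failed or uncommitted writes.
    """
```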
```diff
@@ -1371,6 +1376,28 @@ def to_polars(self) -> pl.LazyFrame:
         return pl.scan_iceberg(self)

+    def delete_orphaned_files(self, older_than: Optional[timedelta] = timedelta(days=3), dry_run: bool = False) -> None:
```
nit: we should always provide an `older_than` arg. This protects the orphan file deletion job from deleting recently created files that are currently waiting to be committed.
```python
    return _all_known_files

def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
```
nit: should we expose this as a public function, given that there's no equivalent on the Java/Spark side? We modeled the `inspect` tables on Java's metadata tables. Maybe we can change this to `_orphaned_files` for now.
```python
_, _, path = _parse_location(location)
selector = FileSelector(path, recursive=True)
# filter to just files as it may return directories, and filter on time
as_of = datetime.now(timezone.utc) - older_than if older_than else None
```
`older_than` should always be present, see the above comment.
A meta question: WDYT of moving the orphan file function to its own file/namespace? I like the idea of having all the table maintenance functions together, similar to Delta tables' `optimize`.
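For illustration only, such a namespace might read like this (purely hypothetical; no `maintenance` API exists in pyiceberg today):

```python
from datetime import timedelta

# Hypothetical grouping of maintenance operations under one namespace:
table.maintenance.delete_orphaned_files(older_than=timedelta(days=3), dry_run=True)
```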
Closes #1200
Rationale for this change
Ability to do more table maintenance from pyiceberg (iceberg-python?)
Are these changes tested?
Added a test!
Are there any user-facing changes?
Yes, this is a new method on the `Table` class.