Description
Feature Request / Improvement
Related #2008
I have a custom credential-refreshing process built on botocore's `RefreshableCredentials` that is helpful for long-running processes. I would like to connect it to pyiceberg seamlessly.
Option 1: Add support for passing `session` to `S3FileSystem`
One way to plug this process in is through the `session` parameter of `S3FileSystem`. However, pyiceberg does not currently support that. Following the pandas convention of passing `storage_options` through to the underlying filesystem, usage could look like this:
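```python
from pyiceberg.catalog import load_catalog

# warehouse_path and my_botocore_session are defined elsewhere
catalog = load_catalog(
    "default",
    **{
        "type": "sql",
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"s3://{warehouse_path}",
        "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
        # Proposed: forwarded as keyword arguments to S3FileSystem
        "storage_options": {"session": my_botocore_session},
    },
)
```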
Option 2: Hack pyiceberg
Presently, a workaround is to add a custom `AbstractFileSystem` implementation to `SCHEME_TO_FS` (defined in `iceberg-python/pyiceberg/io/fsspec.py`, line 223 at commit `da88b8d`):
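```python
import pyiceberg.io.fsspec
from custom_package import CustomFileSystem  # your own AbstractFileSystem subclass
from fsspec import AbstractFileSystem
from pyiceberg.typedef import Properties


# pyiceberg calls each SCHEME_TO_FS factory with the FileIO properties only
def _custom_file_system(properties: Properties) -> AbstractFileSystem:
    # Forward any user-supplied storage_options to the custom filesystem
    storage_options = properties.get("storage_options") or {}
    return CustomFileSystem(**storage_options)


# Register the factory for the "custom://" scheme before creating the catalog
pyiceberg.io.fsspec.SCHEME_TO_FS["custom"] = _custom_file_system
```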
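The Option 2 workaround relies on a plain scheme-to-factory dictionary. The sketch below reproduces that pattern with stdlib-only stand-ins, assuming (as in pyiceberg's `FsspecFileIO`) that a factory is looked up by scheme and called with the properties dict alone. `CustomFileSystem`, `SCHEME_TO_FS` and `get_fs` here are local illustrations, not the real objects.

```python
from typing import Any, Callable, Dict

Properties = Dict[str, Any]


class CustomFileSystem:
    """Hypothetical filesystem standing in for custom_package.CustomFileSystem."""

    def __init__(self, **options: Any) -> None:
        self.options = options


# Local stand-in for pyiceberg.io.fsspec.SCHEME_TO_FS: scheme -> factory(properties)
SCHEME_TO_FS: Dict[str, Callable[[Properties], Any]] = {}


def _custom_file_system(properties: Properties) -> CustomFileSystem:
    # Forward storage_options (if any) as keyword arguments
    return CustomFileSystem(**(properties.get("storage_options") or {}))


SCHEME_TO_FS["custom"] = _custom_file_system


def get_fs(scheme: str, properties: Properties) -> Any:
    # Assumed lookup behaviour: unknown schemes fail loudly
    if scheme not in SCHEME_TO_FS:
        raise ValueError(f"No registered filesystem for scheme: {scheme}")
    return SCHEME_TO_FS[scheme](properties)


fs = get_fs("custom", {"storage_options": {"my_param": 1}})
print(fs.options)  # {'my_param': 1}
```

Because the registration mutates module-level state, it has to run before the catalog and its FileIO are created, which is part of why this is a hack rather than a supported extension point.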
Option 3: Add support for custom fsspec backends to pyiceberg
Using the same pandas-style `storage_options`, any fsspec backend could then be selected through the scheme of the warehouse URI:
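```python
from pyiceberg.catalog import load_catalog

# "custom" must be a protocol registered with fsspec
catalog = load_catalog(
    "default",
    **{
        "type": "sql",
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"custom://{warehouse_path}",
        "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
        # Forwarded as keyword arguments to the custom fsspec filesystem
        "storage_options": {"my_param": 1},
    },
)
```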
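The warehouse URI is what ties this example to a filesystem: the scheme in front of `://` is the key looked up in `SCHEME_TO_FS`. A tiny sketch, assuming the scheme is extracted with `urllib.parse.urlparse` (`scheme_of` is a hypothetical helper):

```python
from urllib.parse import urlparse


def scheme_of(location: str) -> str:
    """Return the URI scheme used to pick a SCHEME_TO_FS entry (assumed behaviour)."""
    return urlparse(location).scheme


for location in ("custom://bucket/warehouse", "s3://bucket/warehouse", "/tmp/warehouse"):
    print(repr(scheme_of(location)))
# 'custom'
# 's3'
# ''
```

A plain local path yields an empty scheme, so it needs its own registry entry just like `s3` or `custom`.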
Code to add to `pyiceberg/io/fsspec.py`:
```python
from functools import partial

from fsspec import AbstractFileSystem
from fsspec.registry import filesystem, known_implementations

from pyiceberg.typedef import Properties


def _generic_fsspec_filesystem(properties: Properties, scheme: str) -> AbstractFileSystem:
    # Instantiate whichever fsspec backend is registered for this scheme
    storage_options = properties.get("storage_options") or {}
    return filesystem(scheme, **storage_options)


# Fall back to the generic factory for every fsspec protocol that pyiceberg
# does not already handle itself (SCHEME_TO_FS is defined in this module)
for scheme in known_implementations:
    if scheme in SCHEME_TO_FS:
        continue
    SCHEME_TO_FS[scheme] = partial(_generic_fsspec_filesystem, scheme=scheme)
```
If option 1 and/or option 3 sound like something you would like to add to your project, please let me know.