Custom Credential Refresh Client (fsspec, s3fs) #2018

Open
@snowman2

Description

Feature Request / Improvement

Related #2008

I have a custom credential-refreshing process built on botocore's RefreshableCredentials that is helpful for long-running processes. I would like to connect it to pyiceberg in a seamless way.

Option 1: Add support for passing session to S3FileSystem

One way to pass this process through is via the session parameter of S3FileSystem. However, that is not presently supported. Following the pandas convention of forwarding storage_options to the underlying filesystem, it could look like this:

catalog = load_catalog(
    "default",
    **{
        'type': 'sql',
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"s3://{warehouse_path}",
        "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
        "storage_options": {"session": my_botocore_session}
    },
)

Option 2: Hack pyiceberg

Presently, a workaround is to add a custom AbstractFileSystem implementation to SCHEME_TO_FS in pyiceberg.io.fsspec:

import pyiceberg.io.fsspec
from custom_package import CustomFileSystem
from fsspec import AbstractFileSystem
from pyiceberg.typedef import Properties


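# `scheme` has a default so the factory also works when called with properties alone.
def _custom_file_system(properties: Properties, scheme: str = "custom") -> AbstractFileSystem:
    # Forward user-supplied storage_options (if any) to the custom filesystem.
    storage_options = properties.get("storage_options") or {}
    return CustomFileSystem(**storage_options)
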
pyiceberg.io.fsspec.SCHEME_TO_FS["custom"] = _custom_file_system

Option 3: Add support for custom fsspec backends to pyiceberg

Following the pandas convention of forwarding storage_options to the underlying filesystem, usage could look like this:

catalog = load_catalog(
    "default",
    **{
        'type': 'sql',
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"custom://{warehouse_path}",
        "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
        "storage_options": {"my_param": 1}
    },
)

Code to add to pyiceberg/io/fsspec.py:

from functools import partial

from fsspec import AbstractFileSystem
from fsspec.registry import known_implementations, filesystem
from pyiceberg.typedef import Properties


def _generic_fsspec_filesystem(properties: Properties, scheme: str) -> AbstractFileSystem:
    # Forward user-supplied storage_options (if any) to the fsspec backend for this scheme.
    storage_options = properties.get("storage_options") or {}
    return filesystem(scheme, **storage_options)


# Register a generic factory for every fsspec scheme not already handled.
for scheme in known_implementations:
    if scheme in SCHEME_TO_FS:
        continue
    SCHEME_TO_FS[scheme] = partial(_generic_fsspec_filesystem, scheme=scheme)
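To illustrate why the loop binds the scheme with `partial` and skips existing entries, here is a self-contained sketch of the same registration pattern using only the standard library. The stand-in registry, the tuple return values, and the `KNOWN_SCHEMES` list are hypothetical placeholders for pyiceberg's `SCHEME_TO_FS`, real filesystem objects, and fsspec's `known_implementations`.

```python
from functools import partial
from typing import Any, Callable, Dict, Tuple

Properties = Dict[str, Any]
FakeFS = Tuple[str, Dict[str, Any]]  # stand-in for a filesystem object


def _builtin_s3(properties: Properties) -> FakeFS:
    # Stand-in for a factory that is already registered and must not be replaced.
    return ("s3-builtin", {})


SCHEME_TO_FS: Dict[str, Callable[[Properties], FakeFS]] = {"s3": _builtin_s3}


def _generic_factory(properties: Properties, scheme: str) -> FakeFS:
    # Missing or empty storage_options falls back to an empty dict.
    storage_options = properties.get("storage_options") or {}
    return (scheme, dict(storage_options))


KNOWN_SCHEMES = ["s3", "custom", "memory"]  # hypothetical list of schemes

for scheme in KNOWN_SCHEMES:
    if scheme in SCHEME_TO_FS:
        continue  # keep the dedicated factory
    # partial fixes the scheme at registration time; a lambda closing over
    # the loop variable would see only the last scheme.
    SCHEME_TO_FS[scheme] = partial(_generic_factory, scheme=scheme)

custom_fs = SCHEME_TO_FS["custom"]({"storage_options": {"my_param": 1}})
```

Each registered callable still takes only the properties mapping, so callers do not need to know whether a scheme is served by a dedicated or a generic factory.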

If option 1 and/or option 3 sound like something you would like to add to your project, please let me know.
