Spark Client StorageID support #8918


Merged
merged 3 commits on Apr 27, 2025

Changes from 2 commits
2 changes: 1 addition & 1 deletion clients/spark/build.sbt
@@ -42,7 +42,7 @@ buildInfoPackage := "io.treeverse.clients"
 enablePlugins(S3Plugin, BuildInfoPlugin)

 libraryDependencies ++= Seq(
-  "io.lakefs" % "sdk" % "1.0.0",
+  "io.lakefs" % "sdk" % "1.53.1",
Contributor

Let's have this as a separate PR! It's not that I'm afraid: the lakeFS compatibility guarantees mean this is a perfectly safe change. But I'd still rather be careful, because I'm actually scared by this change.

Contributor Author

I don't mind separating this, but since the CI tests pass here, I'm curious: what difference will it make? If we merge the lib bump and, right after it, the MSB changes, how will that give more confidence?

Contributor

Because we squash merges, 2 PRs give a more detailed commit history. That makes it easier to git bisect and/or roll back some changes. But it's not critical.

"org.apache.spark" %% "spark-sql" % "3.1.2" % "provided",
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
"org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided",
24 changes: 16 additions & 8 deletions clients/spark/src/main/scala/io/treeverse/clients/ApiClient.scala
@@ -168,7 +168,7 @@ class ApiClient private (conf: APIConfigurations) {
     val storageNamespace = key.storageClientType match {
       case StorageClientType.HadoopFS =>
         ApiClient
-          .translateURI(URI.create(repo.getStorageNamespace), getBlockstoreType)
+          .translateURI(URI.create(repo.getStorageNamespace), getBlockstoreType(repo.getStorageId))
Contributor

The storage namespace information is cached in this client, but the blockstore type is not, which means we call the server multiple times. I suggest adding this information to the cache, or caching the configuration at the same level.

Contributor Author

I'm adding a getRepo() call to the server, but it's not here, and it's only for the GC.

What cache are you suggesting here?

Contributor

I was looking at the namespace cache and missed the key-to-namespace cache; you can ignore my comment.

           .normalize()
           .toString
       case StorageClientType.SDKClient => repo.getStorageNamespace
@@ -210,19 +210,27 @@
     retryWrapper.wrapWithRetry(prepareGcCommits)
   }

-  def getGarbageCollectionRules(repoName: String): String = {
-    val getGcRules = new dev.failsafe.function.CheckedSupplier[GarbageCollectionRules]() {
-      def get(): GarbageCollectionRules = repositoriesApi.getGCRules(repoName).execute()
+  def getRepository(repoName: String): Repository = {
+    val getRepo = new dev.failsafe.function.CheckedSupplier[Repository]() {
+      def get(): Repository = repositoriesApi.getRepository(repoName).execute()
     }
-    val gcRules = retryWrapper.wrapWithRetry(getGcRules)
-    gcRules.toString()
+    retryWrapper.wrapWithRetry(getRepo)
   }

-  def getBlockstoreType: String = {
+  def getBlockstoreType(storageID: String): String = {
     val getStorageConfig = new dev.failsafe.function.CheckedSupplier[StorageConfig]() {
       def get(): StorageConfig = {
         val cfg = configApi.getConfig.execute()
-        cfg.getStorageConfig
+        val storageConfigList = cfg.getStorageConfigList
+        if (storageConfigList.isEmpty || storageConfigList.size() == 1) {
+          cfg.getStorageConfig
Contributor

We could still check storageID when the list has size 1 and specifies a storage ID, no?

Contributor Author

I'm complying with the other clients (WebUI, lakectl, Everest), which do the same check. It's about making the distinction and using the list only for actual MSB cases.

+        } else {
Comment on lines +225 to +227
Contributor

In the case of a non-empty list, we still need to pick the configuration from the list, right?

Contributor Author

In that case, it'll be returned in the non-list part of the response. Same handling as in the other clients.

+          storageConfigList.asScala
+            .find(_.getBlockstoreId == storageID)
+            .getOrElse(
+              throw new IllegalArgumentException(s"Storage config not found for ID: $storageID")
+            )
+        }
       }
     }
     val storageConfig = retryWrapper.wrapWithRetry(getStorageConfig)
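To make the selection rule in the new getBlockstoreType easier to follow, here is a self-contained sketch under stand-in models (the real StorageConfig and Config classes come from the generated lakeFS SDK and expose the same data via getters, and the real code runs through the retry wrapper):

// Stand-in models; hypothetical names mirroring the generated SDK classes.
case class StorageConfig(blockstoreId: String, blockstoreType: String)
case class Config(storageConfig: StorageConfig, storageConfigList: List[StorageConfig])

def resolveBlockstoreType(cfg: Config, storageID: String): String =
  if (cfg.storageConfigList.isEmpty || cfg.storageConfigList.size == 1)
    // Single-storage installation: use the plain config field,
    // matching the WebUI/lakectl/Everest behavior discussed above.
    cfg.storageConfig.blockstoreType
  else
    // Actual MSB installation: pick the entry whose blockstore ID
    // matches the repository's storage ID, or fail loudly.
    cfg.storageConfigList
      .find(_.blockstoreId == storageID)
      .map(_.blockstoreType)
      .getOrElse(throw new IllegalArgumentException(s"Storage config not found for ID: $storageID"))

The design choice, per the thread above, is to treat the list as authoritative only for genuine multi-storage (MSB) setups and fall back to the single-config field otherwise.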
@@ -143,7 +143,8 @@ object GarbageCollection {
     val apiConf =
       APIConfigurations(apiURL, accessKey, secretKey, connectionTimeout, readTimeout, sourceName)
     val apiClient = ApiClient.get(apiConf)
-    val storageType = apiClient.getBlockstoreType
+    val storageID = apiClient.getRepository(repo).getStorageId
+    val storageType = apiClient.getBlockstoreType(storageID)
     var storageNamespace = apiClient.getStorageNamespace(repo, StorageClientType.HadoopFS)
     if (!storageNamespace.endsWith("/")) {
       storageNamespace += "/"
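Taken together, the GC entry point now resolves storage per repository rather than from a single global config. A rough outline of that flow, assuming an already-configured ApiClient as in the diff (a sketch, not a standalone program):

// Outline of the new per-repository resolution; method names as in this PR.
def resolveStorage(apiClient: ApiClient, repo: String): (String, String) = {
  val storageID = apiClient.getRepository(repo).getStorageId // which backend hosts this repo
  val storageType = apiClient.getBlockstoreType(storageID)   // blockstore type for that backend
  var storageNamespace = apiClient.getStorageNamespace(repo, StorageClientType.HadoopFS)
  if (!storageNamespace.endsWith("/")) storageNamespace += "/" // downstream GC paths expect a trailing slash
  (storageType, storageNamespace)
}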