
Commit 0605ffe

itaigilo authored and nopcoder committed
Spark Client StorageID support (#8918)
1 parent a20b343 commit 0605ffe

File tree

3 files changed: +20 −10 lines changed

clients/spark/build.sbt

+2 −1
```diff
@@ -27,6 +27,7 @@ Compile / PB.targets := Seq(
 
 testFrameworks += new TestFramework("org.scalameter.ScalaMeterFramework")
 Test / logBuffered := false
+
 // Uncomment to get accurate benchmarks with just "sbt test".
 // Otherwise tell sbt to
 // "testOnly io.treeverse.clients.ReadSSTableBenchmark"
@@ -42,7 +43,7 @@ buildInfoPackage := "io.treeverse.clients"
 enablePlugins(S3Plugin, BuildInfoPlugin)
 
 libraryDependencies ++= Seq(
-  "io.lakefs" % "sdk" % "1.0.0",
+  "io.lakefs" % "sdk" % "1.53.1",
   "org.apache.spark" %% "spark-sql" % "3.1.2" % "provided",
   "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided",
```

clients/spark/src/main/scala/io/treeverse/clients/ApiClient.scala

+16 −8
```diff
@@ -168,7 +168,7 @@ class ApiClient private (conf: APIConfigurations) {
     val storageNamespace = key.storageClientType match {
       case StorageClientType.HadoopFS =>
         ApiClient
-          .translateURI(URI.create(repo.getStorageNamespace), getBlockstoreType)
+          .translateURI(URI.create(repo.getStorageNamespace), getBlockstoreType(repo.getStorageId))
           .normalize()
           .toString
       case StorageClientType.SDKClient => repo.getStorageNamespace
@@ -210,19 +210,27 @@ class ApiClient private (conf: APIConfigurations) {
     retryWrapper.wrapWithRetry(prepareGcCommits)
   }
 
-  def getGarbageCollectionRules(repoName: String): String = {
-    val getGcRules = new dev.failsafe.function.CheckedSupplier[GarbageCollectionRules]() {
-      def get(): GarbageCollectionRules = repositoriesApi.getGCRules(repoName).execute()
+  def getRepository(repoName: String): Repository = {
+    val getRepo = new dev.failsafe.function.CheckedSupplier[Repository]() {
+      def get(): Repository = repositoriesApi.getRepository(repoName).execute()
     }
-    val gcRules = retryWrapper.wrapWithRetry(getGcRules)
-    gcRules.toString()
+    retryWrapper.wrapWithRetry(getRepo)
   }
 
-  def getBlockstoreType: String = {
+  def getBlockstoreType(storageID: String): String = {
     val getStorageConfig = new dev.failsafe.function.CheckedSupplier[StorageConfig]() {
       def get(): StorageConfig = {
         val cfg = configApi.getConfig.execute()
-        cfg.getStorageConfig
+        val storageConfigList = cfg.getStorageConfigList
+        if (storageConfigList.isEmpty || storageConfigList.size() == 1) {
+          cfg.getStorageConfig
+        } else {
+          storageConfigList.asScala
+            .find(_.getBlockstoreId == storageID)
+            .getOrElse(
+              throw new IllegalArgumentException(s"Storage config not found for ID: $storageID")
+            )
+        }
       }
     }
     val storageConfig = retryWrapper.wrapWithRetry(getStorageConfig)
```
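
The heart of the change is `getBlockstoreType`, which now takes the repository's storage ID and resolves it against the server's storage-config list, keeping the old single-config behavior when at most one backend is configured. Below is a minimal, self-contained sketch of that selection logic; `StorageConf`, `resolveStorageConfig`, and the sample IDs are illustrative stand-ins, not names from the commit:

```scala
import scala.collection.JavaConverters._

object StorageResolutionSketch {
  // Illustrative stand-in for the SDK's StorageConfig.
  case class StorageConf(blockstoreId: String, blockstoreType: String)

  def resolveStorageConfig(
      storageConfigList: java.util.List[StorageConf],
      topLevelConfig: StorageConf,
      storageID: String
  ): StorageConf =
    if (storageConfigList.isEmpty || storageConfigList.size() == 1) {
      // Zero or one configured backends: keep the legacy single-store
      // behavior and ignore the requested ID.
      topLevelConfig
    } else {
      // Multiple backends: select by blockstore ID, failing loudly when a
      // repository references an unknown backend.
      storageConfigList.asScala
        .find(_.blockstoreId == storageID)
        .getOrElse(
          throw new IllegalArgumentException(s"Storage config not found for ID: $storageID")
        )
    }

  def main(args: Array[String]): Unit = {
    val s3    = StorageConf("s3-main", "s3")
    val azure = StorageConf("azure-dr", "azure")
    // Two backends: resolved by ID.
    println(resolveStorageConfig(List(s3, azure).asJava, s3, "azure-dr").blockstoreType) // azure
    // Single backend: the top-level config wins regardless of the ID.
    println(resolveStorageConfig(List(s3).asJava, s3, "anything").blockstoreType) // s3
  }
}
```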

clients/spark/src/main/scala/io/treeverse/gc/GarbageCollection.scala

+2 −1
```diff
@@ -143,7 +143,8 @@ object GarbageCollection {
     val apiConf =
       APIConfigurations(apiURL, accessKey, secretKey, connectionTimeout, readTimeout, sourceName)
     val apiClient = ApiClient.get(apiConf)
-    val storageType = apiClient.getBlockstoreType
+    val storageID = apiClient.getRepository(repo).getStorageId
+    val storageType = apiClient.getBlockstoreType(storageID)
     var storageNamespace = apiClient.getStorageNamespace(repo, StorageClientType.HadoopFS)
     if (!storageNamespace.endsWith("/")) {
       storageNamespace += "/"
```
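
With `getRepository` now exposed on the client, the GC flow becomes a two-step lookup: fetch the repository to read its storage ID, then resolve the blockstore type for that backend. A small sketch of the new call order, using hypothetical `Repo`/`Client` stubs in place of the SDK's `Repository` and this client's `ApiClient`:

```scala
object GcStorageLookupSketch {
  // Hypothetical stubs; the real types are the lakeFS SDK's Repository
  // and the Spark client's ApiClient.
  trait Repo { def getStorageId: String }
  trait Client {
    def getRepository(name: String): Repo
    def getBlockstoreType(storageID: String): String
  }

  // Previously: val storageType = apiClient.getBlockstoreType
  // Now the repository's storage ID selects the backend first:
  def resolveStorageType(apiClient: Client, repoName: String): String = {
    val storageID = apiClient.getRepository(repoName).getStorageId
    apiClient.getBlockstoreType(storageID)
  }
}
```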
