
Commit 2b254d9

Author: Sumedh Wale (committed)
Add caching of resolved relations in SnappySessionCatalog
- Resolving relations can take a long time due to metadata gathering and processing, especially for external file-based tables with a large number of partitions, so a cache of resolved relations has been added to SnappySessionCatalog.
- This cache is invalidated whenever the ExternalCatalog is invalidated. In addition, the CatalogTable looked up from the ExternalCatalog is compared against the previously cached one; if they differ, the cached entry is invalidated and re-fetched. This handles cases where the table was invalidated from another session.
- Invalidation is also done for inserts into Hadoop/Hive tables, since those create new files not present in the cached metadata and can also create new partitions.
- Added a dunit test to verify the above, i.e. correct results after adding new data/partitions from a different session.
1 parent 97078ae commit 2b254d9
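
The core of the change is a per-session cache of resolved relations keyed by table identifier, refreshed whenever the externally looked-up metadata no longer matches the cached entry. A minimal, self-contained sketch of that pattern follows; it uses simplified stand-in types (TableId, TableMeta, ResolvedRelation) instead of the real Spark/SnappyData classes, so it only illustrates the shape of the cache shown in the SnappySessionCatalog diff below, not the actual implementation.

// Minimal sketch of the resolved-relation caching pattern (stand-in types, Guava cache).
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object RelationCacheSketch {
  // placeholders for TableIdentifier, CatalogTable and a resolved LogicalPlan
  case class TableId(schema: String, table: String)
  case class TableMeta(id: TableId, version: Long)
  case class ResolvedRelation(meta: TableMeta)

  class Catalog(externalLookup: TableId => TableMeta) {
    private val cache: LoadingCache[TableId, ResolvedRelation] =
      CacheBuilder.newBuilder().maximumSize(256).build(
        new CacheLoader[TableId, ResolvedRelation] {
          // the expensive part (file listing, partition discovery) happens only here
          override def load(id: TableId): ResolvedRelation = ResolvedRelation(externalLookup(id))
        })

    def resolve(id: TableId): ResolvedRelation = {
      val current = externalLookup(id)
      val cached = cache.get(id)
      // if the externally looked-up metadata no longer matches the cached one
      // (e.g. the table was invalidated from another session), refresh the entry
      if (cached.meta != current) {
        cache.invalidate(id)
        cache.get(id)
      } else cached
    }

    // called when the ExternalCatalog is invalidated, or after inserts into
    // hadoop/hive tables that may add new files/partitions
    def invalidate(id: TableId): Unit = cache.invalidate(id)
    def invalidateAll(): Unit = cache.invalidateAll()
  }
}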

File tree: 18 files changed (+152, −36 lines)


cluster/src/dunit/scala/org/apache/spark/sql/ColumnBatchAndExternalTableDUnitTest.scala

Lines changed: 51 additions & 0 deletions
@@ -17,11 +17,14 @@
 package org.apache.spark.sql


+import java.io.File
+
 import com.pivotal.gemfirexd.internal.engine.Misc
 import io.snappydata.Property
 import io.snappydata.cluster.ClusterManagerTestBase
 import io.snappydata.test.dunit.{AvailablePortHelper, SerializableCallable}
 import io.snappydata.util.TestUtils
+import org.apache.commons.io.FileUtils
 import org.scalatest.Assertions

 import org.apache.spark.internal.Logging
@@ -411,6 +414,54 @@ class ColumnBatchAndExternalTableDUnitTest(s: String) extends ClusterManagerTest
     assert(expected - max <= TestUtils.defaultCores,
       s"Lower limit of concurrent tasks = $expected, actual = $max")
   }
+
+  def testExternalTableMetadataCacheWithInserts(): Unit = {
+    val dataDir = new File("extern1")
+    FileUtils.deleteQuietly(dataDir)
+    dataDir.mkdir()
+    // create external parquet table and insert some data
+    val session = new SnappySession(sc)
+    session.sql("create table extern1 (id long, data string, stat string) using parquet " +
+        s"options (path '${dataDir.getAbsolutePath}') partitioned by (stat)")
+    session.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 10) " +
+        "from range(100000)")
+
+    // check results
+    assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 10000)
+    assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 10000)
+
+    // insert more data from another session
+    val session2 = new SnappySession(sc)
+    session2.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 10) " +
+        "from range(10000)")
+
+    // check results
+    assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 11000)
+    assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 11000)
+    assert(session.sql("select * from extern1 where stat = 'stat3'").collect().length === 11000)
+    assert(session.sql("select * from extern1 where stat = 'stat11'").collect().length === 0)
+
+    // insert more data with new partitions
+    session2.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 20) " +
+        "from range(10000)")
+
+    // check results
+    assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 11500)
+    assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 11500)
+    assert(session.sql("select * from extern1 where stat = 'stat3'").collect().length === 11500)
+    assert(session.sql("select * from extern1 where stat = 'stat11'").collect().length === 500)
+
+    assert(session2.sql("select * from extern1 where stat = 'stat1'").collect().length === 11500)
+    assert(session2.sql("select * from extern1 where stat = 'stat2'").collect().length === 11500)
+    assert(session2.sql("select * from extern1 where stat = 'stat3'").collect().length === 11500)
+    assert(session2.sql("select * from extern1 where stat = 'stat11'").collect().length === 500)
+
+    session.sql("drop table extern1")
+    session.clear()
+    session2.clear()
+
+    FileUtils.deleteDirectory(dataDir)
+  }
 }

 case class AirlineData(year: Int, month: Int, dayOfMonth: Int,

cluster/src/dunit/scala/org/apache/spark/sql/SmartConnectorFunctions.scala

Lines changed: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ object SmartConnectorFunctions {

     val sc = SparkContext.getOrCreate(conf)
     val snc = SnappyContext(sc)
-    snc.snappySession.externalCatalog.invalidateAll()
+    snc.snappySession.sessionCatalog.invalidateAll()
     val sqlContext = new SparkSession(sc).sqlContext
     val pw = new PrintWriter(new FileOutputStream(
       new File(s"ValidateNWQueries_$tableType.out"), true))

cluster/src/main/scala/org/apache/spark/memory/SnappyUnifiedMemoryManager.scala

Lines changed: 4 additions & 3 deletions
@@ -793,9 +793,10 @@ class SnappyUnifiedMemoryManager private[memory](
       numBytes: Long,
       memoryMode: MemoryMode): Unit = {
     // if UMM lock is already held, then release inline else enqueue and be done with it
-    if (Thread.holdsLock(this) || !pendingStorageMemoryReleases.offer(
-      (objectName, numBytes, memoryMode), 15, TimeUnit.SECONDS)) {
-      synchronized(releaseStorageMemoryForObject_(objectName, numBytes, memoryMode))
+    if (Thread.holdsLock(this)) synchronized {
+      releaseStorageMemoryForObject_(objectName, numBytes, memoryMode)
+    } else {
+      pendingStorageMemoryReleases.put((objectName, numBytes, memoryMode))
     }
   }
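
The memory-manager hunk above replaces the bounded offer-with-timeout (which fell back to a synchronized inline release) with a simpler rule: release inline only when the UMM lock is already held, otherwise block on put() so the release is never dropped. A hedged sketch of that pattern using simplified stand-in names, not the actual SnappyUnifiedMemoryManager internals:

// Sketch of the lock-aware release pattern; the real class keeps more state
// and a background drainer thread for the pending queue.
import java.util.concurrent.LinkedBlockingQueue

class ReleaseQueueSketch {
  private val pending = new LinkedBlockingQueue[(String, Long)]()

  private def releaseInline(name: String, bytes: Long): Unit = {
    // actual bookkeeping would adjust storage-memory counters here
  }

  def release(name: String, bytes: Long): Unit = {
    if (Thread.holdsLock(this)) synchronized {
      // already inside the manager's lock: release immediately instead of
      // waiting on the queue (avoids stalling behind the drainer)
      releaseInline(name, bytes)
    } else {
      // otherwise hand off to the drainer; put() blocks rather than dropping
      // the release the way a timed offer(..., 15, SECONDS) could
      pending.put((name, bytes))
    }
  }
}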

cluster/src/test/scala/org/apache/spark/sql/execution/SnappyTableMutableAPISuite.scala

Lines changed: 6 additions & 3 deletions
@@ -824,7 +824,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before
     val message = intercept[AnalysisException] {
       df2.write.deleteFrom("col_table")
     }.getMessage
-    assert(message.contains("column `pk3` cannot be resolved on the right side of the operation."))
+    assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.")
+        || message.contains("WHERE clause of the DELETE FROM statement must have all the key"))
   }

   test("Bug - SNAP-2157") {
@@ -908,7 +909,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before
     }.getMessage

     assert(message.contains("DeleteFrom operation requires " +
-        "key columns(s) or primary key defined on table."))
+        "key columns(s) or primary key defined on table.") ||
+        message.contains("WHERE clause of the DELETE FROM statement must have all the key"))
   }


@@ -930,7 +932,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before
       df2.write.deleteFrom("row_table")
     }.getMessage

-    assert(message.contains("column `pk3` cannot be resolved on the right side of the operation."))
+    assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.")
+        || message.contains("WHERE clause of the DELETE FROM statement must have all the key"))
   }

   test("Delete From SQL using JDBC: row tables") {

compatibilityTests/src/test/scala/org/apache/spark/sql/hive/TestHiveSnappySession.scala

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ class TestHiveSnappySession(@transient protected val sc: SparkContext,
     sharedState.cacheManager.clearCache()
     loadedTables.clear()
     sessionCatalog.clearTempTables()
-    sessionCatalog.externalCatalog.invalidateAll()
+    sessionCatalog.invalidateAll()

     FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
       foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }

core/src/main/scala/io/snappydata/Literals.scala

Lines changed: 3 additions & 2 deletions
@@ -168,9 +168,10 @@ object Property extends Enumeration {
     "(value in bytes or k/m/g suffixes for unit, min 1k). Default is 4MB.", Some("4m"))

   val ResultPersistenceTimeout: SparkValue[Long] = Val[Long](
-    s"${Constant.SPARK_PREFIX}sql.ResultPersistenceTimeout",
+    s"${Constant.SPARK_PREFIX}sql.resultPersistenceTimeout",
     s"Maximum duration in seconds for which results larger than ${MaxMemoryResultSize.name}" +
-      s"are held on disk after which they are cleaned up. Default is 3600s (1h).", Some(3600L))
+      "are held on disk after which they are cleaned up. This is to handle cases where a " +
+      "client does not consume all the results. Default is 14400 (4h).", Some(14400L))

   val DisableHashJoin: SQLValue[Boolean] = SQLVal[Boolean](
     s"${Constant.PROPERTY_PREFIX}sql.disableHashJoin",

core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala

Lines changed: 2 additions & 2 deletions
@@ -415,7 +415,7 @@ class CachedDataFrame(snappySession: SnappySession, queryExecution: QueryExecuti
     } catch {
       case t: Throwable
         if CachedDataFrame.isConnectorCatalogStaleException(t, snappySession) =>
-        snappySession.externalCatalog.invalidateAll()
+        snappySession.sessionCatalog.invalidateAll()
         SnappySession.clearAllCache()
         val execution =
           snappySession.getContextObject[() => QueryExecution](SnappySession.ExecutionKey)
@@ -1005,7 +1005,7 @@ object CachedDataFrame
     } catch {
       case t: Throwable
         if CachedDataFrame.isConnectorCatalogStaleException(t, snappySession) =>
-        snappySession.externalCatalog.invalidateAll()
+        snappySession.sessionCatalog.invalidateAll()
         SnappySession.clearAllCache()
         if (attempts < retryCount) {
           Thread.sleep(attempts*100)

core/src/main/scala/org/apache/spark/sql/SnappySession.scala

Lines changed: 5 additions & 4 deletions
@@ -245,12 +245,11 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
         val relations = plan.collect {
           case _: Command => hasCommand = true; null
           case u: UnresolvedRelation =>
-            val tableIdent = sessionCatalog.resolveTableIdentifier(u.tableIdentifier)
-            tableIdent.database.get -> tableIdent.table
+            sessionCatalog.resolveTableIdentifier(u.tableIdentifier)
         }
-        if (hasCommand) externalCatalog.invalidateAll()
+        if (hasCommand) sessionCatalog.invalidateAll()
         else if (relations.nonEmpty) {
-          relations.foreach(externalCatalog.invalidate)
+          relations.foreach(sessionCatalog.invalidate(_))
         }
         throw e
       case _ =>
@@ -332,6 +331,7 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
   @transient
   private var sqlWarnings: SQLWarning = _

+  private[sql] var catalogInitialized: Boolean = _
   private[sql] var hiveInitializing: Boolean = _

   private[sql] def isHiveSupportEnabled(v: String): Boolean = Utils.toLowerCase(v) match {
@@ -1700,6 +1700,7 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
     plan match {
       case LogicalRelation(rls: RowLevelSecurityRelation, _, _) =>
         rls.enableOrDisableRowLevelSecurity(tableIdent, enableRls)
+        sessionCatalog.invalidate(tableIdent)
         externalCatalog.invalidateCaches(tableIdent.database.get -> tableIdent.table :: Nil)
       case _ =>
         throw new AnalysisException("ALTER TABLE enable/disable Row Level Security " +

core/src/main/scala/org/apache/spark/sql/execution/CodegenSparkFallback.scala

Lines changed: 2 additions & 2 deletions
@@ -110,7 +110,7 @@ case class CodegenSparkFallback(var child: SparkPlan,
       result
     } catch {
       case t: Throwable if CachedDataFrame.isConnectorCatalogStaleException(t, session) =>
-        session.externalCatalog.invalidateAll()
+        session.sessionCatalog.invalidateAll()
         SnappySession.clearAllCache()
         throw CachedDataFrame.catalogStaleFailure(t, session)
     } finally {
@@ -125,7 +125,7 @@ case class CodegenSparkFallback(var child: SparkPlan,
   }

   private def handleStaleCatalogException[T](f: SparkPlan => T, plan: SparkPlan, t: Throwable) = {
-    session.externalCatalog.invalidateAll()
+    session.sessionCatalog.invalidateAll()
     SnappySession.clearAllCache()
     // fail immediate for insert/update/delete, else retry entire query
     val action = plan.find {

core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -436,7 +436,7 @@ abstract class BaseColumnFormatRelation(
       createExternalTableForColumnBatches(externalColumnTableName, conn)
       // store schema will miss complex types etc, so use the user-provided one
       val session = sqlContext.sparkSession.asInstanceOf[SnappySession]
-      session.externalCatalog.invalidate(schemaName -> tableName)
+      session.sessionCatalog.invalidate(TableIdentifier(tableName, Some(schemaName)))
       _schema = userSchema
       _relationInfoAndRegion = null
     }

core/src/main/scala/org/apache/spark/sql/hive/SnappySessionState.scala

Lines changed: 21 additions & 5 deletions
@@ -674,14 +674,16 @@ class SnappySessionState(val snappySession: SnappySession)
    * Internal catalog for managing table and database states.
    */
   override lazy val catalog: SnappySessionCatalog = {
-    new SnappySessionCatalog(
+    val sessionCatalog = new SnappySessionCatalog(
       snappySession.sharedState.getExternalCatalogInstance(snappySession),
       snappySession,
       snappySession.sharedState.globalTempViewManager,
       functionResourceLoader,
       functionRegistry,
       conf,
       newHadoopConf())
+    snappySession.catalogInitialized = true
+    sessionCatalog
   }

   protected lazy val wrapperCatalog: SessionCatalogWrapper = {
@@ -701,7 +703,7 @@ class SnappySessionState(val snappySession: SnappySession)
       python.ExtractPythonUDFs,
       TokenizeSubqueries(snappySession),
       EnsureRequirements(conf),
-      OptimizeSortAndFilePlans(conf),
+      OptimizeSortAndFilePlans(snappySession),
       CollapseCollocatedPlans(snappySession),
       CollapseCodegenStages(conf),
       InsertCachedPlanFallback(snappySession, topLevel),
@@ -916,14 +918,28 @@ class SnappyAnalyzer(sessionState: SnappySessionState)
  * Rule to replace Spark's SortExec plans with an optimized SnappySortExec (in SMJ for now).
  * Also sets the "spark.task.cpus" property implicitly for file scans/writes.
  */
-case class OptimizeSortAndFilePlans(conf: SnappyConf) extends Rule[SparkPlan] {
+case class OptimizeSortAndFilePlans(session: SnappySession) extends Rule[SparkPlan] {
   override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case join@joins.SortMergeJoinExec(_, _, _, _, _, sort@SortExec(_, _, child, _)) =>
       join.copy(right = SnappySortExec(sort, child))
-    case s@(_: FileSourceScanExec | _: HiveTableScanExec | _: InsertIntoHiveTable |
+    case i: InsertIntoHiveTable =>
+      val table = i.table.catalogTable
+      // invalidate meta-data since that can change after the insert
+      val tableIdentifier = session.sessionCatalog.resolveTableIdentifier(table.identifier)
+      session.externalCatalog.invalidate(tableIdentifier.database.get -> tableIdentifier.table)
+      session.sessionState.conf.setDynamicCpusPerTask()
+      i
+    case c@ExecutedCommandExec(i: InsertIntoHadoopFsRelationCommand) if i.catalogTable.isDefined =>
+      val table = i.catalogTable.get
+      // invalidate meta-data since that can change after the insert
+      val tableIdentifier = session.sessionCatalog.resolveTableIdentifier(table.identifier)
+      session.externalCatalog.invalidate(tableIdentifier.database.get -> tableIdentifier.table)
+      session.sessionState.conf.setDynamicCpusPerTask()
+      c
+    case s@(_: FileSourceScanExec | _: HiveTableScanExec |
         ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand |
         _: CreateHiveTableAsSelectCommand)) =>
-      conf.setDynamicCpusPerTask()
+      session.sessionState.conf.setDynamicCpusPerTask()
       s
   }
 }

core/src/main/scala/org/apache/spark/sql/internal/ColumnTableBulkOps.scala

Lines changed: 1 addition & 1 deletion
@@ -121,7 +121,7 @@ object ColumnTableBulkOps {
       result
     } catch {
       case t: Throwable if CachedDataFrame.isConnectorCatalogStaleException(t, session) =>
-        session.externalCatalog.invalidateAll()
+        session.sessionCatalog.invalidateAll()
         SnappySession.clearAllCache()
         // throw failure immediately to keep it consistent with insert/update/delete
         throw CachedDataFrame.catalogStaleFailure(t, session)

core/src/main/scala/org/apache/spark/sql/internal/SnappySessionCatalog.scala

Lines changed: 42 additions & 4 deletions
@@ -23,12 +23,13 @@ import java.sql.SQLException
 import scala.util.control.NonFatal

 import com.gemstone.gemfire.SystemFailure
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.pivotal.gemfirexd.Attribute
 import com.pivotal.gemfirexd.internal.iapi.util.IdUtil
 import io.snappydata.Constant
 import io.snappydata.sql.catalog.CatalogObjectType.getTableType
 import io.snappydata.sql.catalog.SnappyExternalCatalog.{DBTABLE_PROPERTY, getTableWithSchema}
-import io.snappydata.sql.catalog.{CatalogObjectType, SnappyExternalCatalog}
+import io.snappydata.sql.catalog.{CatalogObjectType, ConnectorExternalCatalog, SnappyExternalCatalog}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

@@ -96,6 +97,19 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog,
     defaultName
   }

+  /** A cache of Spark SQL data source tables that have been accessed. */
+  // noinspection UnstableApiUsage
+  protected[sql] val cachedDataSourceTables: LoadingCache[TableIdentifier, LogicalPlan] = {
+    val loader = new CacheLoader[TableIdentifier, LogicalPlan]() {
+      override def load(tableName: TableIdentifier): LogicalPlan = {
+        logDebug(s"Creating new cached data source for $tableName")
+        val table = externalCatalog.getTable(tableName.database.get, tableName.table)
+        new FindDataSourceTable(snappySession)(SimpleCatalogRelation(table.database, table))
+      }
+    }
+    CacheBuilder.newBuilder().maximumSize(ConnectorExternalCatalog.cacheSize >> 2).build(loader)
+  }
+
   final def getCurrentSchema: String = getCurrentDatabase

   /**
@@ -270,7 +284,11 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog,
   final def resolveRelationWithAlias(tableIdent: TableIdentifier,
       alias: Option[String] = None): LogicalPlan = {
     // resolve the relation right away with alias around
-    new FindDataSourceTable(snappySession)(lookupRelation(tableIdent, alias))
+    lookupRelation(tableIdent, alias) match {
+      case lr: LogicalRelation => lr
+      case a: SubqueryAlias if a.child.isInstanceOf[LogicalRelation] => a
+      case r => new FindDataSourceTable(snappySession)(r)
+    }
   }

   /**
@@ -878,7 +896,17 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog,
           getPolicyPlan(table)
         } else {
           view = None
-          SimpleCatalogRelation(schemaName, table)
+          if (DDLUtils.isDatasourceTable(table)) {
+            val resolved = TableIdentifier(tableName, Some(schemaName))
+            cachedDataSourceTables(resolved) match {
+              case lr: LogicalRelation
+                if lr.catalogTable.isDefined && (lr.catalogTable.get ne table) =>
+                // refresh since table metadata has changed
+                cachedDataSourceTables.invalidate(resolved)
+                cachedDataSourceTables(resolved)
+              case p => p
+            }
+          } else SimpleCatalogRelation(schemaName, table)
         }
       }
     case Some(p) => p
@@ -914,13 +942,23 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog,
       super.refreshTable(table)
     } else {
       val resolved = resolveTableIdentifier(table)
-      externalCatalog.invalidate(resolved.database.get -> resolved.table)
+      invalidate(resolved)
       if (snappySession.enableHiveSupport) {
         hiveSessionCatalog.refreshTable(resolved)
       }
     }
   }

+  def invalidate(resolved: TableIdentifier, sessionOnly: Boolean = false): Unit = {
+    cachedDataSourceTables.invalidate(resolved)
+    if (!sessionOnly) externalCatalog.invalidate(resolved.database.get -> resolved.table)
+  }
+
+  def invalidateAll(sessionOnly: Boolean = false): Unit = {
+    cachedDataSourceTables.invalidateAll()
+    if (!sessionOnly) externalCatalog.invalidateAll()
+  }
+
   def getDataSourceRelations[T](tableType: CatalogObjectType.Type): Seq[T] = {
     externalCatalog.getAllTables().collect {
       case table if tableType == CatalogObjectType.getTableType(table) =>
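
Callers elsewhere in this commit switch from externalCatalog.invalidate/invalidateAll to these session-level entry points; a short hypothetical usage sketch (the schema name "app" is only an example):

// Drop both the session's cached resolved relation and the ExternalCatalog entry.
sessionCatalog.invalidate(TableIdentifier("extern1", Some("app")))
// Clear only this session's resolved-relation cache, leaving ExternalCatalog caches
// intact (used e.g. when credentials change in SnappyConf).
sessionCatalog.invalidateAll(sessionOnly = true)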

core/src/main/scala/org/apache/spark/sql/internal/session.scala

Lines changed: 3 additions & 1 deletion
@@ -239,7 +239,9 @@ class SnappyConf(@transient val session: SnappySession)
         key
       }

-    case GAttr.USERNAME_ATTR | GAttr.USERNAME_ALT_ATTR | GAttr.PASSWORD_ATTR => key
+    case GAttr.USERNAME_ATTR | GAttr.USERNAME_ALT_ATTR | GAttr.PASSWORD_ATTR =>
+      if (session.catalogInitialized) session.sessionCatalog.invalidateAll(sessionOnly = true)
+      key

     case _ if key.startsWith("spark.sql.aqp.") =>
       session.clearPlanCache()
