diff --git a/README.md b/README.md index e0d689049..31e2019f8 100644 --- a/README.md +++ b/README.md @@ -839,6 +839,24 @@ Offset types available at the moment: Only ingestion jobs support incremental schedule at the moment. Incremental transformations and sinks are planned to be available soon. +### Incremental transformers and sinks (experimental) +In order for a transformer or a sink to use a metastore table in an incremental way, the code should invoke the +`metastore.getCurrentBatch()` method instead of `metastore.getTable()`. `metastore.getCurrentBatch()` also works for +normal batch pipelines. + +- When `getCurrentBatch()` is used with a daily, weekly, or monthly schedule, it returns data for the information date + corresponding to the running job, the same as invoking `metastore.getTable("my_table", Some(infoDate), Some(infoDate))`. +- When `getCurrentBatch()` is used with an incremental schedule, it returns only the latest non-processed data. Offset + management is used to keep track of processed data. +- The column `pramen_batchid` is added automatically to output tables of ingested and transformed data in order to track + offsets. The exception is the metastore `raw` format, which keeps original files as they are, so the + `pramen_batchid` column cannot be added to such tables. +- The offset manager updates offsets only after the output of transformers or sinks has succeeded. It does the update + in a transactional manner. But if the update fails in the middle, duplicates are possible on subsequent runs, so + Pramen provides 'AT LEAST ONCE' semantics for incremental transformation pipelines. +- Reruns are possible for full days to remove duplicates. But for incremental sinks, such as the Kafka sink, duplicates + still might happen. + ### Sinks Sinks define a way data needs to be sent to a target system. Built-in sinks include: - Kafka sink.
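A minimal sketch of a transformer using the `getCurrentBatch()` method described in the README addition above. The `Transformer` signature is assumed to match the existing transformer examples; the class name and the table name `my_table` are illustrative.

```scala
import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.{MetastoreReader, Reason, Transformer}

import java.time.LocalDate

class MyIncrementalTransformer extends Transformer {
  override def validate(metastore: MetastoreReader, infoDate: LocalDate, options: Map[String, String]): Reason =
    Reason.Ready

  override def run(metastore: MetastoreReader, infoDate: LocalDate, options: Map[String, String]): DataFrame = {
    // With an incremental schedule this returns only data not yet processed by this transformer;
    // with a daily/weekly/monthly schedule it returns data for the current information date.
    val df = metastore.getCurrentBatch("my_table")

    df // apply the actual transformation logic here
  }
}
```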
diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/DataFormat.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/DataFormat.scala index a4e005794..895acec35 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/DataFormat.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/DataFormat.scala @@ -23,6 +23,8 @@ sealed trait DataFormat { def isTransient: Boolean def isLazy: Boolean + + def isRaw: Boolean } object DataFormat { @@ -32,6 +34,8 @@ object DataFormat { override val isTransient: Boolean = false override val isLazy: Boolean = false + + override val isRaw: Boolean = false } case class Delta(query: Query, recordsPerPartition: Option[Long]) extends DataFormat { @@ -40,6 +44,8 @@ object DataFormat { override val isTransient: Boolean = false override val isLazy: Boolean = false + + override val isRaw: Boolean = false } // This format is used for metatables which are just files and can only be used for further sourcing @@ -49,6 +55,8 @@ object DataFormat { override val isTransient: Boolean = false override val isLazy: Boolean = false + + override val isRaw: Boolean = true } // This format is used for tables that exist only for the duration of the process, and is not persisted @@ -58,6 +66,8 @@ object DataFormat { override val isTransient: Boolean = true override val isLazy: Boolean = false + + override val isRaw: Boolean = false } // This format is used for tables are calculated only if requested, and is not persisted @@ -67,6 +77,8 @@ object DataFormat { override val isTransient: Boolean = true override val isLazy: Boolean = true + + override val isRaw: Boolean = false } // This format is used for metatables which do not support persistence, e.g. for sink or transfer jobs @@ -76,5 +88,7 @@ object DataFormat { override val isTransient: Boolean = false override val isLazy: Boolean = false + + override val isRaw: Boolean = false } } diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala index d60a9db65..5a377c536 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlGeneratorBase.scala @@ -216,7 +216,7 @@ abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator { } object SqlGeneratorBase { - val MAX_STRING_OFFSET_CHARACTERS = 64 + val MAX_STRING_OFFSET_CHARACTERS = 128 val forbiddenCharacters = ";'\\" val normalCharacters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_." diff --git a/pramen/core/src/main/resources/reference.conf b/pramen/core/src/main/resources/reference.conf index fd97e654b..31ff7873f 100644 --- a/pramen/core/src/main/resources/reference.conf +++ b/pramen/core/src/main/resources/reference.conf @@ -147,6 +147,9 @@ pramen { initial.sourcing.date.weekly.expr = "@runDate - 6" initial.sourcing.date.monthly.expr = "beginOfMonth(@runDate)" + # If true, Pramen always adds 'pramen_batchid' column, even for non-incremental pipelines + always.add.batchid.column = false + # Pramen can stop the Spark session at the end of execution. This can help cleanly finalize running # jobs started from 'spark-submit'. But when running on Databriks this results in the job failure. # Use it with caution. 
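For context, a hedged sketch of what the new `always.add.batchid.column` setting controls: whether the `pramen_batchid` column is appended to outputs of non-incremental pipelines as well. The helper below and its parameters are illustrative rather than code from this change; the `withColumn` pattern mirrors `IncrementalIngestionJob` further down in this diff.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

object BatchIdColumnExample {
  // Illustrative helper: append the batch id column when the pipeline is incremental,
  // or unconditionally when 'pramen.always.add.batchid.column = true' is set.
  def maybeAddBatchId(df: DataFrame,
                      batchIdColumn: String,
                      batchId: Long,
                      isIncremental: Boolean,
                      alwaysAddBatchIdColumn: Boolean): DataFrame =
    if (isIncremental || alwaysAddBatchIdColumn)
      df.withColumn(batchIdColumn, lit(batchId))
    else
      df
}
```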
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala index 91970eed0..f659e71b2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala @@ -20,7 +20,7 @@ import com.typesafe.config.Config import org.apache.spark.sql.SparkSession import za.co.absa.pramen.api.MetadataManager import za.co.absa.pramen.core.PramenImpl -import za.co.absa.pramen.core.app.config.InfoDateConfig +import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.journal.Journal import za.co.absa.pramen.core.lock.{TokenLockFactory, TokenLockFactoryAllow} @@ -57,7 +57,7 @@ object AppContextImpl { val (bookkeeper, tokenLockFactory, journal, metadataManager, closable) = Bookkeeper.fromConfig(appConfig.bookkeepingConfig, appConfig.runtimeConfig, batchId) - val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.infoDateDefaults, bookkeeper, metadataManager, batchId) + val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.runtimeConfig, appConfig.infoDateDefaults, bookkeeper, metadataManager, batchId) PramenImpl.instance.asInstanceOf[PramenImpl].setMetadataManager(metadataManager) PramenImpl.instance.asInstanceOf[PramenImpl].setWorkflowConfig(conf) @@ -83,8 +83,9 @@ object AppContextImpl { val appConfig = AppConfig.fromConfig(conf) val metadataManager = new MetadataManagerNull(isPersistenceEnabled = false) + val runtimeConfig = RuntimeConfig.default - val metastore: Metastore = MetastoreImpl.fromConfig(conf, infoDateConfig, bookkeeper, metadataManager, 0L) + val metastore: Metastore = MetastoreImpl.fromConfig(conf, runtimeConfig, infoDateConfig, bookkeeper, metadataManager, 0L) val appContext = new AppContextImpl( appConfig, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala index 655b8edf1..f546b22b3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala @@ -42,6 +42,7 @@ case class RuntimeConfig( parallelTasks: Int, stopSparkSession: Boolean, allowEmptyPipeline: Boolean, + alwaysAddBatchIdColumn: Boolean, historicalRunMode: RunMode, sparkAppDescriptionTemplate: Option[String] ) @@ -67,6 +68,7 @@ object RuntimeConfig { val STOP_SPARK_SESSION = "pramen.stop.spark.session" val VERBOSE = "pramen.verbose" val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline" + val ALWAYS_ADD_BATCHID_COLUMN = "pramen.always.add.batchid.column" val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template" def fromConfig(conf: Config): RuntimeConfig = { @@ -130,6 +132,7 @@ object RuntimeConfig { } val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false) + val alwaysAddBatchIdColumn = ConfigUtils.getOptionBoolean(conf, ALWAYS_ADD_BATCHID_COLUMN).getOrElse(false) val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE) RuntimeConfig( @@ -147,8 +150,31 @@ object RuntimeConfig { parallelTasks = parallelTasks, stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION), allowEmptyPipeline, + alwaysAddBatchIdColumn, runMode, sparkAppDescriptionTemplate ) } + + def default: 
RuntimeConfig = { + RuntimeConfig( + isDryRun = false, + isRerun = false, + runTables = Seq.empty, + isUndercover = false, + useLocks = true, + checkOnlyLateData = false, + checkOnlyNewData = true, + emailIfNoChanges = false, + runDate = LocalDate.now(), + runDateTo = None, + isInverseOrder = false, + parallelTasks = 1, + stopSparkSession = true, + allowEmptyPipeline = false, + alwaysAddBatchIdColumn = false, + historicalRunMode = RunMode.CheckUpdates, + sparkAppDescriptionTemplate = None + ) + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala index de82d0ea3..8a6fbf9e3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala @@ -33,7 +33,7 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) { import za.co.absa.pramen.core.utils.FutureImplicits._ private val log = LoggerFactory.getLogger(this.getClass) - private val offsetManagement = new OffsetManagerJdbc(db, batchId) + private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(db, batchId)) override val bookkeepingEnabled: Boolean = true diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala index fbbb0d486..684825429 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.bookkeeper import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue} -import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest} +import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest, OffsetCommitRequest} import java.time.LocalDate @@ -79,6 +79,11 @@ trait OffsetManager { */ def commitRerun(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit + /** + * Combines both startWriteOffsets() and commitOffsets() into one operation when it is applicable. + */ + def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit + /** * Rolls back an offset request */ diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala new file mode 100644 index 000000000..8da69ce30 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala @@ -0,0 +1,103 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.bookkeeper + +import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset +import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue} +import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest, OffsetCommitRequest} + +import java.time.LocalDate +import scala.collection.mutable + +/** + * The offset manager decorator handles caching or repeated queries. + */ +class OffsetManagerCached(offsetManager: OffsetManager) extends OffsetManager { + private val log = LoggerFactory.getLogger(this.getClass) + private val aggregatedOffsetsCache = new mutable.HashMap[(String, Option[LocalDate]), Option[DataOffsetAggregated]] + + def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = { + offsetManager.getOffsets(table, infoDate) + } + + def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset] = { + offsetManager.getUncommittedOffsets(table, onlyForInfoDate) + } + + def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = synchronized { + val tbl = onlyForInfoDate match { + case Some(date) => s"'$table' for '$date'" + case None => s"'$table'" + } + + if (aggregatedOffsetsCache.contains((table, onlyForInfoDate))) { + val value = aggregatedOffsetsCache((table, onlyForInfoDate)) + log.info(s"Got min/max offsets for $tbl from cache (${renderAggregatedOptionalOffset(value)}).") + value + } else { + val value = offsetManager.getMaxInfoDateAndOffset(table, onlyForInfoDate) + log.info(s"Got min/max offsets for $tbl from the database (${renderAggregatedOptionalOffset(value)}). Saving to cache...") + aggregatedOffsetsCache += (table, onlyForInfoDate) -> value + value + } + } + + def startWriteOffsets(table: String, infoDate: LocalDate, offsetType: OffsetType): DataOffsetRequest = { + offsetManager.startWriteOffsets(table, infoDate, offsetType) + } + + def commitOffsets(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = { + offsetManager.commitOffsets(request, minOffset, maxOffset) + + this.synchronized { + aggregatedOffsetsCache --= aggregatedOffsetsCache.keys.filter(_._1 == request.tableName) + } + } + + def commitRerun(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = { + this.synchronized { + aggregatedOffsetsCache --= aggregatedOffsetsCache.keys.filter(_._1 == request.tableName) + } + + offsetManager.commitRerun(request, minOffset, maxOffset) + } + + def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = { + offsetManager.postCommittedRecords(commitRequests) + + val updatedTables = commitRequests.map(_.table).toSet + this.synchronized { + aggregatedOffsetsCache --= aggregatedOffsetsCache.keys.filter(k => updatedTables.contains(k._1)) + } + } + + def rollbackOffsets(request: DataOffsetRequest): Unit = { + offsetManager.rollbackOffsets(request) + } + + private def renderAggregatedOptionalOffset(offsetsOpt: Option[DataOffsetAggregated]): String = { + offsetsOpt match { + case Some(offsets) => + val minOffsetStr = offsets.minimumOffset.valueString + val maxOffsetStr = offsets.maximumOffset.valueString + s"max_info_date=${offsets.maximumInfoDate} min: '$minOffsetStr', max: $maxOffsetStr" + case None => + "offsets are not defined" + } + } +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala index 9d9ef6f68..6da3d18bc 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala @@ -16,7 +16,6 @@ package za.co.absa.pramen.core.bookkeeper -import org.slf4j.LoggerFactory import slick.jdbc.H2Profile.api._ import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue} @@ -29,8 +28,6 @@ import scala.util.control.NonFatal class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { import za.co.absa.pramen.core.utils.FutureImplicits._ - private val log = LoggerFactory.getLogger(this.getClass) - override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = { val offsets = getOffsetRecords(table, infoDate) @@ -116,6 +113,29 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { ).execute() } + override def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = { + val committedAt = Instant.now() + val committedAtMilli = committedAt.toEpochMilli + + val records = commitRequests.map { req => + OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAtMilli)) + } + + db.run( + OffsetRecords.records ++= records + ).execute() + + commitRequests.map(r => (r.table, r.infoDate)) + .distinct + .foreach { case (table, infoDate) => + db.run( + OffsetRecords.records + .filter(r => r.pramenTableName === table && r.infoDate === infoDate.toString && r.committedAt =!= committedAtMilli) + .delete + ).execute() + } + } + override def rollbackOffsets(request: DataOffsetRequest): Unit = { db.run( OffsetRecords.records diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala new file mode 100644 index 000000000..e863b1744 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerUtils.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.bookkeeper + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.{col, max, min} +import org.apache.spark.sql.types.StringType +import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue} +import za.co.absa.pramen.api.sql.SqlGeneratorBase + +object OffsetManagerUtils { + def getMinMaxValueFromData(df: DataFrame, offsetColumn: String, offsetType: OffsetType): Option[(OffsetValue, OffsetValue)] = { + if (df.isEmpty) { + None + } else { + val row = df.agg(min(offsetType.getSparkCol(col(offsetColumn)).cast(StringType)), + max(offsetType.getSparkCol(col(offsetColumn))).cast(StringType)) + .collect()(0) + + val minValue = OffsetValue.fromString(offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}")) + val maxValue = OffsetValue.fromString(offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}")) + + SqlGeneratorBase.validateOffsetValue(minValue) + SqlGeneratorBase.validateOffsetValue(maxValue) + + Some(minValue, maxValue) + } + } +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetCommitRequest.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetCommitRequest.scala new file mode 100644 index 000000000..6d775d3f7 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetCommitRequest.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.bookkeeper.model + +import za.co.absa.pramen.api.offset.OffsetValue + +import java.time.{Instant, LocalDate} + +case class OffsetCommitRequest( + table: String, + infoDate: LocalDate, + minOffset: OffsetValue, + maxOffset: OffsetValue, + createdAt: Instant + ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala index 4f7b56fdc..73491d565 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala @@ -19,11 +19,13 @@ package za.co.absa.pramen.core.bookkeeper.model import slick.jdbc.H2Profile.api._ class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") { - def pramenTableName = column[String]("table_name", O.Length(128)) + def pramenTableName = column[String]("table_name", O.Length(256)) def infoDate = column[String]("info_date", O.Length(20)) def dataType = column[String]("data_type", O.Length(20)) - def minOffset = column[String]("min_offset", O.Length(64)) - def maxOffset = column[String]("max_offset", O.Length(64)) + + def minOffset = column[String]("min_offset", O.Length(128)) + + def maxOffset = column[String]("max_offset", O.Length(128)) def batchId = column[Long]("batch_id") def createdAt = column[Long]("created_at") def committedAt = column[Option[Long]]("committed_at") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala index 1209c882f..55fa62b5b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SaveMode} import za.co.absa.pramen.api._ import za.co.absa.pramen.api.status.TaskRunReason -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode, TrackingTable} import za.co.absa.pramen.core.utils.hive.HiveHelper import java.time.LocalDate @@ -54,5 +54,11 @@ trait Metastore { def getStats(tableName: String, infoDate: LocalDate): MetaTableStats - def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader + def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, readMode: ReaderMode): MetastoreReader + + def addTrackingTables(trackingTables: Seq[TrackingTable]) + + def commitIncrementalTables(): Unit + + def rollbackIncrementalTables(): Unit } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index ab2d3a6c9..cc704062f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -23,30 +23,34 @@ import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.api._ -import za.co.absa.pramen.api.offset.DataOffset +import za.co.absa.pramen.api.offset.{OffsetType, 
OffsetValue} import za.co.absa.pramen.api.status.TaskRunReason -import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT -import za.co.absa.pramen.core.app.config.RuntimeConfig.UNDERCOVER -import za.co.absa.pramen.core.bookkeeper.Bookkeeper +import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig} +import za.co.absa.pramen.core.bookkeeper.model.OffsetCommitRequest +import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManagerUtils} import za.co.absa.pramen.core.config.Keys -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode, TrackingTable} import za.co.absa.pramen.core.metastore.peristence.{MetastorePersistence, TransientJobManager} import za.co.absa.pramen.core.utils.ConfigUtils import za.co.absa.pramen.core.utils.hive.{HiveFormat, HiveHelper} import java.time.{Instant, LocalDate} +import scala.collection.mutable.ListBuffer class MetastoreImpl(appConfig: Config, tableDefs: Seq[MetaTable], bookkeeper: Bookkeeper, metadata: MetadataManager, batchId: Long, + isRerun: Boolean, skipBookKeepingUpdates: Boolean)(implicit spark: SparkSession) extends Metastore { import MetastoreImpl._ private val log = LoggerFactory.getLogger(this.getClass) + private val globalTrackingTables = new ListBuffer[TrackingTable] + override def getRegisteredTables: Seq[String] = tableDefs.map(_.name) override def getRegisteredMetaTables: Seq[MetaTable] = tableDefs @@ -201,73 +205,27 @@ class MetastoreImpl(appConfig: Config, MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false) } - override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader = { - val metastore = this - - new MetastoreReader { - override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { - validateTable(tableName) - val from = infoDateFrom.orElse(Option(infoDate)) - val to = infoDateTo.orElse(Option(infoDate)) - metastore.getTable(tableName, from, to) - } - - override def getCurrentBatch(tableName: String): DataFrame = { - validateTable(tableName) - if (isIncremental) - metastore.getBatch(tableName, infoDate, None) - else - metastore.getTable(tableName, Option(infoDate), Option(infoDate)) - } - - override def getLatest(tableName: String, until: Option[LocalDate] = None): DataFrame = { - validateTable(tableName) - val untilDate = until.orElse(Option(infoDate)) - metastore.getLatest(tableName, untilDate) - } - - override def getLatestAvailableDate(tableName: String, until: Option[LocalDate] = None): Option[LocalDate] = { - validateTable(tableName) - val untilDate = until.orElse(Option(infoDate)) - bookkeeper.getLatestProcessedDate(tableName, untilDate) - } - - override def isDataAvailable(tableName: String, from: Option[LocalDate], until: Option[LocalDate]): Boolean = { - validateTable(tableName) - val fromDate = from.orElse(Option(infoDate)) - val untilDate = until.orElse(Option(infoDate)) - metastore.isDataAvailable(tableName, fromDate, untilDate) - } - - override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = { - val om = bookkeeper.getOffsetManager - - om.getOffsets(table, infoDate) - } - - override def getTableDef(tableName: String): MetaTableDef = { - validateTable(tableName) // ToDo Consider removing - - 
MetaTable.getMetaTableDef(metastore.getTableDef(tableName)) - } - - override def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo] = { - bookkeeper.getLatestDataChunk(tableName, infoDate, infoDate) - .map(chunk => - MetaTableRunInfo(tableName, LocalDate.parse(chunk.infoDate), chunk.inputRecordCount, chunk.outputRecordCount, Instant.ofEpochSecond(chunk.jobStarted), Instant.ofEpochSecond(chunk.jobFinished)) - ) - } + override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, readMode: ReaderMode): MetastoreReader = { + if (readMode == ReaderMode.Batch) + new MetastoreReaderBatchImpl(this, metadata, bookkeeper, tables, infoDate, runReason) + else + new MetastoreReaderIncrementalImpl(this, metadata, bookkeeper, tables, outputTable, infoDate, runReason, readMode, isRerun) + } - override def getRunReason: TaskRunReason = runReason + override def commitIncrementalTables(): Unit = synchronized { + val threadId = Thread.currentThread().getId + val tablesToCommit = globalTrackingTables.filter(_.threadId == threadId) + commitIncremental(tablesToCommit.toSeq) + globalTrackingTables --= tablesToCommit + } - override def metadataManager: MetadataManager = metadata + override def rollbackIncrementalTables(): Unit = synchronized { + val threadId = Thread.currentThread().getId + globalTrackingTables --= globalTrackingTables.filter(_.threadId == threadId) + } - private def validateTable(tableName: String): Unit = { - if (!tables.contains(tableName)) { - throw new TableNotConfigured(s"Attempt accessing non-dependent table: $tableName") - } - } - } + override def addTrackingTables(trackingTables: Seq[TrackingTable]): Unit = synchronized { + globalTrackingTables ++= trackingTables } private[core] def prepareHiveSchema(schema: StructType, mt: MetaTable): StructType = { @@ -279,6 +237,50 @@ class MetastoreImpl(appConfig: Config, StructType(fieldsWithPartitionColumn) } + + private[core] def commitIncremental(trackingTables: Seq[TrackingTable]): Unit = { + if (trackingTables.isEmpty) + return + + val om = bookkeeper.getOffsetManager + val batchIdValue = OffsetValue.IntegralValue(batchId) + + val commitRequests = trackingTables.flatMap { trackingTable => + val tableDef = getTableDef(trackingTable.inputTable) + if (tableDef.format.isRaw) { + val df = getTable(trackingTable.inputTable, Option(trackingTable.infoDate), Option(trackingTable.infoDate)) + getMinMaxOffsetFromMetastoreDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match { + case Some((minOffset, maxOffset)) => + log.info(s"Committed offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='${maxOffset.valueString}'.") + Some(OffsetCommitRequest( + trackingTable.trackingName, + trackingTable.infoDate, + minOffset, + maxOffset, + trackingTable.createdAt + )) + case None => + log.info(s"No new data processed that requires offsets update of table '${trackingTable.trackingName}' for '${trackingTable.infoDate}'.") + None + } + } else { + val minOffset = trackingTable.currentMinOffset.getOrElse(batchIdValue) + log.info(s"Committed offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='$batchId'.") + Some(OffsetCommitRequest( + trackingTable.trackingName, + trackingTable.infoDate, + minOffset, + batchIdValue, + trackingTable.createdAt + )) + } + } + + if (commitRequests.nonEmpty) { + 
om.postCommittedRecords(commitRequests) + log.info(s"Committed ${commitRequests.length} requests.'") + } + } } object MetastoreImpl { @@ -288,15 +290,20 @@ object MetastoreImpl { val DEFAULT_RECORDS_PER_PARTITION = 500000 def fromConfig(conf: Config, + runtimeConfig: RuntimeConfig, infoDateConfig: InfoDateConfig, bookkeeper: Bookkeeper, metadataManager: MetadataManager, batchId: Long)(implicit spark: SparkSession): MetastoreImpl = { val tableDefs = MetaTable.fromConfig(conf, infoDateConfig, METASTORE_KEY) - val isUndercover = ConfigUtils.getOptionBoolean(conf, UNDERCOVER).getOrElse(false) - - new MetastoreImpl(conf, tableDefs, bookkeeper, metadataManager, batchId, isUndercover) + new MetastoreImpl(conf, + tableDefs, + bookkeeper, + metadataManager, + batchId, + runtimeConfig.isRerun, + runtimeConfig.isUndercover) } private[core] def withSparkConfig(sparkConfig: Map[String, String]) @@ -329,5 +336,9 @@ object MetastoreImpl { } } } -} + private[core] def getMinMaxOffsetFromMetastoreDf(df: DataFrame, batchIdColumn: String, currentMax: Option[OffsetValue]): Option[(OffsetValue, OffsetValue)] = { + val offsetType = if (df.schema.fields.find(_.name == batchIdColumn).get.dataType == StringType) OffsetType.StringType else OffsetType.IntegralType + OffsetManagerUtils.getMinMaxValueFromData(df, batchIdColumn, offsetType) + } +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala new file mode 100644 index 000000000..a44377fd8 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala @@ -0,0 +1,90 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.metastore + +import org.apache.spark.sql.DataFrame +import za.co.absa.pramen.api.offset.DataOffset +import za.co.absa.pramen.api.status.TaskRunReason +import za.co.absa.pramen.api.{MetaTableDef, MetaTableRunInfo, MetadataManager, MetastoreReader} +import za.co.absa.pramen.core.bookkeeper.Bookkeeper +import za.co.absa.pramen.core.metastore.model.MetaTable + +import java.time.{Instant, LocalDate} + +abstract class MetastoreReaderBase(metastore: Metastore, + metadata: MetadataManager, + bookkeeper: Bookkeeper, + tables: Seq[String], + infoDate: LocalDate, + runReason: TaskRunReason) extends MetastoreReader { + override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { + validateTable(tableName) + val from = infoDateFrom.orElse(Option(infoDate)) + val to = infoDateTo.orElse(Option(infoDate)) + metastore.getTable(tableName, from, to) + } + + override def getLatest(tableName: String, until: Option[LocalDate]): DataFrame = { + validateTable(tableName) + val untilDate = until.orElse(Option(infoDate)) + metastore.getLatest(tableName, untilDate) + } + + override def getLatestAvailableDate(tableName: String, until: Option[LocalDate]): Option[LocalDate] = { + validateTable(tableName) + val untilDate = until.orElse(Option(infoDate)) + bookkeeper.getLatestProcessedDate(tableName, untilDate) + } + + override def isDataAvailable(tableName: String, from: Option[LocalDate], until: Option[LocalDate]): Boolean = { + validateTable(tableName) + val fromDate = from.orElse(Option(infoDate)) + val untilDate = until.orElse(Option(infoDate)) + metastore.isDataAvailable(tableName, fromDate, untilDate) + } + + override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = { + val om = bookkeeper.getOffsetManager + + om.getOffsets(table, infoDate) + } + + override def getTableDef(tableName: String): MetaTableDef = { + validateTable(tableName) + + MetaTable.getMetaTableDef(metastore.getTableDef(tableName)) + } + + override def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo] = { + bookkeeper.getLatestDataChunk(tableName, infoDate, infoDate) + .map(chunk => + MetaTableRunInfo(tableName, LocalDate.parse(chunk.infoDate), chunk.inputRecordCount, chunk.outputRecordCount, Instant.ofEpochSecond(chunk.jobStarted), Instant.ofEpochSecond(chunk.jobFinished)) + ) + } + + override def getRunReason: TaskRunReason = { + runReason + } + + override def metadataManager: MetadataManager = metadata + + protected def validateTable(tableName: String): Unit = { + if (!tables.contains(tableName)) { + throw new TableNotConfigured(s"Attempt accessing non-dependent table: $tableName") + } + } +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBatchImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBatchImpl.scala new file mode 100644 index 000000000..bd24022d5 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBatchImpl.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.metastore + +import org.apache.spark.sql.DataFrame +import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.MetadataManager +import za.co.absa.pramen.api.status.TaskRunReason +import za.co.absa.pramen.core.bookkeeper.Bookkeeper + +import java.time.LocalDate + +class MetastoreReaderBatchImpl(metastore: Metastore, + metadataManager: MetadataManager, + bookkeeper: Bookkeeper, + tables: Seq[String], + infoDate: LocalDate, + runReason: TaskRunReason) extends MetastoreReaderBase(metastore, metadataManager, bookkeeper, tables, infoDate, runReason) { + private val log = LoggerFactory.getLogger(this.getClass) + + override def getCurrentBatch(tableName: String): DataFrame = { + log.info(s"Getting daily data for table '$tableName' at '$infoDate'...") + metastore.getTable(tableName, Option(infoDate), Option(infoDate)) + } +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderIncremental.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderIncremental.scala new file mode 100644 index 000000000..a37cb5560 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderIncremental.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.metastore + +import za.co.absa.pramen.api.MetastoreReader + +trait MetastoreReaderIncremental extends MetastoreReader { + def commitIncrementalOutputTable(tableName: String, trackingName: String): Unit + + def commitIncrementalStage(): Unit +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderIncrementalImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderIncrementalImpl.scala new file mode 100644 index 000000000..dbb451757 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderIncrementalImpl.scala @@ -0,0 +1,136 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.metastore + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.col +import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.MetadataManager +import za.co.absa.pramen.api.status.TaskRunReason +import za.co.absa.pramen.core.bookkeeper.Bookkeeper +import za.co.absa.pramen.core.metastore.model.{ReaderMode, TrackingTable} + +import java.time.{Instant, LocalDate} +import scala.collection.mutable.ListBuffer + +class MetastoreReaderIncrementalImpl(metastore: Metastore, + metadataManager: MetadataManager, + bookkeeper: Bookkeeper, + tables: Seq[String], + outputTable: String, + infoDate: LocalDate, + runReason: TaskRunReason, + readMode: ReaderMode, + isRerun: Boolean) + extends MetastoreReaderBase(metastore, metadataManager, bookkeeper, tables, infoDate, runReason) + with MetastoreReaderIncremental { + + private val log = LoggerFactory.getLogger(this.getClass) + private val trackingTables = new ListBuffer[TrackingTable] + + override def getCurrentBatch(tableName: String): DataFrame = { + validateTable(tableName) + if (readMode == ReaderMode.IncrementalPostProcessing && !isRerun) { + log.info(s"Getting the current batch for table '$tableName' at '$infoDate'...") + metastore.getBatch(tableName, infoDate, None) + } else if ((readMode == ReaderMode.IncrementalValidation || readMode == ReaderMode.IncrementalRun) && !isRerun) { + log.info(s"Getting the current incremental chunk for table '$tableName' at '$infoDate'...") + getIncremental(tableName, infoDate) + } else { + log.info(s"Getting daily data for table '$tableName' at '$infoDate'...") + metastore.getTable(tableName, Option(infoDate), Option(infoDate)) + } + } + + override def commitIncrementalOutputTable(tableName: String, trackingName: String): Unit = { + if (readMode != ReaderMode.Batch) { + val om = bookkeeper.getOffsetManager + val minMax = om.getMaxInfoDateAndOffset(trackingName, Option(infoDate)) + log.info(s"Starting offset commit for output table '$trackingName' for '$infoDate'.") + val trackingTable = TrackingTable( + Thread.currentThread().getId, + tableName, + outputTable, + trackingName, + "", + minMax.map(_.minimumOffset), + minMax.map(_.maximumOffset), + infoDate, + Instant.now() + ) + + trackingTables += trackingTable + } + } + + override def commitIncrementalStage(): Unit = { + metastore.addTrackingTables(trackingTables.toSeq) + trackingTables.clear() + } + + private def getIncremental(tableName: String, infoDate: LocalDate): DataFrame = { + val commitChanges = readMode == ReaderMode.IncrementalRun + val trackingName = s"$tableName->$outputTable" + + getIncrementalDf(tableName, trackingName, infoDate, commitChanges) + } + + private def getIncrementalDf(tableName: String, trackingName: String, infoDate: LocalDate, commit: Boolean): DataFrame = { + val tableDef = metastore.getTableDef(tableName) + val om = bookkeeper.getOffsetManager + val tableDf = metastore.getTable(tableName, Option(infoDate), Option(infoDate)) + val offsets = om.getMaxInfoDateAndOffset(trackingName, Option(infoDate)) + + val df = if (tableDf.isEmpty) { + tableDf + } else { + if (!tableDf.schema.exists(_.name == tableDef.batchIdColumn)) { + log.error(tableDf.schema.treeString) + throw new IllegalArgumentException(s"Table '$tableName' does not contain column '${tableDef.batchIdColumn}' needed for incremental processing.") + } + + offsets match { + case Some(values) => + log.info(s"Getting 
incremental table '$trackingName' for '$infoDate', column '${tableDef.batchIdColumn}' > ${values.maximumOffset.valueString}") + tableDf.filter(col(tableDef.batchIdColumn) > values.maximumOffset.getSparkLit) + case None => + log.info(s"Getting incremental table '$trackingName' for '$infoDate''") + tableDf + } + } + + if (commit && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) { + log.info(s"Starting offset commit for table '$trackingName' for '$infoDate'") + + val trackingTable = TrackingTable( + Thread.currentThread().getId, + tableName, + outputTable, + trackingName, + tableDef.batchIdColumn, + offsets.map(_.minimumOffset), + offsets.map(_.maximumOffset), + infoDate, + Instant.now() + ) + + trackingTables += trackingTable + } + + df + } +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetaTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetaTable.scala index f3b3c42f7..eb56be568 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetaTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetaTable.scala @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory import za.co.absa.pramen.api.{DataFormat, MetaTableDef} import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.config.InfoDateOverride +import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceRaw.RAW_OFFSET_FIELD_KEY import za.co.absa.pramen.core.utils.{AlgorithmUtils, ConfigUtils} import java.time.LocalDate @@ -137,7 +138,6 @@ object MetaTable { val startDate = infoDateOverride.startDate.getOrElse(defaultStartDate) val trackDays = ConfigUtils.getOptionInt(conf, TRACK_DAYS_KEY).getOrElse(defaultTrackDays) val trackDaysExplicitlySet = conf.hasPath(TRACK_DAYS_KEY) - val batchIdColumn = ConfigUtils.getOptionString(conf, BATCH_ID_COLUMN_KEY).getOrElse(defaultBatchIdColumn) val format = Try { DataFormatParser.fromConfig(conf, appConf) @@ -146,6 +146,12 @@ object MetaTable { case Failure(ex) => throw new IllegalArgumentException(s"Unable to read data format from config for the metastore table: $name", ex) } + val batchIdColumn = if (format.isRaw) { + ConfigUtils.getOptionString(conf, BATCH_ID_COLUMN_KEY).getOrElse(RAW_OFFSET_FIELD_KEY) + } else { + ConfigUtils.getOptionString(conf, BATCH_ID_COLUMN_KEY).getOrElse(defaultBatchIdColumn) + } + val hiveTable = ConfigUtils.getOptionString(conf, HIVE_TABLE_KEY) val hivePath = ConfigUtils.getOptionString(conf, HIVE_PATH_KEY) val hivePreferAddPartition = ConfigUtils.getOptionBoolean(conf, HIVE_PREFER_ADD_PARTITION_KEY).getOrElse(defaultPreferAddPartition) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/ReaderMode.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/ReaderMode.scala new file mode 100644 index 000000000..643d6e7b8 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/ReaderMode.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.metastore.model + +trait ReaderMode + +object ReaderMode { + case object Batch extends ReaderMode + + case object IncrementalValidation extends ReaderMode + + case object IncrementalRun extends ReaderMode + + case object IncrementalPostProcessing extends ReaderMode +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala new file mode 100644 index 000000000..bc9843806 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.metastore.model + +import za.co.absa.pramen.api.offset.OffsetValue + +import java.time.{Instant, LocalDate} + +case class TrackingTable( + threadId: Long, + inputTable: String, + outputTable: String, + trackingName: String, + batchIdColumn: String, + currentMinOffset: Option[OffsetValue], + currentMaxOffset: Option[OffsetValue], + infoDate: LocalDate, + createdAt: Instant + ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala index b8ccbe095..a3c9016c6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.HiveConfig -import za.co.absa.pramen.core.metastore.peristence.TransientTableManager.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY} import za.co.absa.pramen.core.utils.hive.QueryExecutor import za.co.absa.pramen.core.utils.{FsUtils, SparkUtils} @@ -35,6 +34,7 @@ class MetastorePersistenceRaw(path: String, saveModeOpt: Option[SaveMode]) (implicit spark: SparkSession) extends MetastorePersistence { + import MetastorePersistenceRaw._ import spark.implicits._ private val log = LoggerFactory.getLogger(this.getClass) @@ -72,7 +72,7 @@ class MetastorePersistenceRaw(path: String, fsUtilsTrg.createDirectoryRecursive(outputDir) - var totalSize = 0L + var copiedSize = 0L if (files.isEmpty) { log.info("Nohting to save") @@ -84,16 +84,37 @@ class MetastorePersistenceRaw(path: String, log.info(s"Copying file from $srcPath to $trgPath") - totalSize += fsSrc.getContentSummary(srcPath).getLength + copiedSize += fsSrc.getContentSummary(srcPath).getLength fsUtilsTrg.copyFile(srcPath, trgPath) }) } - MetaTableStats( - Option(totalSize), - None, - Some(totalSize) - ) + val stats 
= if (saveModeOpt.contains(SaveMode.Append)) { + val list = getListOfFilesRange(infoDate, infoDate) + if (list.isEmpty) { + MetaTableStats( + Option(copiedSize), + None, + Some(copiedSize) + ) + } else { + val totalSize = list.map(_.getLen).sum + MetaTableStats( + Option(totalSize), + Some(copiedSize), + Some(totalSize) + ) + } + } else { + MetaTableStats( + Option(copiedSize), + None, + Some(copiedSize) + ) + } + + log.info(s"Stats: ${stats}") + stats } override def getStats(infoDate: LocalDate, onlyForCurrentBatchId: Boolean): MetaTableStats = { @@ -180,3 +201,8 @@ class MetastorePersistenceRaw(path: String, spark.createDataFrame(emptyRDD, schema) } } + +object MetastorePersistenceRaw { + val RAW_PATH_FIELD_KEY = "path" + val RAW_OFFSET_FIELD_KEY = "file_name" +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala index edb1ff9ad..9c107ecfe 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala @@ -32,9 +32,6 @@ import scala.util.Random object TransientTableManager { private val log = LoggerFactory.getLogger(this.getClass) - val RAW_PATH_FIELD_KEY = "path" - val RAW_OFFSET_FIELD_KEY = "file_name" - private val rawDataframes = new mutable.HashMap[MetastorePartition, DataFrame]() private val cachedDataframes = new mutable.HashMap[MetastorePartition, DataFrame]() private val persistedLocations = new mutable.HashMap[MetastorePartition, String]() diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index ecbf834a3..36c07e888 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -610,9 +610,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot task.runStatus match { case s: Succeeded => - s.recordCount match { - case Some(recordCount) => renderDifferenceSize(recordCount, s.recordCountOld) - case None => "" + (s.recordCount, s.recordsAppended) match { + case (Some(sizeTotal), Some(sizeAppended)) => renderDifferenceSize(sizeTotal, Some(sizeTotal - sizeAppended)) + case (Some(sizeTotal), None) => renderDifferenceSize(sizeTotal, s.recordCountOld) + case _ => "" } case d: InsufficientData => renderDifferenceSize(d.actual, d.recordCountOld) case _ => "" diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index 502e56597..a9a757eaf 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -22,14 +22,13 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession} import za.co.absa.pramen.api.jobdef.SourceTable import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset -import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue} -import 
za.co.absa.pramen.api.sql.SqlGeneratorBase +import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType} import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} -import za.co.absa.pramen.api.{DataFormat, Reason, Source} +import za.co.absa.pramen.api.{Reason, Source} import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest} -import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager} +import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager, OffsetManagerUtils} import za.co.absa.pramen.core.metastore.Metastore -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental} import za.co.absa.pramen.core.utils.SparkUtils._ @@ -131,42 +130,34 @@ class IncrementalIngestionJob(operationDef: OperationDef, jobStarted: Instant, inputRecordCount: Option[Long]): SaveResult = { val isRerun = runReason == TaskRunReason.Rerun - val dfToSave = df.withColumn(outputTable.batchIdColumn, lit(batchId)) - val om = bookkeeper.getOffsetManager - val offsetInfo = source.getOffsetInfo.getOrElse( - throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}''") + throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}'") ) validateOffsetColumn(df, offsetInfo) - val req = om.startWriteOffsets(outputTable.name, infoDate, offsetInfo.offsetType) val stats = try { - val statsToReturn = if (isRerun) { - metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Overwrite)) - } else { - metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append)) - } + val saveMode = if (isRerun) SaveMode.Overwrite else SaveMode.Append + val statsToReturn = metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(saveMode)) - val updatedDf = if (outputTable.format.isInstanceOf[DataFormat.Raw]) + val updatedDf = if (outputTable.format.isRaw) df else metastore.getBatch(outputTable.name, infoDate, None) - if (updatedDf.isEmpty) { - om.rollbackOffsets(req) - } else { - val (minOffset, maxOffset) = getMinMaxOffsetFromDf(updatedDf, offsetInfo) - - if (isRerun) { - om.commitRerun(req, minOffset, maxOffset) - } else { - om.commitOffsets(req, minOffset, maxOffset) - } + OffsetManagerUtils.getMinMaxValueFromData(updatedDf, offsetInfo.offsetColumn, offsetInfo.offsetType) match { + case Some((minOffset, maxOffset)) => + if (isRerun) { + om.commitRerun(req, minOffset, maxOffset) + } else { + om.commitOffsets(req, minOffset, maxOffset) + } + case _ => om.rollbackOffsets(req) } + statsToReturn } catch { case ex: Throwable => @@ -178,7 +169,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, source.postProcess( sourceTable.query, outputTable.name, - metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = true), + metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, ReaderMode.IncrementalPostProcessing), infoDate, operationDef.extraOptions ) @@ -267,7 +258,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'. 
Cannot update uncommitted offsets.") } - val (newMinOffset, newMaxOffset) = getMinMaxOffsetFromDf(df, offsetInfo) + val (newMinOffset, newMaxOffset) = OffsetManagerUtils.getMinMaxValueFromData(df, offsetInfo.offsetColumn, offsetInfo.offsetType).get log.warn(s"Fixing uncommitted offsets. New offset to commit for ${outputTable.name} at $infoDate: " + s"min offset: ${newMinOffset.valueString}, max offset: ${newMaxOffset.valueString}.") @@ -290,20 +281,6 @@ class IncrementalIngestionJob(operationDef: OperationDef, } } - private[core] def getMinMaxOffsetFromDf(df: DataFrame, offsetInfo: OffsetInfo): (OffsetValue, OffsetValue) = { - val row = df.agg(min(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn)).cast(StringType)), - max(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn))).cast(StringType)) - .collect()(0) - - val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}")) - val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}")) - - SqlGeneratorBase.validateOffsetValue(minValue) - SqlGeneratorBase.validateOffsetValue(maxValue) - - (minValue, maxValue) - } - private[core] def validateOffsetColumn(df: DataFrame, offsetInfo: OffsetInfo): Unit = { if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) { throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'.") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala index f84f09f6c..b73285379 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala @@ -25,7 +25,7 @@ import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult} import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.metastore.Metastore -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.metastore.peristence.TransientTableManager import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing} import za.co.absa.pramen.core.utils.ConfigUtils @@ -173,7 +173,7 @@ class IngestionJob(operationDef: OperationDef, source.postProcess( sourceTable.query, outputTable.name, - metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = false), + metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, ReaderMode.Batch), infoDate, operationDef.extraOptions ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala index 20d050300..69adfe574 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType import za.co.absa.pramen.api.Reason import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason} +import 
za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.runner.splitter.ScheduleStrategy @@ -31,6 +32,8 @@ trait Job { val outputTable: MetaTable + val metastore: Metastore + val operation: OperationDef val scheduleStrategy: ScheduleStrategy diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala index dd36fe886..cff40fd5e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala @@ -20,7 +20,7 @@ import com.typesafe.config.Config import org.apache.spark.sql.types.StructType import org.slf4j.{Logger, LoggerFactory} import za.co.absa.pramen.api.jobdef.Schedule -import za.co.absa.pramen.api.status.{DependencyFailure, DependencyWarning, JobType, MetastoreDependency, TaskDef, TaskRunReason} +import za.co.absa.pramen.api.status._ import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.expr.DateExprEvaluator import za.co.absa.pramen.core.metastore.Metastore @@ -32,7 +32,7 @@ import java.time.{Instant, LocalDate} import scala.util.{Failure, Success, Try} abstract class JobBase(operationDef: OperationDef, - metastore: Metastore, + val metastore: Metastore, bookkeeper: Bookkeeper, jobNotificationTargets: Seq[JobNotificationTarget], outputTableDef: MetaTable diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala index b6f868c81..fa81c31b5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala @@ -119,7 +119,11 @@ class OperationSplitter(conf: Config, val notificationTargets = operationDef.notificationTargets .map(targetName => getNotificationTarget(conf, targetName, operationDef.operationConf)) - Seq(new TransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, clazz, transformer)) + val latestInfoDateOpt = if (operationDef.schedule == Schedule.Incremental) { + bookkeeper.getOffsetManager.getMaxInfoDateAndOffset(outputTable, None).map(_.maximumInfoDate) + } else None + + Seq(new TransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, clazz, transformer, latestInfoDateOpt)) } def createPythonTransformation(operationDef: OperationDef, @@ -139,7 +143,11 @@ class OperationSplitter(conf: Config, val notificationTargets = operationDef.notificationTargets .map(targetName => getNotificationTarget(conf, targetName, operationDef.operationConf)) - Seq(new PythonTransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, pythonClass, pramenPyConfig, processRunner, databricksClientOpt)) + val latestInfoDateOpt = if (operationDef.schedule == Schedule.Incremental) { + bookkeeper.getOffsetManager.getMaxInfoDateAndOffset(outputTable, None).map(_.maximumInfoDate) + } else None + + Seq(new PythonTransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, pythonClass, pramenPyConfig, processRunner, databricksClientOpt, latestInfoDateOpt)) } def createSink(operationDef: OperationDef, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala index 2a88bdd97..66d907b46 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala @@ -29,7 +29,7 @@ import za.co.absa.pramen.core.metastore.MetastoreImpl.DEFAULT_RECORDS_PER_PARTIT import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.pipeline.PythonTransformationJob._ import za.co.absa.pramen.core.process.ProcessRunner -import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing} +import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing} import za.co.absa.pramen.core.utils.StringUtils.escapeString import java.io.{BufferedWriter, File, FileWriter} @@ -63,7 +63,8 @@ class PythonTransformationJob(operationDef: OperationDef, pythonClass: String, pramenPyCmdConfigOpt: Option[PramenPyCmdConfig], processRunner: ProcessRunner, - databricksClientOpt: Option[DatabricksClient]) + databricksClientOpt: Option[DatabricksClient], + latestInfoDate: Option[LocalDate]) (implicit spark: SparkSession) extends JobBase(operationDef, metastore, bookkeeper,notificationTargets, outputTable) { @@ -71,7 +72,12 @@ class PythonTransformationJob(operationDef: OperationDef, private val minimumRecords: Int = operationDef.extraOptions.getOrElse(MINIMUM_RECORDS_OPTION, "0").toInt - override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing + override val scheduleStrategy: ScheduleStrategy = { + if (isIncremental) + new ScheduleStrategyIncremental(latestInfoDate, true) + else + new ScheduleStrategySourcing + } override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = { validateTransformationAlreadyRanCases(infoDate, dependencyWarnings) match { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index 8e2672ee2..6be1fb27d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -20,12 +20,12 @@ import com.typesafe.config.Config import org.apache.spark.sql.{DataFrame, SparkSession} import za.co.absa.pramen.api.jobdef.SinkTable import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} -import za.co.absa.pramen.api.{Reason, Sink} +import za.co.absa.pramen.api.{MetastoreReader, Reason, Sink} import za.co.absa.pramen.core.bookkeeper.Bookkeeper -import za.co.absa.pramen.core.metastore.model.MetaTable -import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore} +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} +import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderIncremental} import za.co.absa.pramen.core.pipeline.JobPreRunStatus.Ready -import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing} +import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing} import za.co.absa.pramen.core.utils.ConfigUtils import za.co.absa.pramen.core.utils.SparkUtils._ @@ -49,13 +49,20 @@ class SinkJob(operationDef: OperationDef, private val inputTables = 
operationDef.dependencies.flatMap(_.tables).distinct - override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing + override val scheduleStrategy: ScheduleStrategy = { + if (isIncremental) + new ScheduleStrategyIncremental(None, true) + else + new ScheduleStrategySourcing + } override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = { val alreadyRanStatus = preRunTransformationCheck(infoDate, runReason, dependencyWarnings) + val readerMode = if (isIncremental) ReaderMode.IncrementalValidation else ReaderMode.Batch + val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode) alreadyRanStatus.status match { - case JobPreRunStatus.Ready => JobPreRunResult(Ready, Some(getDataDf(infoDate).count()), dependencyWarnings, alreadyRanStatus.warnings) + case JobPreRunStatus.Ready => JobPreRunResult(Ready, Some(getDataDf(infoDate, metastoreReader).count()), dependencyWarnings, alreadyRanStatus.warnings) case _ => alreadyRanStatus } } @@ -65,7 +72,10 @@ class SinkJob(operationDef: OperationDef, minimumRecordsOpt.foreach(n => log.info(s"Minimum records to send: $n")) - val df = getDataDf(infoDate) + val readerMode = if (isIncremental) ReaderMode.IncrementalValidation else ReaderMode.Batch + val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode) + + val df = getDataDf(infoDate, metastoreReader) val inputRecordCount = df.count() @@ -86,7 +96,17 @@ class SinkJob(operationDef: OperationDef, } override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = { - RunResult(getDataDf(infoDate)) + val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch + val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode) + + val result = RunResult(getDataDf(infoDate, metastoreReader)) + + if (isIncremental) { + // This ensures offsets are tracked for the input table used as sink's source. + metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalStage() + } + + result } def postProcessing(df: DataFrame, @@ -122,17 +142,22 @@ class SinkJob(operationDef: OperationDef, case NonFatal(ex) => throw new IllegalStateException("Unable to connect to the sink.", ex) } + val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch + + val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode) + try { val sinkResult = sink.send(df, sinkTable.metaTableName, - metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, infoDate, runReason, isIncremental), + metastoreReader, infoDate, sinkTable.options ) - val jobFinished = Instant.now val isTransient = outputTable.format.isTransient + val jobFinished = Instant.now + val tooLongWarnings = getTookTooLongWarnings(jobStarted, jobFinished, sinkTable.warnMaxExecutionTimeSeconds) bookkeeper.setRecordCount(outputTable.name, @@ -146,6 +171,12 @@ class SinkJob(operationDef: OperationDef, isTransient ) + if (isIncremental) { + // This ensures offsets are tracked for all incremental tables used by 'sink.send()'. 
+ metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalOutputTable(sinkTable.metaTableName, s"${sinkTable.metaTableName}->$sinkName") + metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalStage() + } + val stats = MetaTableStats(Option(sinkResult.recordsSent), None, None) SaveResult(stats, sinkResult.filesSent, sinkResult.hiveTables, sinkResult.warnings ++ tooLongWarnings) } catch { @@ -157,10 +188,14 @@ class SinkJob(operationDef: OperationDef, } } - private def getDataDf(infoDate: LocalDate): DataFrame = { + private def getDataDf(infoDate: LocalDate, metastoreReader: MetastoreReader): DataFrame = { try { - val (from, to) = getInfoDateRange(infoDate, sinkTable.rangeFromExpr, sinkTable.rangeToExpr) - metastore.getTable(sinkTable.metaTableName, Option(from), Option(to)) + if (isIncremental) { + metastoreReader.getCurrentBatch(sinkTable.metaTableName) + } else { + val (from, to) = getInfoDateRange(infoDate, sinkTable.rangeFromExpr, sinkTable.rangeToExpr) + metastore.getTable(sinkTable.metaTableName, Option(from), Option(to)) + } } catch { case NonFatal(ex) => throw new IllegalStateException(s"Unable to read input table ${sinkTable.metaTableName} for $infoDate.", ex) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala index 0cd611e3f..387ade453 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala @@ -21,8 +21,8 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{Reason, Transformer} import za.co.absa.pramen.core.bookkeeper.Bookkeeper -import za.co.absa.pramen.core.metastore.Metastore -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} +import za.co.absa.pramen.core.metastore.{Metastore, MetastoreReaderIncremental} import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing} import java.time.{Instant, LocalDate} @@ -33,7 +33,8 @@ class TransformationJob(operationDef: OperationDef, notificationTargets: Seq[JobNotificationTarget], outputTable: MetaTable, transformerFactoryClass: String, - transformer: Transformer) + transformer: Transformer, + latestInfoDate: Option[LocalDate]) (implicit spark: SparkSession) extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) { @@ -42,9 +43,9 @@ class TransformationJob(operationDef: OperationDef, private val inputTables = operationDef.dependencies.flatMap(_.tables).distinct override val scheduleStrategy: ScheduleStrategy = { - if (isIncremental) - new ScheduleStrategyIncremental(None, true) - else + if (isIncremental) { + new ScheduleStrategyIncremental(latestInfoDate, true) + } else new ScheduleStrategySourcing } @@ -53,11 +54,23 @@ class TransformationJob(operationDef: OperationDef, } override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = { - transformer.validate(metastore.getMetastoreReader(inputTables, infoDate, runReason, isIncremental), infoDate, operationDef.extraOptions) + val readerMode = if (isIncremental) ReaderMode.IncrementalValidation else ReaderMode.Batch + 
transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, readerMode), infoDate, operationDef.extraOptions) } override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = { - RunResult(transformer.run(metastore.getMetastoreReader(inputTables, infoDate, runReason, isIncremental), infoDate, operationDef.extraOptions)) + val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch + val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, readerMode) + val runResult = RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions)) + + if (isIncremental) { + // Output tables for transient transformations should not be tracked since they are calculated on-demand. + if (!outputTable.format.isTransient) + metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalOutputTable(outputTable.name, outputTable.name) + metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalStage() + } + + runResult } def postProcessing(df: DataFrame, @@ -77,10 +90,13 @@ class TransformationJob(operationDef: OperationDef, else SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None)) + val readerMode = if (isIncremental) ReaderMode.IncrementalPostProcessing else ReaderMode.Batch + val metastoreReaderPostProcess = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, readerMode) + try { transformer.postProcess( outputTable.name, - metastore.getMetastoreReader(inputTables :+ outputTable.name, infoDate, runReason, isIncremental), + metastoreReaderPostProcess, infoDate, operationDef.extraOptions ) } catch { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala index b7134eadd..727d1f68f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala @@ -23,7 +23,7 @@ import za.co.absa.pramen.api.Query import za.co.absa.pramen.api.offset.OffsetValue import za.co.absa.pramen.core.config.Keys import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig -import za.co.absa.pramen.core.utils.{ConfigUtils, JdbcNativeUtils, JdbcSparkUtils, TimeUtils} +import za.co.absa.pramen.core.utils._ import java.time.{Instant, LocalDate} import scala.annotation.tailrec @@ -198,6 +198,10 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig, .load() if (jdbcReaderConfig.correctDecimalsInSchema || jdbcReaderConfig.correctDecimalsFixPrecision) { + if (isDataQuery) { + df = SparkUtils.sanitizeDfColumns(df, jdbcReaderConfig.specialCharacters) + } + JdbcSparkUtils.getCorrectedDecimalsSchema(df, jdbcReaderConfig.correctDecimalsFixPrecision).foreach(schema => df = spark .read diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala index 894ab8a69..d18b4fa57 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory import za.co.absa.pramen.api.offset.OffsetInfo import za.co.absa.pramen.api.sql.{QuotingPolicy, SqlColumnType} import 
za.co.absa.pramen.core.config.Keys +import za.co.absa.pramen.core.config.Keys.SPECIAL_CHARACTERS_IN_COLUMN_NAMES import za.co.absa.pramen.core.utils.ConfigUtils import java.time.ZoneId @@ -38,6 +39,7 @@ case class TableReaderJdbcConfig( correctDecimalsFixPrecision: Boolean = false, enableSchemaMetadata: Boolean = false, useJdbcNative: Boolean = false, + specialCharacters: String = " ", serverTimeZone: ZoneId = ZoneId.systemDefault(), identifierQuotingPolicy: QuotingPolicy = QuotingPolicy.Auto, sqlGeneratorClass: Option[String] = None @@ -98,6 +100,8 @@ object TableReaderJdbcConfig { .map(s => QuotingPolicy.fromString(s)) .getOrElse(QuotingPolicy.Auto) + val specialCharacters = ConfigUtils.getOptionString(workflowConf, SPECIAL_CHARACTERS_IN_COLUMN_NAMES).getOrElse(" ") + TableReaderJdbcConfig( jdbcConfig = JdbcConfig.load(conf, parent), hasInfoDate = conf.getBoolean(HAS_INFO_DATE), @@ -111,6 +115,7 @@ object TableReaderJdbcConfig { correctDecimalsFixPrecision = ConfigUtils.getOptionBoolean(conf, CORRECT_DECIMALS_FIX_PRECISION).getOrElse(false), enableSchemaMetadata = ConfigUtils.getOptionBoolean(conf, ENABLE_SCHEMA_METADATA_KEY).getOrElse(false), useJdbcNative = ConfigUtils.getOptionBoolean(conf, USE_JDBC_NATIVE).getOrElse(false), + specialCharacters, serverTimezone, identifierQuotingPolicy = identifierQuotingPolicy, sqlGeneratorClass = ConfigUtils.getOptionString(conf, SQL_GENERATOR_CLASS_KEY) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala index ffb3ab79e..80f115f6c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala @@ -18,12 +18,10 @@ package za.co.absa.pramen.core.runner.jobrunner import com.github.yruslan.channel.{Channel, ReadChannel} import org.slf4j.LoggerFactory -import za.co.absa.pramen.api.DataFormat import za.co.absa.pramen.api.status.{RunStatus, TaskResult} import za.co.absa.pramen.core.app.config.RuntimeConfig import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.exceptions.FatalErrorWrapper -import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.metastore.peristence.TransientJobManager import za.co.absa.pramen.core.pipeline.Job import za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunner.JobRunResults @@ -91,9 +89,12 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, completedJobsChannel.send((job, Nil, isSucceeded)) } catch { - case ex: FatalErrorWrapper if ex.cause != null => onFatalException(ex.cause, job, isTransient) - case NonFatal(ex) => onNonFatalException(ex, job, isTransient) - case ex: Throwable => onFatalException(ex, job, isTransient) + case ex: FatalErrorWrapper if ex.cause != null => + onFatalException(ex.cause, job, isTransient) + case NonFatal(ex) => + onNonFatalException(ex, job, isTransient) + case ex: Throwable => + onFatalException(ex, job, isTransient) } } completedJobsChannel.close() @@ -117,7 +118,7 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, None, applicationId, isTransient, - job.outputTable.format.isInstanceOf[DataFormat.Raw], + job.outputTable.format.isRaw, Nil, Nil, Nil, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala index 30076c571..9c2172b1e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala @@ -20,11 +20,9 @@ import com.github.yruslan.channel.Channel import com.typesafe.config.Config import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory -import za.co.absa.pramen.api.DataFormat import za.co.absa.pramen.api.status.{RunStatus, TaskResult} import za.co.absa.pramen.core.app.AppContext import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ValidationException} -import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.pipeline.{Job, JobDependency, OperationType} import za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunner import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils.evaluateRunDate @@ -132,7 +130,7 @@ class OrchestratorImpl extends Orchestrator { None, applicationId, isTransient, - job.outputTable.format.isInstanceOf[DataFormat.Raw], + job.outputTable.format.isRaw, Nil, Nil, Nil, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index 3f736a9fa..a8e584bf8 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -17,10 +17,11 @@ package za.co.absa.pramen.core.runner.task import com.typesafe.config.Config -import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.{DataFrame, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.api._ +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status._ import za.co.absa.pramen.core.app.config.RuntimeConfig import za.co.absa.pramen.core.bookkeeper.Bookkeeper @@ -199,7 +200,7 @@ abstract class TaskRunnerBase(conf: Config, Option(runInfo), applicationId, isTransient, - isRawFilesJob = task.job.outputTable.format.isInstanceOf[DataFormat.Raw], + isRawFilesJob = task.job.outputTable.format.isRaw, Nil, Nil, Nil, @@ -219,12 +220,10 @@ abstract class TaskRunnerBase(conf: Config, * @return an instance of TaskResult on the check failure or optional record count on success. */ private[core] def preRunCheck(task: Task, started: Instant): Either[TaskResult, JobPreRunResult] = { - val jobName = task.job.name - val outputTable = MetaTable.getMetaTableDef(task.job.outputTable) val outputTableName = task.job.outputTable.name val options = task.job.operation.extraOptions val isTransient = task.job.outputTable.format.isTransient - val isRawFileBased = task.job.outputTable.format.isInstanceOf[DataFormat.Raw] + val isRawFileBased = task.job.outputTable.format.isRaw Try { task.job.preRunCheck(task.infoDate, task.reason, conf) @@ -278,12 +277,10 @@ abstract class TaskRunnerBase(conf: Config, * @return an instance of TaskResult on validation failure or optional record count on success. 
*/ private[core] def validate(task: Task, started: Instant): Either[TaskResult, JobPreRunResult] = { - val jobName = task.job.name - val outputTable = MetaTable.getMetaTableDef(task.job.outputTable) val outputTableName = task.job.outputTable.name val options = task.job.operation.extraOptions val isTransient = task.job.outputTable.format.isTransient - val isRawFileBased = task.job.outputTable.format.isInstanceOf[DataFormat.Raw] + val isRawFileBased = task.job.outputTable.format.isRaw preRunCheck(task, started) match { case Left(result) => @@ -331,7 +328,7 @@ */ private[core] def run(task: Task, started: Instant, validationResult: JobPreRunResult): TaskResult = { val isTransient = task.job.outputTable.format.isTransient - val isRawFileBased = task.job.outputTable.format.isInstanceOf[DataFormat.Raw] + val isRawFileBased = task.job.outputTable.format.isRaw val lock = lockFactory.getLock(getTokenName(task)) val attempt = try { @@ -358,7 +355,16 @@ dfWithTimestamp.withColumn(task.job.outputTable.infoDateColumn, lit(Date.valueOf(task.infoDate))) } - val postProcessed = task.job.postProcessing(dfWithInfoDate, task.infoDate, conf) + val needAddBatchId = (runtimeConfig.alwaysAddBatchIdColumn || task.job.operation.schedule == Schedule.Incremental) && !task.job.outputTable.format.isRaw + + val dfWithBatchIdColumn = if (needAddBatchId) { + val batchIdColumn = task.job.outputTable.batchIdColumn + dfWithInfoDate.withColumn(batchIdColumn, lit(pipelineState.getBatchId)) + } else { + dfWithInfoDate + } + + val postProcessed = task.job.postProcessing(dfWithBatchIdColumn, task.infoDate, conf) val dfTransformed = applyFilters( applyTransformations(postProcessed, task.job.operation.schemaTransformations), @@ -382,6 +388,10 @@ task.job.save(dfTransformed, task.infoDate, task.reason, conf, started, validationResult.inputRecordsCount) } + if (!isTransient) { + task.job.metastore.commitIncrementalTables() + } + val hiveWarnings = if (task.job.outputTable.hiveTable.nonEmpty) { val recreate = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || task.reason == TaskRunReason.Rerun task.job.createOrRefreshHiveTable(dfTransformed.schema, task.infoDate, recreate) @@ -424,6 +434,7 @@ case ex: Throwable => Failure(new FatalErrorWrapper("Fatal error has occurred.", ex)) } finally { if (!isTransient) { + task.job.metastore.rollbackIncrementalTables() lock.release() } } @@ -541,7 +552,7 @@ } private[core] def handleSchemaChange(df: DataFrame, table: MetaTable, infoDate: LocalDate): List[SchemaDifference] = { - if (table.format.isInstanceOf[DataFormat.Raw]) { + if (table.format.isRaw) { // Raw tables do not need schema check return List.empty[SchemaDifference] } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/sink/LocalCsvSink.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/sink/LocalCsvSink.scala index 55ec309d8..721663173 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/sink/LocalCsvSink.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/sink/LocalCsvSink.scala @@ -24,9 +24,8 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.api.{ExternalChannelFactory, MetastoreReader, Sink, SinkResult} import za.co.absa.pramen.core.sink.LocalCsvSink.OUTPUT_PATH_KEY -import 
za.co.absa.pramen.core.utils.{FsUtils, LocalFsUtils} +import za.co.absa.pramen.core.utils.FsUtils -import java.io.{BufferedWriter, FileOutputStream, OutputStreamWriter} import java.nio.file.{Files, Paths} import java.time.format.DateTimeFormatter import java.time.{LocalDate, ZonedDateTime} @@ -82,7 +81,7 @@ import java.time.{LocalDate, ZonedDateTime} * * tables = [ * { - * metastore.table = metastore_table + * input.metastore.table = metastore_table * output.path = "/local/csv/path" * * # Date range to read the source table for. By default the job information date is used. diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/RawFileSource.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/RawFileSource.scala index 910c2709f..1596c070c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/RawFileSource.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/RawFileSource.scala @@ -18,10 +18,12 @@ package za.co.absa.pramen.core.source import com.typesafe.config.Config import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.api._ -import za.co.absa.pramen.api.offset.OffsetValue +import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue} +import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceRaw.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY} import za.co.absa.pramen.core.utils.{ConfigUtils, FsUtils} import java.io.FileNotFoundException @@ -91,6 +93,8 @@ class RawFileSource(val sourceConfig: Config, override val config: Config = sourceConfig + override def getOffsetInfo: Option[OffsetInfo] = Some(OffsetInfo(RAW_OFFSET_FIELD_KEY, OffsetType.StringType)) + override def hasInfoDateColumn(query: Query): Boolean = { query match { case Query.Path(pathPattern) => pathPattern.contains("{{") @@ -106,13 +110,40 @@ class RawFileSource(val sourceConfig: Config, } override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): SourceResult = { - val files = getPaths(query, infoDateBegin, infoDateEnd) - val df = files.map(_.getPath.toString).toDF(PATH_FIELD) - val fileNames = files.map(_.getPath.getName).sorted + val filePaths = getPaths(query, infoDateBegin, infoDateEnd) + val fileNames = filePaths.map(_.getPath.getName).sorted + val list = filePaths.map { path => + (path.getPath.toString, path.getPath.getName) + } + + val df = listOfFilesToDataFrame(list) SourceResult(df, fileNames) } + override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult = { + if (onlyForInfoDate.isEmpty) { + throw new IllegalArgumentException("Incremental ingestion of raw files requires an info date to be part of filename pattern.") + } + + val filePaths = getPaths(query, onlyForInfoDate.get, onlyForInfoDate.get) + val list = filePaths.map { path => + (path.getPath.toString, path.getPath.getName) + }.filter { + case (_, fileName) => + (offsetFromOpt, offsetToOpt) match { + case (Some(offsetFrom), Some(offsetTo)) => fileName >= offsetFrom.valueString && fileName <= offsetTo.valueString + case (Some(offsetFrom), None) => fileName > offsetFrom.valueString + case (None, Some(offsetTo)) => fileName <= offsetTo.valueString + case _ => true + } + } + + 
val df = listOfFilesToDataFrame(list) + + SourceResult(df, list.map(_._2).sorted) + } + @throws[FileNotFoundException] private[source] def getPaths(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Seq[FileStatus] = { query match { @@ -140,13 +171,17 @@ } } - override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ??? + private[source] def listOfFilesToDataFrame(list: Seq[(String, String)]): DataFrame = { + if (list.isEmpty) + getEmptyRawDf + else + list.toDF(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY) + } } object RawFileSource extends ExternalChannelFactory[RawFileSource] { private val log = LoggerFactory.getLogger(this.getClass) - val PATH_FIELD = "path" val FILE_PREFIX = "file" val FILE_PATTERN_CASE_SENSITIVE_KEY = "file.pattern.case.sensitive" @@ -231,4 +266,11 @@ filePattern } } + + private[core] def getEmptyRawDf(implicit spark: SparkSession): DataFrame = { + val schema = StructType(Seq(StructField(RAW_PATH_FIELD_KEY, StringType), StructField(RAW_OFFSET_FIELD_KEY, StringType))) + + val emptyRDD = spark.sparkContext.emptyRDD[Row] + spark.createDataFrame(emptyRDD, schema) + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/transformers/ConversionTransformer.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/transformers/ConversionTransformer.scala index 1fe9db43d..2870097ad 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/transformers/ConversionTransformer.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/transformers/ConversionTransformer.scala @@ -119,7 +119,7 @@ class ConversionTransformer extends Transformer { val filesDf = metastore.getTable(inputTable, Option(infoDate), Option(infoDate)) - if (!metastore.getTableDef(inputTable).format.isInstanceOf[DataFormat.Raw]) { + if (!metastore.getTableDef(inputTable).format.isRaw) { throw new IllegalArgumentException(s"Table $inputTable should be in 'raw' format so that for each file the metastore returns a file path.") } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/transformers/IdentityTransformer.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/transformers/IdentityTransformer.scala index 6e63ebb61..343cb5e88 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/transformers/IdentityTransformer.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/transformers/IdentityTransformer.scala @@ -22,6 +22,40 @@ import za.co.absa.pramen.core.transformers.IdentityTransformer._ import java.time.LocalDate +/** + * The transformer does not do any actual transformation and just returns the input DataFrame. + * + * It can be used to copy data between metastore tables located in different storages. + * + * The transformer supports incremental processing. 
+ * + * Example usage: + * {{{ + * pramen.operations = [ + * { + * name = "Copy table" + * type = "transformation" + * + * class = "za.co.absa.pramen.core.transformers.IdentityTransformer" + * schedule.type = "daily" + * + * dependencies = [ + * { + * tables = [ table_from ] + * date.from = "@infoDate" + * } + * ] + * + * option { + * input.table = "table_from" + * empty.allowed = true + * } + * + * output.table = "table_to" + * } + * ] + * }}} + */ class IdentityTransformer extends Transformer { override def validate(metastore: MetastoreReader, infoDate: LocalDate, options: Map[String, String]): Reason = { if (!options.contains(INPUT_TABLE_KEY) && !options.contains(INPUT_TABLE_LEGACY_KEY)) { @@ -54,4 +88,4 @@ object IdentityTransformer { val INPUT_TABLE_KEY = "input.table" val INPUT_TABLE_LEGACY_KEY = "table" val EMPTY_ALLOWED_KEY = "empty.allowed" -} \ No newline at end of file +} diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf index 350697cd5..5c7281f7e 100644 --- a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf @@ -84,6 +84,7 @@ pramen.operations = [ { name = "Running a transformer" type = "transformation" + disabled = ${transformer.disabled} class = "za.co.absa.pramen.core.transformers.IdentityTransformer" schedule.type = ${transformer.schedule} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala index 542321914..21120a059 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala @@ -37,6 +37,7 @@ object RuntimeConfigFactory { parallelTasks: Int = 1, stopSparkSession: Boolean = false, allowEmptyPipeline: Boolean = false, + alwaysAddBatchIdColumn: Boolean = false, historicalRunMode: RunMode = RunMode.CheckUpdates, sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = { RuntimeConfig(isDryRun, @@ -53,6 +54,7 @@ object RuntimeConfigFactory { parallelTasks, stopSparkSession, allowEmptyPipeline, + alwaysAddBatchIdColumn, historicalRunMode, sparkAppDescriptionTemplate) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineDeltaLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineDeltaLongSuite.scala index dffd3f161..ea1f49fb4 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineDeltaLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineDeltaLongSuite.scala @@ -109,5 +109,9 @@ class IncrementalPipelineDeltaLongSuite extends IncrementalPipelineLongFixture { "offsets cross info days" in { testOffsetCrossInfoDateEdgeCase(format) } + + "transformer picks up doubly ingested offsets" in { + testTransformerPicksUpFromDoubleIngestedData(format) + } } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala index 14fe1f0eb..86a708e80 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala +++ 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala @@ -828,7 +828,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec val actualTable2 = dfTable2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") compareText(actualTable1, expectedOffsetOnlyAll) - compareText(actualTable2, expectedOffsetOnly2) // ToDo This logic is to be changed when incremental transformations are supported + compareText(actualTable2, expectedOffsetOnlyAll) val batchIds = dfTable1.select(BATCH_ID_COLUMN).distinct().collect() @@ -1207,11 +1207,84 @@ class IncrementalPipelineLongFixture extends AnyWordSpec succeed } + def testTransformerPicksUpFromDoubleIngestedData(metastoreFormat: String): Assertion = { + val csv1DataStr = s"id,name,info_date\n1,John,$infoDate\n2,Jack,$infoDate\n" + val csv2DataStr = s"id,name,info_date\n3,Jill,$infoDate\n4,Mary,$infoDate\n" + val csv3DataStr = s"id,name,info_date\n5,Jane,$infoDate\n6,Kate,$infoDate\n" + + val expectedStr1: String = + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |""".stripMargin + + val expectedStr2: String = + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |{"id":3,"name":"Jill"} + |{"id":4,"name":"Mary"} + |""".stripMargin + + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + val path3 = new Path(tempDir, new Path("landing", "landing_file3.csv")) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + + fsUtils.writeFile(path1, csv1DataStr) + val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema) + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + fsUtils.writeFile(path2, csv2DataStr) + val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema, isTransformerDisabled = true) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedStr2) + compareText(actualTable2Before, expectedStr1) + + fsUtils.writeFile(path3, csv3DataStr) + + val conf3 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema) + val exitCode3 = AppRunner.runPipeline(conf3) + assert(exitCode3 == 0) + + val dfTable1After = spark.read.parquet(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2After = spark.read.parquet(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 3) + + val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After = 
dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1After, expectedWithInfoDateAll) + compareText(actualTable2After, expectedWithInfoDateAll) + + val om = new OffsetManagerJdbc(pramenDb.db, 123L) + + val offsets = om.getOffsets("table1->table2", infoDate).map(_.asInstanceOf[CommittedOffset]) + assert(offsets.length == 1) + } + succeed + } + def getConfig(basePath: String, metastoreFormat: String, isRerun: Boolean = false, useDataFrame: Boolean = false, isTransformerIncremental: Boolean = true, + isTransformerDisabled: Boolean = false, isHistoricalRun: Boolean = false, historyRunMode: String = "force", inferSchema: Boolean = true, @@ -1237,6 +1310,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec |pramen.runtime.is.rerun = $isRerun |pramen.current.date = "$useInfoDate" |transformer.schedule = "$transformerSchedule" + |transformer.disabled = "$isTransformerDisabled" |infer.schema = $inferSchema |$historicalConfigStr |has.information.date.column = $hasInfoDate diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineParquetLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineParquetLongSuite.scala index b8226be3f..d1ab79b13 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineParquetLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineParquetLongSuite.scala @@ -109,5 +109,9 @@ class IncrementalPipelineParquetLongSuite extends IncrementalPipelineLongFixture "offsets cross info days" in { testOffsetCrossInfoDateEdgeCase(format) } + + "transformer picks up doubly ingested offsets" in { + testTransformerPicksUpFromDoubleIngestedData(format) + } } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala index 20a88c039..d120557eb 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala @@ -24,17 +24,18 @@ import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.api.{CachePolicy, DataFormat} -import za.co.absa.pramen.core.OperationDefFactory import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture} import za.co.absa.pramen.core.metadata.MetadataManagerNull +import za.co.absa.pramen.core.metastore.model.ReaderMode import za.co.absa.pramen.core.metastore.peristence.TransientJobManager import za.co.absa.pramen.core.mocks.bookkeeper.SyncBookkeeperMock import za.co.absa.pramen.core.mocks.job.JobSpy import za.co.absa.pramen.core.mocks.utils.hive.QueryExecutorMock import za.co.absa.pramen.core.utils.SparkUtils import za.co.absa.pramen.core.utils.hive.{HiveHelperSql, HiveQueryTemplates, QueryExecutorSpark} +import za.co.absa.pramen.core.{OperationDefFactory, RuntimeConfigFactory} import java.time.LocalDate @@ -390,7 +391,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: Nil, 
"output_table", infoDate, TaskRunReason.New, ReaderMode.Batch) val df1 = reader.getTable("table1", Some(infoDate), Some(infoDate)) @@ -404,7 +405,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table2" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch) val ex = intercept[TableNotConfigured] { reader.getTable("table1", Some(infoDate), Some(infoDate)) @@ -420,7 +421,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch) val runInfo1 = reader.getTableRunInfo("table1", infoDate) val runInfo2 = reader.getTableRunInfo("table1", infoDate.plusDays(1)) @@ -438,7 +439,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch) val metadataManager = reader.metadataManager metadataManager.setMetadata("table1", infoDate, "key1", "value1") @@ -456,7 +457,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) m.saveTable("table1", infoDate.plusDays(1), getDf) - val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, infoDate.plusDays(10), TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, ReaderMode.Batch) val date1 = reader.getLatestAvailableDate("table1") val date2 = reader.getLatestAvailableDate("table1", Some(infoDate)) @@ -579,9 +580,10 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF operationDef = OperationDefFactory.getDummyOperationDef(schedule = schedule)) ) + val runtimeConfig = RuntimeConfigFactory.getDummyRuntimeConfig().copy(isUndercover = undercover) val infoDateConfig = InfoDateConfig.fromConfig(conf) val bk = new SyncBookkeeperMock val mm = new MetadataManagerNull(isPersistenceEnabled = false) - (MetastoreImpl.fromConfig(conf, infoDateConfig, bk, mm, 0L), bk) + (MetastoreImpl.fromConfig(conf, runtimeConfig, infoDateConfig, bk, mm, 0L), bk) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala index 016350017..a894eac37 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala @@ -174,10 +174,33 @@ class MetaTableSuite extends AnyWordSpec { assert(metaTable.infoDateColumn == "INFO_DATE") assert(metaTable.infoDateFormat == "dd-MM-yyyy") assert(metaTable.infoDateStart.toString == "2020-01-31") + assert(metaTable.batchIdColumn == "batchid") assert(metaTable.sparkConfig("key1") == "value1") assert(metaTable.saveModeOpt.contains(SaveMode.Append)) } + "load a metatable definition for raw format" in { + 
+      val conf = ConfigFactory.parseString(
+        """
+          |name = my_table
+          |format = raw
+          |path = /a/b/c
+          |""".stripMargin)
+
+      val defaultHiveConfig = HiveDefaultConfig.getNullConfig
+
+      val metaTable = MetaTable.fromConfigSingleEntity(conf, conf, "INFO_DATE", "dd-MM-yyyy", defaultPartitionByInfoDate = true, LocalDate.parse("2020-01-31"), 0, defaultHiveConfig, defaultPreferAddPartition = true, "batchid")
+
+      assert(metaTable.name == "my_table")
+      assert(metaTable.format.name == "raw")
+      assert(metaTable.hiveTable.isEmpty)
+      assert(metaTable.hivePath.isEmpty)
+      assert(metaTable.infoDateColumn == "INFO_DATE")
+      assert(metaTable.infoDateFormat == "dd-MM-yyyy")
+      assert(metaTable.infoDateStart.toString == "2020-01-31")
+      assert(metaTable.batchIdColumn == "file_name")
+    }
+
     "load a metatable definition with hive table defined" in {
       val conf = ConfigFactory.parseString(
         """
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala
index 359c77d31..8f920784e 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala
@@ -21,12 +21,13 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.StructType
 import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason}
 import za.co.absa.pramen.api.{DataFormat, Reason}
-import za.co.absa.pramen.core.{OperationDefFactory, TaskDefFactory}
-import za.co.absa.pramen.core.metastore.MetaTableStats
 import za.co.absa.pramen.core.metastore.model.MetaTable
+import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore}
 import za.co.absa.pramen.core.mocks.MetaTableFactory.getDummyMetaTable
+import za.co.absa.pramen.core.mocks.metastore.MetastoreSpy
 import za.co.absa.pramen.core.pipeline._
 import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing}
+import za.co.absa.pramen.core.{OperationDefFactory, TaskDefFactory}

 import java.time.{Instant, LocalDate}

@@ -59,6 +60,8 @@ class JobSpy(jobName: String = "Dummy Job",
   override val outputTable: MetaTable = getDummyMetaTable(outputTableIn, format = outputTableFormat, hiveTable = hiveTable)

+  override val metastore: Metastore = new MetastoreSpy()
+
   override val operation: OperationDef = operationDef

   override val scheduleStrategy: ScheduleStrategy = scheduleStrategyIn
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
index 88e0d117c..d9858bf5a 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
@@ -22,8 +22,8 @@ import za.co.absa.pramen.api.offset.DataOffset
 import za.co.absa.pramen.api.status.TaskRunReason
 import za.co.absa.pramen.api.{MetaTableDef, MetaTableRunInfo, MetadataManager, MetastoreReader}
 import za.co.absa.pramen.core.metadata.MetadataManagerNull
-import za.co.absa.pramen.core.metastore.model.MetaTable
-import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, TableNotConfigured}
+import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode, TrackingTable}
+import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderIncremental, TableNotConfigured}
 import za.co.absa.pramen.core.mocks.MetaTableFactory
 import za.co.absa.pramen.core.mocks.utils.hive.QueryExecutorMock
 import za.co.absa.pramen.core.utils.hive.{HiveHelper, HiveHelperSql, HiveQueryTemplates}
@@ -105,10 +105,10 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
     stats
   }

-  override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean): MetastoreReader = {
+  override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, readMode: ReaderMode): MetastoreReader = {
     val metastore = this

-    new MetastoreReader {
+    new MetastoreReaderIncremental {
       override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
         validateTable(tableName)
         val from = infoDateFrom.orElse(Option(infoDate))
@@ -170,6 +170,16 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
           throw new TableNotConfigured(s"Attempt accessing non-dependent table: $tableName")
         }
       }
+
+      override def commitIncrementalOutputTable(tableName: String, trackingName: String): Unit = {}
+
+      override def commitIncrementalStage(): Unit = {}
     }
   }
+
+  override def addTrackingTables(trackingTables: Seq[TrackingTable]): Unit = {}
+
+  override def commitIncrementalTables(): Unit = {}
+
+  override def rollbackIncrementalTables(): Unit = {}
 }
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala
index 763299783..f68fedea5 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala
@@ -611,7 +611,8 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi
       "python_class",
       pramenPyConfigOpt,
       processRunner,
-      databricksClientOpt
+      databricksClientOpt,
+      None
     )

     (job, processRunner, pramenPyConfigOpt, databricksClientOpt)
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransformationJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransformationJobSuite.scala
index 3b0531a82..da84de3d7 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransformationJobSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransformationJobSuite.scala
@@ -137,7 +137,7 @@ class TransformationJobSuite extends AnyWordSpec with SparkTestBase {

     val outputTable = MetaTableFactory.getDummyMetaTable(name = "table1")

-    (new TransformationJob(operation, metastore, bk, Nil, outputTable, "dummy_class", transformer), metastore)
+    (new TransformationJob(operation, metastore, bk, Nil, outputTable, "dummy_class", transformer, None), metastore)
   }
 }
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/source/RawFileSourceSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/source/RawFileSourceSuite.scala
index 482b340ca..2fc497189 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/source/RawFileSourceSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/source/RawFileSourceSuite.scala
@@ -20,10 +20,12 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
 import za.co.absa.pramen.api.{Query, Source}
 import za.co.absa.pramen.core.ExternalChannelFactoryReflect
 import za.co.absa.pramen.core.base.SparkTestBase
 import za.co.absa.pramen.core.fixtures.TempDirFixture
+import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceRaw.RAW_OFFSET_FIELD_KEY
 import za.co.absa.pramen.core.source.RawFileSource.FILE_PATTERN_CASE_SENSITIVE_KEY
 import za.co.absa.pramen.core.utils.{FsUtils, LocalFsUtils}
@@ -292,6 +294,79 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir
     }
   }

+  "getOffsetInfo" should {
+    "always return the same column name and type" in {
+      val source = new RawFileSource(emptyConfig, null)(spark)
+
+      assert(source.getOffsetInfo.contains(OffsetInfo(RAW_OFFSET_FIELD_KEY, OffsetType.StringType)))
+    }
+  }
+
+  "getDataIncremental" should {
+    val conf = ConfigFactory.parseString(
+      s"$FILE_PATTERN_CASE_SENSITIVE_KEY = false"
+    )
+    val source = new RawFileSource(conf, null)(spark)
+
+    "work with from offset" in {
+      val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), Some(OffsetValue.StringValue("FILE_TEST_2022-02-18_A.dat")), None, Seq.empty)
+
+      val fileNames = files.filesRead
+
+      assert(fileNames.length == 2)
+      assert(fileNames.contains("FILE_TEST_2022-02-18_B.dat"))
+      assert(fileNames.contains("FILE_TEST_2022-02-18_C.DAT"))
+    }
+
+    "work with to offset" in {
+      val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), None, Some(OffsetValue.StringValue("FILE_TEST_2022-02-18_B.dat")), Seq.empty)
+
+      val fileNames = files.filesRead
+
+      assert(fileNames.length == 2)
+      assert(fileNames.contains("FILE_TEST_2022-02-18_A.dat"))
+      assert(fileNames.contains("FILE_TEST_2022-02-18_B.dat"))
+    }
+
+    "work from and to offset" in {
+      val fileB = Some(OffsetValue.StringValue("FILE_TEST_2022-02-18_B.dat"))
+      val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), fileB, fileB, Seq.empty)
+
+      val fileNames = files.filesRead
+
+      assert(fileNames.length == 1)
+      assert(fileNames.contains("FILE_TEST_2022-02-18_B.dat"))
+    }
+
+    "work from and to offset and an empty data frame" in {
+      val fileB = Some(OffsetValue.StringValue("NONEXISTENT_FILE.dat"))
+      val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), fileB, fileB, Seq.empty)
+
+      val fileNames = files.filesRead
+
+      assert(fileNames.isEmpty)
+    }
+
+    "work when no offsets are specified" in {
+      val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), None, None, Seq.empty)
+
+      val fileNames = files.filesRead
+
+      assert(fileNames.length == 3)
+      assert(fileNames.contains("FILE_TEST_2022-02-18_A.dat"))
+      assert(fileNames.contains("FILE_TEST_2022-02-18_B.dat"))
+      assert(fileNames.contains("FILE_TEST_2022-02-18_C.DAT"))
+    }
+
+    "throw an exception when info date is not passed" in {
+      val ex = intercept[IllegalArgumentException] {
+        source.getDataIncremental(Query.Path(filesPattern.toString), None, None, None, Seq.empty)
+      }
+
+      assert(ex.getMessage == "Incremental ingestion of raw files requires an info date to be part of filename pattern.")
+    }
+  }
+
   "getPaths" should {
     "work for a directory" in {
       val source = new RawFileSource(emptyConfig, null)(spark)
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala
index e0896d60b..a09ed347a 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala
@@ -20,7 +20,7 @@ import org.scalatest.wordspec.AnyWordSpec
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import za.co.absa.pramen.api.offset.DataOffset.{CommittedOffset, UncommittedOffset}
 import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue}
-import za.co.absa.pramen.core.bookkeeper.{OffsetManager, OffsetManagerJdbc}
+import za.co.absa.pramen.core.bookkeeper.{OffsetManager, OffsetManagerCached, OffsetManagerJdbc}
 import za.co.absa.pramen.core.fixtures.RelationalDbFixture
 import za.co.absa.pramen.core.rdb.PramenDb
 import za.co.absa.pramen.core.reader.model.JdbcConfig
@@ -44,7 +44,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B
   }

   def getOffsetManager: OffsetManager = {
-    new OffsetManagerJdbc(pramenDb.slickDb, 123L)
+    new OffsetManagerCached(new OffsetManagerJdbc(pramenDb.slickDb, 123L))
   }

   "getOffsets" should {
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala
new file mode 100644
index 000000000..c7e8b06b4
--- /dev/null
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.tests.bookkeeper
+
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.TimestampType
+import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue}
+import za.co.absa.pramen.core.base.SparkTestBase
+import za.co.absa.pramen.core.bookkeeper.OffsetManagerUtils
+
+import java.time.Instant
+
+class OffsetManagerUtilsSuite extends AnyWordSpec with SparkTestBase {
+
+  import spark.implicits._
+
+  "getMinMaxValueFromData" should {
+    "work for an integral data type" in {
+      val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("a", "offset")
+
+      val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "offset", OffsetType.IntegralType).get
+
+      assert(minValue == OffsetValue.IntegralValue(1))
+      assert(maxValue == OffsetValue.IntegralValue(3))
+    }
+
+    "work for an string data type" in {
+      val df = List(("A", 3), ("B", 1), ("C", 2)).toDF("offset", "b")
+
+      val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "offset", OffsetType.StringType).get
+
+      assert(minValue == OffsetValue.StringValue("A"))
+      assert(maxValue == OffsetValue.StringValue("C"))
+    }
+
+    "work for an datetime data type" in {
+      val baseTime = 1733989092000L
+      val df = List(("A", baseTime), ("B", baseTime + 1000), ("C", baseTime + 1500)).toDF("a", "offset")
+        .withColumn("offset", (col("offset") / 1000).cast(TimestampType))
+
+      val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "offset", OffsetType.DateTimeType).get
+
+      assert(minValue == OffsetValue.DateTimeValue(Instant.ofEpochMilli(baseTime)))
+      assert(maxValue == OffsetValue.DateTimeValue(Instant.ofEpochMilli(baseTime + 1500)))
+    }
+  }
+}
diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTarget.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTarget.scala
index 7143cc7b8..a18baf56f 100644
--- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTarget.scala
+++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTarget.scala
@@ -57,7 +57,7 @@ class EcsPipelineNotificationTarget(conf: Config) extends PipelineNotificationTa
       case Some(runInfo) if task.runStatus.isInstanceOf[RunStatus.Succeeded] =>
         if (!task.taskDef.outputTable.format.isTransient &&
           !task.taskDef.outputTable.format.isInstanceOf[DataFormat.Null] &&
-          !task.taskDef.outputTable.format.isInstanceOf[DataFormat.Raw]) {
+          !task.taskDef.outputTable.format.isRaw) {
           EcsNotificationTarget.cleanUpS3VersionsForTable(task.taskDef.outputTable, runInfo.infoDate, ecsApiUrl, ecsApiKey, httpClient)
         } else {
           log.info(s"The task outputting to '${task.taskDef.outputTable.name}' for '${runInfo.infoDate}' outputs to ${task.taskDef.outputTable.format.name} format - skipping ECS cleanup...")
diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala
index 6fa2514fc..22c42780a 100644
--- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala
+++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/KafkaAvroSink.scala
@@ -83,7 +83,7 @@ import java.time.LocalDate
  *
  *    tables = [
  *      {
- *        metastore.table = metastore_table
+ *        input.metastore.table = metastore_table
  *        output.topic.name = "my.topic"
  *
  *        # All following settings are OPTIONAL