#520 Add ability to run incremental transformers and sinks #526

Merged on Jan 2, 2025 (26 commits)

Commits
6f944bf
#520 Prepare interfaces for incremental transformer processing.
yruslan Nov 22, 2024
d90bd7d
#520 Make the default batchid field for tables having 'raw' format to…
yruslan Nov 22, 2024
e5755e7
#520 Implement offset management for incremental processing.
yruslan Nov 23, 2024
b8b85b1
#520 Allow gracefully adding 'pramen_batchid' field to metastore tabl…
yruslan Nov 25, 2024
53acd43
#520 Add unit tests for the incremental processing.
yruslan Nov 25, 2024
aeac5df
#520 Add chain-commit for incremental jobs with transient job depende…
yruslan Nov 27, 2024
254dafa
#520 Add chain-commit transaction for incremental jobs with transient…
yruslan Nov 28, 2024
c381cf5
#520 Move some common offset management code to OffsetManagerUtils.
yruslan Nov 28, 2024
b700025
#520 Rename the commit flag to make it easier to understand.
yruslan Nov 29, 2024
ffa27ae
#520 Add the cache layer to the offset manager calls that incur bigge…
yruslan Nov 29, 2024
ec92fc8
#520 Simplify the way metastore reader is created for various purposes.
yruslan Nov 29, 2024
2636b3d
#520 Implement incremental transformations job tracking logic.
yruslan Nov 29, 2024
fe130d3
#520 Improve performance of committing of transformers and sinks outp…
yruslan Dec 2, 2024
423c46b
#398 Fix decimal auto-correctness for Hive JDBC source.
yruslan Dec 9, 2024
7ff08a4
#520 Move OffsetCommitRequest to the model package.
yruslan Dec 10, 2024
5bb749b
#520 Increase the size of offset table names, and various fixups.
yruslan Dec 10, 2024
24fc1de
#520 Make 'isRaw' an interface function instead of relying on 'isInst…
yruslan Dec 11, 2024
bc5019e
#520 Add unit test suite for the new incremental methods of the raw f…
yruslan Dec 12, 2024
365461d
#520 Add unit test suite for the offset manager utils.
yruslan Dec 12, 2024
7157901
#520 Update README of the new functionality.
yruslan Dec 12, 2024
f65794b
#520 Add documentation for the identity transformer.
yruslan Dec 13, 2024
ec765a7
Apply suggestions from code review
yruslan Jan 2, 2025
ffcfcdb
#520 Fix incremental processing PR suggestions.
yruslan Jan 2, 2025
96abd15
#520 Split batch and incremental implementation of MetastoreReader.
yruslan Jan 2, 2025
0565b95
#520 Improve a test as a PR suggestion.
yruslan Jan 2, 2025
c51419d
#520 Fix a regression introduced by the refactoring.
yruslan Jan 2, 2025
18 changes: 18 additions & 0 deletions README.md
@@ -839,6 +839,24 @@ Offset types available at the moment:
Only ingestion jobs support incremental schedule at the moment. Incremental transformations and sinks are planned to be
available soon.

### Incremental transformers and sinks (experimental)
In order for a transformer or a sink to use a table from the metastore in an incremental way, the code should invoke
the `metastore.getCurrentBatch()` method instead of `metastore.getTable()`. `metastore.getCurrentBatch()` also works
for normal batch pipelines. A minimal usage sketch is shown after the list below.

- When `getCurrentBatch()` is used with a daily, weekly, or monthly schedule, it returns data for the information date
corresponding to the running job, the same as invoking `metastore.getTable("my_table", Some(infoDate), Some(infoDate))`.
- When `getCurrentBatch()` is used with an incremental schedule, it returns only the latest data that has not been
processed yet. Offset management is used to keep track of the processed data.
- The column `pramen_batchid` is added automatically to the output tables of ingested and transformed data in order to
track offsets. The exception is the metastore `raw` format, which keeps original files as they are, so the
`pramen_batchid` column can't be added to such tables.
- The offset manager updates offsets only after the output of transformers or sinks has succeeded. It does the update
in a transactional manner. However, if the update fails midway, duplicates are possible on subsequent runs, so Pramen
provides 'at least once' semantics for incremental transformation pipelines.
- Reruns are possible for full days in order to remove duplicates. But for incremental sinks, such as the Kafka sink,
duplicates might still happen.
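
A minimal sketch of an incremental transformer is shown below. It assumes the standard `Transformer` interface
(`validate`/`run` taking a `MetastoreReader`, the information date, and options) and that `getCurrentBatch()` takes the
metastore table name; the class, table, and column names are illustrative only, and exact signatures may differ
between Pramen versions.

```scala
import java.time.LocalDate

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.current_timestamp
import za.co.absa.pramen.api.{MetastoreReader, Reason, Transformer}

class MyIncrementalTransformer extends Transformer {
  override def validate(metastore: MetastoreReader,
                        infoDate: LocalDate,
                        options: Map[String, String]): Reason = Reason.Ready

  override def run(metastore: MetastoreReader,
                   infoDate: LocalDate,
                   options: Map[String, String]): DataFrame = {
    // On an incremental schedule this returns only the records not processed yet.
    // On a daily/weekly/monthly schedule it behaves like
    // metastore.getTable("my_table", Some(infoDate), Some(infoDate)).
    val df = metastore.getCurrentBatch("my_table")

    // Apply the transformation logic; here the batch is simply tagged with the processing time.
    df.withColumn("processed_at", current_timestamp())
  }
}
```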

### Sinks
Sinks define a way data needs to be sent to a target system. Built-in sinks include:
- Kafka sink.
@@ -23,6 +23,8 @@ sealed trait DataFormat {
def isTransient: Boolean

def isLazy: Boolean

def isRaw: Boolean
}

object DataFormat {
@@ -32,6 +34,8 @@ object DataFormat {
override val isTransient: Boolean = false

override val isLazy: Boolean = false

override val isRaw: Boolean = false
}

case class Delta(query: Query, recordsPerPartition: Option[Long]) extends DataFormat {
@@ -40,6 +44,8 @@ object DataFormat {
override val isTransient: Boolean = false

override val isLazy: Boolean = false

override val isRaw: Boolean = false
}

// This format is used for metatables which are just files and can only be used for further sourcing
@@ -49,6 +55,8 @@ object DataFormat {
override val isTransient: Boolean = false

override val isLazy: Boolean = false

override val isRaw: Boolean = true
}

// This format is used for tables that exist only for the duration of the process, and is not persisted
@@ -58,6 +66,8 @@ object DataFormat {
override val isTransient: Boolean = true

override val isLazy: Boolean = false

override val isRaw: Boolean = false
}

// This format is used for tables are calculated only if requested, and is not persisted
@@ -67,6 +77,8 @@ object DataFormat {
override val isTransient: Boolean = true

override val isLazy: Boolean = true

override val isRaw: Boolean = false
}

// This format is used for metatables which do not support persistence, e.g. for sink or transfer jobs
@@ -76,5 +88,7 @@ object DataFormat {
override val isTransient: Boolean = false

override val isLazy: Boolean = false

override val isRaw: Boolean = false
}
}
@@ -216,7 +216,7 @@ abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator {
}

object SqlGeneratorBase {
val MAX_STRING_OFFSET_CHARACTERS = 64
val MAX_STRING_OFFSET_CHARACTERS = 128

val forbiddenCharacters = ";'\\"
val normalCharacters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_."
3 changes: 3 additions & 0 deletions pramen/core/src/main/resources/reference.conf
@@ -147,6 +147,9 @@ pramen {
initial.sourcing.date.weekly.expr = "@runDate - 6"
initial.sourcing.date.monthly.expr = "beginOfMonth(@runDate)"

# If true, Pramen always adds the 'pramen_batchid' column, even for non-incremental pipelines
always.add.batchid.column = false

# Pramen can stop the Spark session at the end of execution. This can help cleanly finalize running
# jobs started from 'spark-submit'. But when running on Databricks this results in a job failure.
# Use it with caution.
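
For reference, a pipeline configuration could opt into always adding the batch id column like this (a sketch; the key
path matches the `pramen.always.add.batchid.column` option introduced above):

```hocon
pramen {
  # Add the 'pramen_batchid' column to metastore tables even for non-incremental pipelines.
  always.add.batchid.column = true
}
```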
@@ -20,7 +20,7 @@ import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import za.co.absa.pramen.api.MetadataManager
import za.co.absa.pramen.core.PramenImpl
import za.co.absa.pramen.core.app.config.InfoDateConfig
import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.lock.{TokenLockFactory, TokenLockFactoryAllow}
@@ -57,7 +57,7 @@ object AppContextImpl {

val (bookkeeper, tokenLockFactory, journal, metadataManager, closable) = Bookkeeper.fromConfig(appConfig.bookkeepingConfig, appConfig.runtimeConfig, batchId)

val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.infoDateDefaults, bookkeeper, metadataManager, batchId)
val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.runtimeConfig, appConfig.infoDateDefaults, bookkeeper, metadataManager, batchId)

PramenImpl.instance.asInstanceOf[PramenImpl].setMetadataManager(metadataManager)
PramenImpl.instance.asInstanceOf[PramenImpl].setWorkflowConfig(conf)
@@ -83,8 +83,9 @@ object AppContextImpl {
val appConfig = AppConfig.fromConfig(conf)

val metadataManager = new MetadataManagerNull(isPersistenceEnabled = false)
val runtimeConfig = RuntimeConfig.default

val metastore: Metastore = MetastoreImpl.fromConfig(conf, infoDateConfig, bookkeeper, metadataManager, 0L)
val metastore: Metastore = MetastoreImpl.fromConfig(conf, runtimeConfig, infoDateConfig, bookkeeper, metadataManager, 0L)

val appContext = new AppContextImpl(
appConfig,
@@ -42,6 +42,7 @@ case class RuntimeConfig(
parallelTasks: Int,
stopSparkSession: Boolean,
allowEmptyPipeline: Boolean,
alwaysAddBatchIdColumn: Boolean,
historicalRunMode: RunMode,
sparkAppDescriptionTemplate: Option[String]
)
@@ -67,6 +68,7 @@ object RuntimeConfig {
val STOP_SPARK_SESSION = "pramen.stop.spark.session"
val VERBOSE = "pramen.verbose"
val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline"
val ALWAYS_ADD_BATCHID_COLUMN = "pramen.always.add.batchid.column"
val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"

def fromConfig(conf: Config): RuntimeConfig = {
@@ -130,6 +132,7 @@ object RuntimeConfig {
}

val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false)
val alwaysAddBatchIdColumn = ConfigUtils.getOptionBoolean(conf, ALWAYS_ADD_BATCHID_COLUMN).getOrElse(false)
val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)

RuntimeConfig(
@@ -147,8 +150,31 @@
parallelTasks = parallelTasks,
stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION),
allowEmptyPipeline,
alwaysAddBatchIdColumn,
runMode,
sparkAppDescriptionTemplate
)
}

def default: RuntimeConfig = {
RuntimeConfig(
isDryRun = false,
isRerun = false,
runTables = Seq.empty,
isUndercover = false,
useLocks = true,
checkOnlyLateData = false,
checkOnlyNewData = true,
emailIfNoChanges = false,
runDate = LocalDate.now(),
runDateTo = None,
isInverseOrder = false,
parallelTasks = 1,
stopSparkSession = true,
allowEmptyPipeline = false,
alwaysAddBatchIdColumn = false,
historicalRunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate = None
)
}
}
@@ -33,7 +33,7 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) {
import za.co.absa.pramen.core.utils.FutureImplicits._

private val log = LoggerFactory.getLogger(this.getClass)
private val offsetManagement = new OffsetManagerJdbc(db, batchId)
private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(db, batchId))

override val bookkeepingEnabled: Boolean = true

@@ -18,7 +18,7 @@ package za.co.absa.pramen.core.bookkeeper

import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest, OffsetCommitRequest}

import java.time.LocalDate

@@ -79,6 +79,11 @@ trait OffsetManager {
*/
def commitRerun(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit

/**
* Combines both startWriteOffsets() and commitOffsets() into one operation when it is applicable.
*/
def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit

/**
* Rolls back an offset request
*/
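
For clarity, the sketch below shows how the two commit paths of the offset manager relate, using only the method
signatures visible in this PR; the actual writing of output data is elided.

```scala
import java.time.LocalDate

import scala.util.control.NonFatal
import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.OffsetManager
import za.co.absa.pramen.core.bookkeeper.model.OffsetCommitRequest

object OffsetCommitExamples {
  // Two-phase path: register the intent to write, then commit once the output has
  // succeeded, or roll the request back on failure.
  def twoPhaseCommit(om: OffsetManager,
                     table: String,
                     infoDate: LocalDate,
                     offsetType: OffsetType,
                     minOffset: OffsetValue,
                     maxOffset: OffsetValue): Unit = {
    val request = om.startWriteOffsets(table, infoDate, offsetType)
    try {
      // ... write the output data here ...
      om.commitOffsets(request, minOffset, maxOffset)
    } catch {
      case NonFatal(ex) =>
        om.rollbackOffsets(request)
        throw ex
    }
  }

  // One-shot path: when the outputs have already succeeded, post the committed
  // offset records in a single operation.
  def postAlreadyCommitted(om: OffsetManager, requests: Seq[OffsetCommitRequest]): Unit =
    om.postCommittedRecords(requests)
}
```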
@@ -0,0 +1,103 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.bookkeeper

import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest, OffsetCommitRequest}

import java.time.LocalDate
import scala.collection.mutable

/**
* The offset manager decorator handles caching of repeated queries.
*/
class OffsetManagerCached(offsetManager: OffsetManager) extends OffsetManager {
private val log = LoggerFactory.getLogger(this.getClass)
private val aggregatedOffsetsCache = new mutable.HashMap[(String, Option[LocalDate]), Option[DataOffsetAggregated]]

def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = {
offsetManager.getOffsets(table, infoDate)
}

def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset] = {
offsetManager.getUncommittedOffsets(table, onlyForInfoDate)
}

def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = synchronized {
val tbl = onlyForInfoDate match {
case Some(date) => s"'$table' for '$date'"
case None => s"'$table'"
}

if (aggregatedOffsetsCache.contains((table, onlyForInfoDate))) {
val value = aggregatedOffsetsCache((table, onlyForInfoDate))
log.info(s"Got min/max offsets for $tbl from cache (${renderAggregatedOptionalOffset(value)}).")
value
} else {
val value = offsetManager.getMaxInfoDateAndOffset(table, onlyForInfoDate)
log.info(s"Got min/max offsets for $tbl from the database (${renderAggregatedOptionalOffset(value)}). Saving to cache...")
aggregatedOffsetsCache += (table, onlyForInfoDate) -> value
value
}
}

def startWriteOffsets(table: String, infoDate: LocalDate, offsetType: OffsetType): DataOffsetRequest = {
offsetManager.startWriteOffsets(table, infoDate, offsetType)
}

def commitOffsets(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = {
offsetManager.commitOffsets(request, minOffset, maxOffset)

this.synchronized {
aggregatedOffsetsCache --= aggregatedOffsetsCache.keys.filter(_._1 == request.tableName)
}
}

def commitRerun(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = {
this.synchronized {
aggregatedOffsetsCache --= aggregatedOffsetsCache.keys.filter(_._1 == request.tableName)
}

offsetManager.commitRerun(request, minOffset, maxOffset)
}

def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = {
offsetManager.postCommittedRecords(commitRequests)

val updatedTables = commitRequests.map(_.table).toSet
this.synchronized {
aggregatedOffsetsCache --= aggregatedOffsetsCache.keys.filter(k => updatedTables.contains(k._1))
}
}

def rollbackOffsets(request: DataOffsetRequest): Unit = {
offsetManager.rollbackOffsets(request)
}

private def renderAggregatedOptionalOffset(offsetsOpt: Option[DataOffsetAggregated]): String = {
offsetsOpt match {
case Some(offsets) =>
val minOffsetStr = offsets.minimumOffset.valueString
val maxOffsetStr = offsets.maximumOffset.valueString
s"max_info_date=${offsets.maximumInfoDate} min: '$minOffsetStr', max: $maxOffsetStr"
case None =>
"offsets are not defined"
}
}
}
@@ -16,7 +16,6 @@

package za.co.absa.pramen.core.bookkeeper

import org.slf4j.LoggerFactory
import slick.jdbc.H2Profile.api._
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
Expand All @@ -29,8 +28,6 @@ import scala.util.control.NonFatal
class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
import za.co.absa.pramen.core.utils.FutureImplicits._

private val log = LoggerFactory.getLogger(this.getClass)

override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = {
val offsets = getOffsetRecords(table, infoDate)

@@ -116,6 +113,29 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
).execute()
}

override def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = {
val committedAt = Instant.now()
val committedAtMilli = committedAt.toEpochMilli

val records = commitRequests.map { req =>
OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAtMilli))
}

db.run(
OffsetRecords.records ++= records
).execute()

commitRequests.map(r => (r.table, r.infoDate))
.distinct
.foreach { case (table, infoDate) =>
db.run(
OffsetRecords.records
.filter(r => r.pramenTableName === table && r.infoDate === infoDate.toString && r.committedAt =!= committedAtMilli)
.delete
).execute()
}
}

override def rollbackOffsets(request: DataOffsetRequest): Unit = {
db.run(
OffsetRecords.records