diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 21bc20bf..fa33254f 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -51,3 +51,9 @@ jobs:
- name: Build and unit test
if: ${{ ! contains( github.event.pull_request.labels.*.name, 'NoTestNeeded') }}
run: mvn clean install --no-transfer-progress -Pspark-${{ matrix.spark }} -Dspan.scale.factor=10
+ - name: Build and unit test Atum Examples Scala ${{ matrix.scala }} and Spark ${{ matrix.spark }}
+ working-directory: ./examples/atum-examples
+ if: ${{ ! contains( github.event.pull_request.labels.*.name, 'NoTestNeeded') }}
+ run: |
+ mvn scala-cross-build:change-version -Pscala-${{ matrix.scala }}
+ mvn clean install --no-transfer-progress -Pspark-${{ matrix.spark }} -Dspan.scale.factor=10
diff --git a/.gitignore b/.gitignore
index 55cad332..be005f1e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,11 +53,15 @@ dependency-reduced-pom.xml
_testOutput
myTestCheckpoints
-examples/data/output
# switch to regexp syntax.
# syntax: regexp
# ^\.pc/
-/examples-s3-sdk-extension/data/output_s3/
+
+# examples:
+examples/atum-examples/data/output
+examples/s3-sdk-extension-examples/data/output_s3/
+
+
pom.xml.bkp
diff --git a/atum/src/test/scala/za/co/absa/atum/integration/SampleMeasurementsIntegrationSuite.scala b/atum/src/test/scala/za/co/absa/atum/integration/SampleMeasurementsIntegrationSuite.scala
index 28d2f2d7..ce9943de 100644
--- a/atum/src/test/scala/za/co/absa/atum/integration/SampleMeasurementsIntegrationSuite.scala
+++ b/atum/src/test/scala/za/co/absa/atum/integration/SampleMeasurementsIntegrationSuite.scala
@@ -29,6 +29,12 @@ import za.co.absa.atum.utils.{BuildProperties, FileUtils, SerializationUtils, Sp
import scala.concurrent.duration.DurationInt
+object SampleMeasurementsIntegrationSuite {
+
+ private case class TestBuildProperties(projectName: String, buildVersion: String) extends BuildProperties
+
+}
+
class SampleMeasurementsIntegrationSuite extends AnyFlatSpec with Matchers with Eventually with SparkTestBase
with BeforeAndAfterAll with OptionValues {
@@ -123,9 +129,3 @@ class SampleMeasurementsIntegrationSuite extends AnyFlatSpec with Matchers with
spark.disableControlMeasuresTracking()
}
}
-
-object SampleMeasurementsIntegrationSuite {
-
- private case class TestBuildProperties(projectName: String, buildVersion: String) extends BuildProperties
-
-}
diff --git a/examples-s3-sdk-extension/pom.xml b/examples-s3-sdk-extension/pom.xml
deleted file mode 100644
index 2671355a..00000000
--- a/examples-s3-sdk-extension/pom.xml
+++ /dev/null
@@ -1,101 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Copyright 2018 ABSA Group Limited
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
-  ~ You may obtain a copy of the License at
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>atum-examples-s3-sdk-extension_2.11</artifactId>
-    <packaging>jar</packaging>
-
-    <parent>
-        <groupId>za.co.absa</groupId>
-        <artifactId>atum-parent_2.11</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
-    </parent>
-
-    <properties>
-        <scalastyle.configLocation>${project.parent.basedir}/scalastyle-config.xml</scalastyle.configLocation>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>za.co.absa</groupId>
-            <artifactId>atum-s3-sdk-extension_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>za.co.absa</groupId>
-            <artifactId>atum-examples_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <directory>${project.basedir}/target/${scala.binary.version}</directory>
-        <outputDirectory>${project.build.directory}/classes</outputDirectory>
-        <testOutputDirectory>${project.build.directory}/test-classes</testOutputDirectory>
-
-        <sourceDirectory>src/main/scala</sourceDirectory>
-        <testSourceDirectory>src/test/scala</testSourceDirectory>
-        <plugins>
-            <plugin>
-                <groupId>org.scalatest</groupId>
-                <artifactId>scalatest-maven-plugin</artifactId>
-                <version>${scalatest.maven.version}</version>
-                <configuration>
-                    <skipTests>true</skipTests>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>${maven.shade.version}</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.scalastyle</groupId>
-                <artifactId>scalastyle-maven-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-
-    <profiles>
-        <profile>
-            <id>scala-2.11</id>
-            <properties>
-                <scala.binary.version>2.11</scala.binary.version>
-                <scala.version>${scala_2.11.version}</scala.version>
-            </properties>
-        </profile>
-        <profile>
-            <id>scala-2.12</id>
-            <properties>
-                <scala.binary.version>2.12</scala.binary.version>
-                <scala.version>${scala_2.12.version}</scala.version>
-            </properties>
-        </profile>
-    </profiles>
-</project>
diff --git a/examples/atum-examples/README.md b/examples/atum-examples/README.md
new file mode 100644
index 00000000..483d9857
--- /dev/null
+++ b/examples/atum-examples/README.md
@@ -0,0 +1,63 @@
+# Atum Spark Job Application Example
+
+This is a set of Atum Apache Spark applications that can be used as inspiration for creating other
+Spark projects. All dependencies are bundled into a 'fat' jar so the jobs can run both locally and on a cluster.
+
+Here is the list of examples (all from the `za.co.absa.atum.examples` package):
+
+- `SampleMeasurements{1|2|3}` - Example apps using core Atum to show Atum initialization,
+checkpoint setup and the resulting control measure handling (in the form of an `_INFO` file)
+- `CreateInfoFileTool[CSV]` - Applications demonstrating how to create the initial `_INFO` file for data (CSV or generic)
+
+## Usage
+
+The example applications are in the `za.co.absa.atum.examples` package. The project contains build files for `Maven`.
+
+## Maven
+
+**To test-run this locally use**
+```shell script
+mvn test
+```
+(This will run the `SampleMeasurements{1|2|3}` jobs via `SampleMeasurementsHdfsRunnerSpec` as tests)
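+
+To run a single suite, the scalatest-maven-plugin's `suites` parameter can usually be passed from the command line (an illustrative invocation; parameter support depends on the plugin version):
+```shell script
+mvn test -Dsuites="za.co.absa.atum.examples.SampleMeasurementsHdfsRunnerSpec"
+```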
+
+**To build an uber jar to run on a cluster**
+```shell script
+mvn package -DskipTests=true
+```
+
+## Scala and Spark version switching
+Like Atum itself, the example project supports building against different Scala and Spark versions:
+
+Switching the Scala version (2.11 or 2.12) can be done via
+```shell script
+mvn scala-cross-build:change-version -Pscala-2.11 # this is default
+# or
+mvn scala-cross-build:change-version -Pscala-2.12
+```
+
+To choose a Spark version to build against, use the `spark-2.4` or `spark-3.1` profile:
+```shell script
+mvn clean install -Pspark-2.4 # this is default
+mvn clean install -Pspark-3.1
+```
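+
+The two switches combine; e.g. a Scala 2.12 + Spark 3.1 build (the same sequence the CI workflow uses) would be:
+```shell script
+mvn scala-cross-build:change-version -Pscala-2.12
+mvn clean install -Pspark-3.1
+```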
+
+## Running via spark-submit
+
+After the project is packaged you can copy `target/2.11/atum-examples_2.11-0.0.1-SNAPSHOT.jar`
+to an edge node of a cluster and use `spark-submit` to run the job. Here is an example of running on YARN:
+
+```shell script
+spark-submit --master yarn --deploy-mode client --class za.co.absa.atum.examples.SampleMeasurements1 atum-examples_2.11-0.0.1-SNAPSHOT.jar
+```
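+
+`SampleMeasurements2` reads `SampleMeasurements1`'s output, so it is submitted the same way afterwards:
+```shell script
+spark-submit --master yarn --deploy-mode client --class za.co.absa.atum.examples.SampleMeasurements2 atum-examples_2.11-0.0.1-SNAPSHOT.jar
+```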
+
+### Running Spark Applications in local mode from an IDE
+If you try to run the example from an IDE you'll likely get the following exception:
+```Exception in thread "main" java.lang.NoClassDefFoundError: scala/Option```
+
+This is because the jar is created with all Scala and Spark dependencies removed (using the shade plugin), so that the uber jar for `spark-submit` is not too big.
+
+There are multiple options to deal with it, namely:
+ - use the test runner class; for the SampleMeasurements apps it is `SampleMeasurementsHdfsRunnerSpec` (provided dependencies are loaded for tests)
+ - use the _Include dependencies with "Provided" scope_ option in Run Configuration in IDEA or equivalent in your IDE.
+ - change the scope of `provided` dependencies to `compile` in the POM file and run Spark Applications as a normal JVM App.
diff --git a/examples-s3-sdk-extension/data/input/wikidata.csv b/examples/atum-examples/data/input/wikidata.csv
similarity index 100%
rename from examples-s3-sdk-extension/data/input/wikidata.csv
rename to examples/atum-examples/data/input/wikidata.csv
diff --git a/examples/data/input/wikidata.csv.info b/examples/atum-examples/data/input/wikidata.csv.info
similarity index 100%
rename from examples/data/input/wikidata.csv.info
rename to examples/atum-examples/data/input/wikidata.csv.info
diff --git a/examples/atum-examples/pom.xml b/examples/atum-examples/pom.xml
new file mode 100644
index 00000000..b9142e15
--- /dev/null
+++ b/examples/atum-examples/pom.xml
@@ -0,0 +1,218 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2018 ABSA Group Limited
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>za.co.absa</groupId>
+    <artifactId>atum-examples_2.12</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <atum.version>4.0.0-SNAPSHOT</atum.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <span.scale.factor>1</span.scale.factor>
+
+        <default.scala.binary.version>2.11</default.scala.binary.version>
+        <default.scala.version>${scala_2.11.version}</default.scala.version>
+
+        <scala_2.11.version>2.11.12</scala_2.11.version>
+        <scala_2.12.version>2.12.12</scala_2.12.version>
+
+        <spark-24.version>2.4.6</spark-24.version>
+        <spark-31.version>3.1.2</spark-31.version>
+
+        <spark.version>${spark-24.version}</spark.version>
+
+        <scala.version>2.12.12</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+
+        <scalatest.maven.plugin.version>2.0.2</scalatest.maven.plugin.version>
+        <maven.source.plugin.version>2.3</maven.source.plugin.version>
+        <maven.shade.plugin.version>3.2.0</maven.shade.plugin.version>
+        <scala.maven.plugin.version>3.0.1</scala.maven.plugin.version>
+        <scala.cross.build.maven.plugin.version>0.2.1</scala.cross.build.maven.plugin.version>
+
+        <commons.version>0.0.27</commons.version>
+        <scalatest.version>3.2.2</scalatest.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>za.co.absa</groupId>
+            <artifactId>atum_${scala.binary.version}</artifactId>
+            <version>${atum.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>za.co.absa.commons</groupId>
+            <artifactId>commons_${scala.binary.version}</artifactId>
+            <version>${commons.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>${scalatest.version}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <directory>${project.basedir}/target/${scala.binary.version}</directory>
+        <outputDirectory>${project.build.directory}/classes</outputDirectory>
+        <testOutputDirectory>${project.build.directory}/test-classes</testOutputDirectory>
+
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <testSourceDirectory>src/test/scala</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala.maven.plugin.version}</version>
+                <configuration>
+                    <useZincServer>false</useZincServer>
+                    <recompileMode>incremental</recompileMode>
+                    <args>
+                        <arg>-Xfatal-warnings</arg>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                    </args>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>${scalatest.maven.plugin.version}</version>
+                <configuration>
+                    <spanScaleFactor>${span.scale.factor}</spanScaleFactor>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven.shade.plugin.version}</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>com.github.wajda</groupId>
+                <artifactId>scala-cross-build-maven-plugin</artifactId>
+                <version>${scala.cross.build.maven.plugin.version}</version>
+                <configuration>
+                    <defaultScalaBinaryVersion>${default.scala.binary.version}</defaultScalaBinaryVersion>
+                    <defaultScalaVersion>${default.scala.version}</defaultScalaVersion>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>scala-2.11</id>
+            <properties>
+                <scala.binary.version>2.11</scala.binary.version>
+                <scala.version>${scala_2.11.version}</scala.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>scala-2.12</id>
+            <properties>
+                <scala.binary.version>2.12</scala.binary.version>
+                <scala.version>${scala_2.12.version}</scala.version>
+            </properties>
+        </profile>
+
+        <profile>
+            <id>spark-2.4</id>
+            <properties>
+                <spark.version>${spark-24.version}</spark.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>spark-3.1</id>
+            <properties>
+                <spark.version>${spark-31.version}</spark.version>
+            </properties>
+        </profile>
+    </profiles>
+</project>
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
similarity index 99%
rename from examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
index 886271c0..1f68b7c5 100644
--- a/examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
@@ -77,5 +77,6 @@ object CreateInfoFileTool {
ControlMeasureUtils.writeControlMeasureInfoFileToHadoopFs(cm, path, JsonType.Pretty)(fs)
val strJson = cm.asJsonPretty
+ log.info(strJson)
}
}
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileToolCSV.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileToolCSV.scala
similarity index 100%
rename from examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileToolCSV.scala
rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileToolCSV.scala
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
similarity index 91%
rename from examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
index 339c405f..65b8ce3c 100644
--- a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
@@ -18,16 +18,21 @@ package za.co.absa.atum.examples
import java.nio.file.{Files, Paths}
import org.apache.hadoop.fs.FileSystem
+import org.apache.log4j.LogManager
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.concurrent.Eventually
import za.co.absa.atum.AtumImplicits._
-import scala.concurrent.duration.DurationInt // using basic Atum without extensions
+import scala.concurrent.duration.DurationInt
object SampleMeasurements1 extends Eventually {
+
+ private val log = LogManager.getLogger(this.getClass)
+
def main(args: Array[String]) {
val sparkBuilder = SparkSession.builder().appName("Sample Measurements 1 Job")
val spark = sparkBuilder
+ // .master("local") // use this when running locally
.getOrCreate()
import spark.implicits._
@@ -39,7 +44,6 @@ object SampleMeasurements1 extends Eventually {
.setControlMeasuresWorkflow("Job 1")
// A business logic of a spark job ...
-
spark.read
.option("header", "true")
.option("inferSchema", "true")
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
similarity index 73%
rename from examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
index f0f70f37..d3721453 100644
--- a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
@@ -15,17 +15,30 @@
package za.co.absa.atum.examples
+import java.nio.file.{Files, Paths}
+
import org.apache.hadoop.fs.FileSystem
+import org.apache.log4j.LogManager
import org.apache.spark.sql.{SaveMode, SparkSession}
-import za.co.absa.atum.AtumImplicits._ // using basic Atum without extensions
+import org.scalatest.concurrent.Eventually
+import za.co.absa.atum.AtumImplicits._
+import za.co.absa.atum.examples.SampleMeasurements1.{eventually, interval, scaled, timeout}
+
+import scala.concurrent.duration.DurationInt
+
+object SampleMeasurements2 extends Eventually {
+
+ private val log = LogManager.getLogger(this.getClass)
-object SampleMeasurements2 {
def main(args: Array[String]) {
// This example is intended to run AFTER SampleMeasurements1, otherwise it will fail on input file absence
val sparkBuilder = SparkSession.builder().appName("Sample Measurements 2 Job")
- val spark = sparkBuilder.getOrCreate()
+ val spark = sparkBuilder
+ // .master("local") // use this when running locally
+ .getOrCreate()
+
import spark.implicits._
val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
@@ -51,6 +64,12 @@ object SampleMeasurements2 {
.write.mode(SaveMode.Overwrite)
.parquet("data/output/stage2_job_results")
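+      // wait for the _INFO file to appear next to the written output; scaled timeout/interval helps on slow environments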
+ eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
+ if (!Files.exists(Paths.get("data/output/stage2_job_results/_INFO"))) {
+ throw new Exception("_INFO file not found at data/output/stage2_job_results")
+ }
+ }
+
spark.disableControlMeasuresTracking()
}
}
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala
similarity index 65%
rename from examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala
rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala
index a6c8ff02..df9e9d4f 100644
--- a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala
@@ -15,22 +15,29 @@
package za.co.absa.atum.examples
+import java.nio.file.{Files, Paths}
+
import org.apache.hadoop.fs.FileSystem
+import org.apache.log4j.LogManager
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.concurrent.Eventually
import za.co.absa.atum.AtumImplicits._
+import za.co.absa.atum.examples.SampleMeasurements2.{eventually, interval, scaled, timeout}
import za.co.absa.atum.model.ControlMeasure
import za.co.absa.atum.utils.{BuildProperties, FileUtils, SerializationUtils}
-import java.nio.file.{Files, Paths}
-import scala.concurrent.duration.DurationInt // using basic Atum without extensions
-
-case class MyBuildProperties(projectName: String, buildVersion: String) extends BuildProperties
+import scala.concurrent.duration.DurationInt
object SampleMeasurements3 extends Eventually {
+ case class MyBuildProperties(projectName: String, buildVersion: String) extends BuildProperties
+
+ private val log = LogManager.getLogger(this.getClass)
+
def main(args: Array[String]) {
val sparkBuilder = SparkSession.builder().appName("Sample Measurements 3 Job")
- val spark = sparkBuilder.getOrCreate()
+ val spark = sparkBuilder
+ // .master("local") // use this when running locally
+ .getOrCreate()
import spark.implicits._
val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
@@ -41,7 +48,6 @@ object SampleMeasurements3 extends Eventually {
.setControlMeasuresWorkflow("Job 1")
// A business logic of a spark job ...
-
spark.read
.option("header", "true")
.option("inferSchema", "true")
@@ -52,17 +58,21 @@ object SampleMeasurements3 extends Eventually {
.write.mode(SaveMode.Overwrite)
.parquet("data/output/stage3_job_results")
- eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
- if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO")))
+ eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) { // scaling will help on slow environments
+ if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO"))) {
throw new Exception("_INFO file not found at data/output/stage3_job_results")
- }
+ }
+
+ val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO")
+ val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile)
+ val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head
- val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO")
- val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile)
- val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head
+ if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) {
+ throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}")
+ } else {
+ log.info("_INFO file correctly contained custom SW Name and version.")
+ }
- if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) {
- throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}")
}
spark.disableControlMeasuresTracking()
diff --git a/examples/src/test/scala/za/co/absa/atum/HdfsInfoIntegrationSuite.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/HdfsInfoIntegrationSuite.scala
similarity index 100%
rename from examples/src/test/scala/za/co/absa/atum/HdfsInfoIntegrationSuite.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/HdfsInfoIntegrationSuite.scala
diff --git a/examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala
similarity index 100%
rename from examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala
diff --git a/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
similarity index 94%
rename from examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
index fc68d1c2..cb39aad8 100644
--- a/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
+++ b/examples/atum-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
@@ -22,7 +22,7 @@ class SampleMeasurementsHdfsRunnerSpec extends AnyFunSuite
with SparkJobRunnerMethods
with SparkLocalMaster {
- // SampleMeasurement2 depends on SampleMeasurements1's output, so they must be run in this order
+  // SampleMeasurements2 depends on SampleMeasurements1's output (likewise SampleMeasurements3), so they should be run in this order
runSparkJobAsTest[SampleMeasurements1.type]
runSparkJobAsTest[SampleMeasurements2.type]
runSparkJobAsTest[SampleMeasurements3.type]
diff --git a/examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
similarity index 100%
rename from examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
index 315b0057..4931af3b 100644
--- a/examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
+++ b/examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
@@ -16,8 +16,8 @@
package za.co.absa.atum.utils
import org.scalatest.funsuite.AnyFunSuiteLike
-
import scala.language.reflectiveCalls
+
import scala.reflect.ClassTag
import scala.reflect.runtime.universe
diff --git a/examples/src/main/scala/za/co/absa/atum/utils/SparkLocalMaster.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
similarity index 100%
rename from examples/src/main/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
diff --git a/examples/pom.xml b/examples/pom.xml
deleted file mode 100644
index a0ea70d0..00000000
--- a/examples/pom.xml
+++ /dev/null
@@ -1,120 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Copyright 2018 ABSA Group Limited
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
-  ~ You may obtain a copy of the License at
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>atum-examples_2.11</artifactId>
-    <packaging>jar</packaging>
-
-    <parent>
-        <groupId>za.co.absa</groupId>
-        <artifactId>atum-parent_2.11</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
-    </parent>
-
-    <properties>
-        <scalastyle.configLocation>${project.parent.basedir}/scalastyle-config.xml</scalastyle.configLocation>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>za.co.absa</groupId>
-            <artifactId>atum_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>za.co.absa.commons</groupId>
-            <artifactId>commons_${scala.binary.version}</artifactId>
-            <version>${commons.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_${scala.binary.version}</artifactId>
-            <version>${scalatest.version}</version>
-            <scope>provided</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <directory>${project.basedir}/target/${scala.binary.version}</directory>
-        <outputDirectory>${project.build.directory}/classes</outputDirectory>
-        <testOutputDirectory>${project.build.directory}/test-classes</testOutputDirectory>
-
-        <sourceDirectory>src/main/scala</sourceDirectory>
-        <testSourceDirectory>src/test/scala</testSourceDirectory>
-        <plugins>
-            <plugin>
-                <groupId>org.scalatest</groupId>
-                <artifactId>scalatest-maven-plugin</artifactId>
-                <version>${scalatest.maven.version}</version>
-                <configuration>
-                    <skipTests>false</skipTests>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>${maven.shade.version}</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.scalastyle</groupId>
-                <artifactId>scalastyle-maven-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-
-    <profiles>
-        <profile>
-            <id>scala-2.11</id>
-            <properties>
-                <scala.binary.version>2.11</scala.binary.version>
-                <scala.version>${scala_2.11.version}</scala.version>
-            </properties>
-        </profile>
-        <profile>
-            <id>scala-2.12</id>
-            <properties>
-                <scala.binary.version>2.12</scala.binary.version>
-                <scala.version>${scala_2.12.version}</scala.version>
-            </properties>
-        </profile>
-    </profiles>
-</project>
diff --git a/examples/s3-sdk-extension-examples/README.md b/examples/s3-sdk-extension-examples/README.md
new file mode 100644
index 00000000..dc50cd43
--- /dev/null
+++ b/examples/s3-sdk-extension-examples/README.md
@@ -0,0 +1,60 @@
+# SDK-S3 Atum Spark Job Application Example
+
+This is a set of Atum Apache Spark applications (using the Atum SDK S3 extension) that can be used as inspiration for creating other
+Spark projects. All dependencies are bundled into a 'fat' jar so the jobs can run both locally and on a cluster.
+
+- `SampleSdkS3Measurements{1|2}` - Example apps using the Atum SDK S3 extension to show Atum initialization,
+checkpoint setup and the resulting control measure handling (in the form of an `_INFO` file originating from and landing in AWS S3)
+
+## Usage
+
+The example applications are in the `za.co.absa.atum.examples` package. The project contains build files for `Maven`.
+
+## Maven
+**To build an uber jar to run on a cluster**
+```shell script
+mvn package -DskipTests=true
+```
+
+## Scala and Spark version switching
+Like Atum itself, the example project supports building against different Scala and Spark versions:
+
+Switching the Scala version (2.11 or 2.12) can be done via
+```shell script
+mvn scala-cross-build:change-version -Pscala-2.11 # this is default
+# or
+mvn scala-cross-build:change-version -Pscala-2.12
+```
+
+To choose a Spark version to build against, use the `spark-2.4` or `spark-3.1` profile:
+```shell script
+mvn clean install -Pspark-2.4 # this is default
+mvn clean install -Pspark-3.1
+```
+
+## Running Requirements
+Since these example apps demonstrate cooperation with S3 resources, a number of environment prerequisites must be met
+for the code to be runnable, namely (see the sketch below):
+ - having an AWS profile named `saml` in `~/.aws/credentials`
+ - having your bucket defined in `TOOLING_BUCKET_NAME` and your KMS Key ID in `TOOLING_KMS_KEY_ID`
+ (the examples are written to enforce AWS-KMS server-side encryption)
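+
+A minimal sketch of that setup (the bucket and key values below are placeholders, not real resources):
+```shell script
+export TOOLING_BUCKET_NAME="my-tooling-bucket"                                  # placeholder bucket name
+export TOOLING_KMS_KEY_ID="arn:aws:kms:eu-west-1:123456789012:key/placeholder"  # placeholder KMS key ID
+```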
+
+## Running via spark-submit
+
+After the project is packaged you can copy `target/2.11/atum-examples-s3-sdk-extension_2.11-0.0.1-SNAPSHOT.jar`
+to an edge node of a cluster and use `spark-submit` to run the job. Here is an example of running on YARN:
+
+```shell script
+spark-submit --master yarn --deploy-mode client --class za.co.absa.atum.examples.SampleSdkS3Measurements1 atum-examples-s3-sdk-extension_2.11-0.0.1-SNAPSHOT.jar
+```
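+
+`SampleSdkS3Measurements2` is intended to run after `SampleSdkS3Measurements1`, e.g.:
+```shell script
+spark-submit --master yarn --deploy-mode client --class za.co.absa.atum.examples.SampleSdkS3Measurements2 atum-examples-s3-sdk-extension_2.11-0.0.1-SNAPSHOT.jar
+```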
+
+### Running Spark Applications in local mode from an IDE
+If you try to run the example from an IDE you'll likely get the following exception:
+```Exception in thread "main" java.lang.NoClassDefFoundError: scala/Option```
+
+This is because the jar is created with all Scala and Spark dependencies removed (using the shade plugin), so that the uber jar for `spark-submit` is not too big.
+
+There are multiple options to deal with it, namely:
+ - use the test runner class; for the SampleSdkS3Measurements apps it is `SampleMeasurementsS3RunnerExampleSpec` (provided dependencies are loaded for tests)
+ - use the _Include dependencies with "Provided" scope_ option in Run Configuration in IDEA or equivalent in your IDE.
+ - change the scope of `provided` dependencies to `compile` in the POM file and run Spark Applications as a normal JVM App.
diff --git a/examples/data/input/wikidata.csv b/examples/s3-sdk-extension-examples/data/input/wikidata.csv
similarity index 100%
rename from examples/data/input/wikidata.csv
rename to examples/s3-sdk-extension-examples/data/input/wikidata.csv
diff --git a/examples/s3-sdk-extension-examples/pom.xml b/examples/s3-sdk-extension-examples/pom.xml
new file mode 100644
index 00000000..0b51c951
--- /dev/null
+++ b/examples/s3-sdk-extension-examples/pom.xml
@@ -0,0 +1,218 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2018 ABSA Group Limited
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>za.co.absa</groupId>
+    <artifactId>atum-examples-s3-sdk-extension_2.11</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <atum.version>4.0.0-SNAPSHOT</atum.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+        <default.scala.binary.version>2.11</default.scala.binary.version>
+        <default.scala.version>${scala_2.11.version}</default.scala.version>
+
+        <scala_2.11.version>2.11.12</scala_2.11.version>
+        <scala_2.12.version>2.12.12</scala_2.12.version>
+
+        <spark-24.version>2.4.6</spark-24.version>
+        <spark-31.version>3.1.2</spark-31.version>
+
+        <spark.version>${spark-24.version}</spark.version>
+
+        <scala.version>2.11.12</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
+
+        <scalatest.maven.plugin.version>2.0.2</scalatest.maven.plugin.version>
+        <maven.source.plugin.version>2.3</maven.source.plugin.version>
+        <maven.shade.plugin.version>3.2.0</maven.shade.plugin.version>
+        <scala.maven.plugin.version>3.0.1</scala.maven.plugin.version>
+        <scala.cross.build.maven.plugin.version>0.2.1</scala.cross.build.maven.plugin.version>
+
+        <commons.version>0.0.27</commons.version>
+        <scalatest.version>3.2.2</scalatest.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>za.co.absa</groupId>
+            <artifactId>atum-s3-sdk-extension_${scala.binary.version}</artifactId>
+            <version>${atum.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>za.co.absa.commons</groupId>
+            <artifactId>commons_${scala.binary.version}</artifactId>
+            <version>${commons.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>${scalatest.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <directory>${project.basedir}/target/${scala.binary.version}</directory>
+        <outputDirectory>${project.build.directory}/classes</outputDirectory>
+        <testOutputDirectory>${project.build.directory}/test-classes</testOutputDirectory>
+
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <testSourceDirectory>src/test/scala</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala.maven.plugin.version}</version>
+                <configuration>
+                    <useZincServer>false</useZincServer>
+                    <recompileMode>incremental</recompileMode>
+                    <args>
+                        <arg>-Xfatal-warnings</arg>
+                        <arg>-unchecked</arg>
+                        <arg>-deprecation</arg>
+                        <arg>-feature</arg>
+                    </args>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>${scalatest.maven.plugin.version}</version>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven.shade.plugin.version}</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>com.github.wajda</groupId>
+                <artifactId>scala-cross-build-maven-plugin</artifactId>
+                <version>${scala.cross.build.maven.plugin.version}</version>
+                <configuration>
+                    <defaultScalaBinaryVersion>${default.scala.binary.version}</defaultScalaBinaryVersion>
+                    <defaultScalaVersion>${default.scala.version}</defaultScalaVersion>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>scala-2.11</id>
+            <properties>
+                <scala.binary.version>2.11</scala.binary.version>
+                <scala.version>${scala_2.11.version}</scala.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>scala-2.12</id>
+            <properties>
+                <scala.binary.version>2.12</scala.binary.version>
+                <scala.version>${scala_2.12.version}</scala.version>
+            </properties>
+        </profile>
+
+        <profile>
+            <id>spark-2.4</id>
+            <properties>
+                <spark.version>${spark-24.version}</spark.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>spark-3.1</id>
+            <properties>
+                <spark.version>${spark-31.version}</spark.version>
+            </properties>
+        </profile>
+    </profiles>
+</project>
diff --git a/examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala b/examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
similarity index 97%
rename from examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
rename to examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
index 0a0e8c84..c74a0289 100644
--- a/examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
+++ b/examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
@@ -27,7 +27,7 @@ object SampleSdkS3Measurements1 {
def main(args: Array[String]) {
val sparkBuilder = SparkSession.builder().appName("Sample S3 Measurements 1 Job")
val spark = sparkBuilder
- // .master("local")
+ // .master("local") // use this when running locally
.getOrCreate()
import spark.implicits._
@@ -58,5 +58,6 @@ object SampleSdkS3Measurements1 {
.parquet("data/output_s3/stage1_job_results")
spark.disableControlMeasuresTracking()
+ spark.stop()
}
}
diff --git a/examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala b/examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
similarity index 91%
rename from examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
rename to examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
index 2d831e16..52d7b300 100644
--- a/examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
+++ b/examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
@@ -34,8 +34,10 @@ object SampleSdkS3Measurements2 {
// This example is intended to run AFTER SampleMeasurements1, otherwise it will fail on input file absence
val sparkBuilder = SparkSession.builder().appName("Sample Measurements 2 Job")
- //val spark = sparkBuilder.master("local").getOrCreate()
- val spark = sparkBuilder.getOrCreate()
+ val spark = sparkBuilder
+ // .master("local") // use this when running locally
+ .getOrCreate()
+
import spark.implicits._
val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
@@ -77,6 +79,7 @@ object SampleSdkS3Measurements2 {
.parquet("data/output_s3/stage2_job_results")
spark.disableControlMeasuresTracking()
+ spark.stop()
// checking info file presence via wrapped AWS SDK s3
val loader = ControlMeasuresSdkS3LoaderJsonFile(infoFileOutputLocation)
@@ -85,7 +88,11 @@ object SampleSdkS3Measurements2 {
val expectedCheckpointRecordCounts = Seq(
"Source" -> 4964, "Raw" -> 4964, "checkpoint1" -> 3072, "checkpoint2" -> 1651)
val extractedCounts = extractCheckpointsRecordCounts(controlMeasure)
- assert(extractedCounts == expectedCheckpointRecordCounts, s"expecting control measure counts to be: $expectedCheckpointRecordCounts, but $extractedCounts found.")
+ if (extractedCounts == expectedCheckpointRecordCounts) {
+      log.info("Expected control measure counts check succeeded.")
+ } else {
+ throw new Exception(s"expecting control measure counts to be: $expectedCheckpointRecordCounts, but $extractedCounts found.")
+ }
}
private def extractCheckpointsRecordCounts(controlMeasure: ControlMeasure): Seq[(String, Int)] = {
diff --git a/examples-s3-sdk-extension/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerExampleSpec.scala b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerExampleSpec.scala
similarity index 100%
rename from examples-s3-sdk-extension/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerExampleSpec.scala
rename to examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerExampleSpec.scala
diff --git a/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
new file mode 100644
index 00000000..4931af3b
--- /dev/null
+++ b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.utils
+
+import org.scalatest.funsuite.AnyFunSuiteLike
+import scala.language.reflectiveCalls
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe
+
+trait SparkJobRunnerMethods {
+ this: AnyFunSuiteLike =>
+
+ private def runSparkJob[T](implicit ct: ClassTag[T]): Unit = {
+ type MainClass = {def main(args: Array[String]): Unit}
+
+ val jobClass = ct.runtimeClass
+ val jobClassSymbol = universe.runtimeMirror(jobClass.getClassLoader).classSymbol(jobClass)
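+    // a Scala `object` compiles to a class with a static MODULE$ field holding the singleton,
+    // so for module classes we read that field instead of calling a constructor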
+ val jobInstance =
+ if (jobClassSymbol.isModuleClass) {
+ jobClass.getField("MODULE$").get(jobClass)
+ } else {
+ jobClass.getDeclaredConstructor().newInstance()
+ }
+
+ jobInstance.asInstanceOf[MainClass].main(Array.empty)
+ }
+
+ def runSparkJobAsTest[T](implicit ct: ClassTag[T]): Unit = {
+ val sampleName = ct.runtimeClass.getSimpleName
+ test(sampleName)(runSparkJob[T](ct))
+ }
+}
diff --git a/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
new file mode 100644
index 00000000..2e0885c6
--- /dev/null
+++ b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.utils
+
+trait SparkLocalMaster {
+ System.getProperties.setProperty("spark.master", "local[*]")
+
+  // in order to run SampleMeasurements as tests, otherwise
+ // java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200... is thrown
+ System.getProperties.setProperty("spark.testing.memory", (1024*1024*1024).toString) // 1g
+}
diff --git a/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala b/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala
index d009ffb4..06bc6c16 100644
--- a/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala
+++ b/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2019 ABSA Group Limited
+ * Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/pom.xml b/pom.xml
index 11431e4b..79b1af47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,11 +25,9 @@
     <packaging>pom</packaging>
 
     <modules>
-        <module>atum</module>
         <module>model</module>
-        <module>examples</module>
+        <module>atum</module>
         <module>atum-s3-sdk-extension</module>
-        <module>examples-s3-sdk-extension</module>
     </modules>
 
     <name>Atum</name>
@@ -94,10 +92,14 @@
         <maven.build.timestamp.format>yyyy-MM-dd'T'HH:mm:ssX</maven.build.timestamp.format>
         <maven.compiler.source>1.6</maven.compiler.source>
         <maven.compiler.target>1.6</maven.compiler.target>
-        <maven.shade.version>3.2.0</maven.shade.version>
-        <maven.sources.version>2.3</maven.sources.version>
-        <maven.scala.version>3.0.1</maven.scala.version>
-        <maven.surefire.version>2.18.1</maven.surefire.version>
+        <maven.shade.plugin.version>3.2.0</maven.shade.plugin.version>
+        <maven.source.plugin.version>2.3</maven.source.plugin.version>
+        <scala.maven.plugin.version>3.0.1</scala.maven.plugin.version>
+        <maven.surefire.plugin.version>2.18.1</maven.surefire.plugin.version>
+        <scalatest.maven.plugin.version>2.0.2</scalatest.maven.plugin.version>
+
+        <maven.release.plugin.version>2.4.2</maven.release.plugin.version>
+        <scala.cross.build.maven.plugin.version>0.2.1</scala.cross.build.maven.plugin.version>
 
         <project.scm.id>atum-scm-server</project.scm.id>
@@ -121,7 +123,6 @@
         <json4s.version>3.5.3</json4s.version>
         <jackson.version>2.10.4</jackson.version>
-        <scalatest.maven.version>2.0.2</scalatest.maven.version>
         <scalatest.version>3.2.2</scalatest.version>
         <slf4j.version>1.7.25</slf4j.version>
         <commons.io.version>2.5</commons.io.version>
@@ -241,7 +242,7 @@
                     <groupId>net.alchim31.maven</groupId>
                     <artifactId>scala-maven-plugin</artifactId>
-                    <version>${maven.scala.version}</version>
+                    <version>${scala.maven.plugin.version}</version>
                     <configuration>
                         <useZincServer>false</useZincServer>
                         <recompileMode>incremental</recompileMode>
@@ -275,7 +276,7 @@
                     <artifactId>maven-surefire-plugin</artifactId>
-                    <version>${maven.surefire.version}</version>
+                    <version>${maven.surefire.plugin.version}</version>
                     <configuration>
                         <skipTests>true</skipTests>
@@ -284,7 +285,7 @@
                     <groupId>org.scalatest</groupId>
                     <artifactId>scalatest-maven-plugin</artifactId>
-                    <version>${scalatest.maven.version}</version>
+                    <version>${scalatest.maven.plugin.version}</version>
                     <configuration>
                         <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                         <junitxml>.</junitxml>
@@ -303,7 +304,7 @@
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-source-plugin</artifactId>
-                    <version>${maven.sources.version}</version>
+                    <version>${maven.source.plugin.version}</version>
                     <executions>
                         <execution>
                             <id>attach-sources</id>
@@ -316,7 +317,7 @@
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-release-plugin</artifactId>
-                    <version>2.4.2</version>
+                    <version>${maven.release.plugin.version}</version>
                     <configuration>
                         <tagNameFormat>v@{project.version}</tagNameFormat>
                         <autoVersionSubmodules>true</autoVersionSubmodules>
@@ -332,7 +333,7 @@
                     <groupId>com.github.wajda</groupId>
                     <artifactId>scala-cross-build-maven-plugin</artifactId>
-                    <version>0.2.1</version>
+                    <version>${scala.cross.build.maven.plugin.version}</version>
                     <configuration>
                         <defaultScalaBinaryVersion>${default.scala.binary.version}</defaultScalaBinaryVersion>
                         <defaultScalaVersion>${default.scala.version}</defaultScalaVersion>
@@ -410,8 +411,10 @@
                         <exclude>**/_INFO</exclude>
                         <exclude>**/_SUCCESS</exclude>
                         <exclude>**/org.apache.spark.sql.sources.DataSourceRegister</exclude>
-                        <exclude>dependency-reduced-pom.xml</exclude>
+                        <exclude>**/dependency-reduced-pom.xml</exclude>
                         <exclude>**/org.mockito.plugins.MockMaker</exclude>
+                        <exclude>**/.idea/**</exclude>
+                        <exclude>**/*.iml</exclude>