diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 21bc20bf..fa33254f 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -51,3 +51,9 @@ jobs:
       - name: Build and unit test
         if: ${{ ! contains( github.event.pull_request.labels.*.name, 'NoTestNeeded') }}
         run: mvn clean install --no-transfer-progress -Pspark-${{ matrix.spark }} -Dspan.scale.factor=10
+      - name: Build and unit test Atum Examples Scala ${{ matrix.scala }} and Spark ${{ matrix.spark }}
+        working-directory: ./examples/atum-examples
+        if: ${{ ! contains( github.event.pull_request.labels.*.name, 'NoTestNeeded') }}
+        run: |
+          mvn scala-cross-build:change-version -Pscala-${{ matrix.scala }}
+          mvn clean install --no-transfer-progress -Pspark-${{ matrix.spark }} -Dspan.scale.factor=10
diff --git a/.gitignore b/.gitignore
index 55cad332..be005f1e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,11 +53,15 @@ dependency-reduced-pom.xml
 _testOutput
 myTestCheckpoints
-examples/data/output
 # switch to regexp syntax.
 #   syntax: regexp
 # ^\.pc/
-/examples-s3-sdk-extension/data/output_s3/
+
+# examples:
+examples/atum-examples/data/output
+examples/s3-sdk-extension-examples/data/output_s3/
+
+
 pom.xml.bkp
diff --git a/atum/src/test/scala/za/co/absa/atum/integration/SampleMeasurementsIntegrationSuite.scala b/atum/src/test/scala/za/co/absa/atum/integration/SampleMeasurementsIntegrationSuite.scala
index 28d2f2d7..ce9943de 100644
--- a/atum/src/test/scala/za/co/absa/atum/integration/SampleMeasurementsIntegrationSuite.scala
+++ b/atum/src/test/scala/za/co/absa/atum/integration/SampleMeasurementsIntegrationSuite.scala
@@ -29,6 +29,12 @@ import za.co.absa.atum.utils.{BuildProperties, FileUtils, SerializationUtils, Sp
 
 import scala.concurrent.duration.DurationInt
 
+object SampleMeasurementsIntegrationSuite {
+
+  private case class TestBuildProperties(projectName: String, buildVersion: String) extends BuildProperties
+
+}
+
 class SampleMeasurementsIntegrationSuite extends AnyFlatSpec with Matchers with Eventually
   with SparkTestBase with BeforeAndAfterAll with OptionValues {
@@ -123,9 +129,3 @@ class SampleMeasurementsIntegrationSuite extends AnyFlatSpec with Matchers with
     spark.disableControlMeasuresTracking()
   }
 }
-
-object SampleMeasurementsIntegrationSuite {
-
-  private case class TestBuildProperties(projectName: String, buildVersion: String) extends BuildProperties
-
-}
diff --git a/examples-s3-sdk-extension/pom.xml b/examples-s3-sdk-extension/pom.xml
deleted file mode 100644
index 2671355a..00000000
--- a/examples-s3-sdk-extension/pom.xml
+++ /dev/null
@@ -1,101 +0,0 @@
- - - - - 4.0.0 - - atum-examples-s3-sdk-extension_2.11 - jar - - - za.co.absa - atum-parent_2.11 - 4.0.0-SNAPSHOT - - - - ${project.parent.basedir}/scalastyle-config.xml - - - - - - za.co.absa - atum-s3-sdk-extension_${scala.binary.version} - ${project.version} - - - za.co.absa - atum-examples_${scala.binary.version} - ${project.version} - - - - - ${project.basedir}/target/${scala.binary.version} - ${project.build.directory}/classes - ${project.build.directory}/test-classes - - src/main/scala - src/test/scala - - - org.scalatest - scalatest-maven-plugin - ${scalatest.maven.version} - - true - - - - - org.apache.maven.plugins - maven-shade-plugin - ${maven.shade.version} - - - package - - shade - - - - - - org.scalastyle - scalastyle-maven-plugin - - - - - - - scala-2.11 - - 2.11 - ${scala_2.11.version} - - - - scala-2.12 - - 2.12 - ${scala_2.12.version} - - - -
diff --git a/examples/atum-examples/README.md
b/examples/atum-examples/README.md
new file mode 100644
index 00000000..483d9857
--- /dev/null
+++ b/examples/atum-examples/README.md
@@ -0,0 +1,63 @@
+# Atum Spark Job Application Example
+
+This is a set of Atum Apache Spark Applications that can be used as inspiration for creating other
+Spark projects. It includes all dependencies in a 'fat' jar so the job can be run locally and on a cluster.
+
+Here is the list of examples (all from the `za.co.absa.atum.examples` package):
+
+- `SampleMeasurements{1|2|3}` - Example apps using core Atum to show the Atum initialization,
+checkpoint setup and the resulting control measure handling (in the form of an `_INFO` file)
+- `CreateInfoFileTool[CSV]` - Applications demonstrating the means of creating the initial `_INFO` file for data (CSV or general)
+
+## Usage
+
+The example applications live in the `za.co.absa.atum.examples` package. The project contains build files for `Maven`.
+
+## Maven
+
+**To test-run this locally use**
+```shell script
+mvn test
+```
+(This will run the `SampleMeasurements{1|2|3}` jobs via `SampleMeasurementsHdfsRunnerSpec` as tests)
+
+**To build an uber jar to run on cluster**
+```shell script
+mvn package -DskipTests=true
+```
+
+## Scala and Spark version switching
+Like Atum itself, the example project supports building against different Scala and Spark versions:
+
+Switching the Scala version (2.11 or 2.12) can be done via
+```shell script
+mvn scala-cross-build:change-version -Pscala-2.11 # this is the default
+# or
+mvn scala-cross-build:change-version -Pscala-2.12
+```
+
+To choose the Spark version to build against, use the `spark-2.4` or `spark-3.1` profile:
+```shell script
+mvn clean install -Pspark-2.4 # this is the default
+mvn clean install -Pspark-3.1
+```
+
+## Running via spark-submit
+
+After the project is packaged, you can copy `target/2.11/atum-examples_2.11-0.0.1-SNAPSHOT.jar`
+to an edge node of a cluster and use `spark-submit` to run the job. Here is an example of running on Yarn:
+
+```shell script
+spark-submit --master yarn --deploy-mode client --class za.co.absa.atum.examples.SampleMeasurements1 atum-examples_2.11-0.0.1-SNAPSHOT.jar
+```
+
+### Running Spark Applications in local mode from an IDE
+If you try to run the example from an IDE, you'll likely get the following exception:
+```Exception in thread "main" java.lang.NoClassDefFoundError: scala/Option```
+
+This is because the jar is created with all Scala and Spark dependencies removed (using the shade plugin). This is done so that the uber jar for `spark-submit` is not too big.
+
+There are multiple options to deal with it, namely:
+ - use the test runner class; for the SampleMeasurements jobs it is `SampleMeasurementsHdfsRunnerSpec` (provided dependencies are loaded for tests) - see the sketch below
+ - use the _Include dependencies with "Provided" scope_ option in the Run Configuration in IDEA, or the equivalent in your IDE
+ - change the scope of the `provided` dependencies to `compile` in the POM file and run the Spark Applications as normal JVM apps
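A minimal sketch of such a test-runner spec, reusing the `SparkJobRunnerMethods` and `SparkLocalMaster` helpers that appear later in this diff (the spec class name here is hypothetical, not part of the change):

```scala
package za.co.absa.atum.examples

import org.scalatest.funsuite.AnyFunSuite
import za.co.absa.atum.utils.{SparkJobRunnerMethods, SparkLocalMaster}

// Hypothetical runner spec: SparkLocalMaster sets spark.master=local[*] (and a
// sufficient spark.testing.memory) before any SparkSession is created, while
// runSparkJobAsTest registers the job object's main() as a ScalaTest test,
// so the 'provided'-scoped Spark dependencies on the test classpath are used.
class MySampleRunnerSpec extends AnyFunSuite
  with SparkJobRunnerMethods
  with SparkLocalMaster {

  runSparkJobAsTest[SampleMeasurements1.type]
}
```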
diff --git a/examples-s3-sdk-extension/data/input/wikidata.csv b/examples/atum-examples/data/input/wikidata.csv
similarity index 100%
rename from examples-s3-sdk-extension/data/input/wikidata.csv
rename to examples/atum-examples/data/input/wikidata.csv
diff --git a/examples/data/input/wikidata.csv.info b/examples/atum-examples/data/input/wikidata.csv.info
similarity index 100%
rename from examples/data/input/wikidata.csv.info
rename to examples/atum-examples/data/input/wikidata.csv.info
diff --git a/examples/atum-examples/pom.xml b/examples/atum-examples/pom.xml
new file mode 100644
index 00000000..b9142e15
--- /dev/null
+++ b/examples/atum-examples/pom.xml
@@ -0,0 +1,218 @@
+ + + + + 4.0.0 + + za.co.absa + atum-examples_2.12 + 0.0.1-SNAPSHOT + + jar + + + 4.0.0-SNAPSHOT + UTF-8 + 1 + + + 2.11 + ${scala_2.11.version} + + 2.11.12 + 2.12.12 + + + 2.4.6 + 3.1.2 + + ${spark-24.version} + + + 2.12.12 + 2.12 + + + 2.0.2 + 2.3 + 3.2.0 + 3.0.1 + 0.2.1 + + + 0.0.27 + 3.2.2 + + + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + + + za.co.absa + atum_${scala.binary.version} + ${atum.version} + + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + + + za.co.absa.commons + commons_${scala.binary.version} + ${commons.version} + provided + + + + + org.scalatest + scalatest_${scala.binary.version} + ${scalatest.version} + compile + + + + + ${project.basedir}/target/${scala.binary.version} + ${project.build.directory}/classes + ${project.build.directory}/test-classes + + src/main/scala + src/test/scala + + + + net.alchim31.maven + scala-maven-plugin + ${scala.maven.plugin.version} + + false + incremental + + -Xfatal-warnings + -unchecked + -deprecation + -feature + + + + + + compile + testCompile + + + + + + org.scalatest + scalatest-maven-plugin + ${scalatest.maven.plugin.version} + + ${span.scale.factor} + + + + test + + test + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven.shade.plugin.version} + + + package + + shade + + + + + + + + com.github.wajda + scala-cross-build-maven-plugin + ${scala.cross.build.maven.plugin.version} + + ${default.scala.binary.version} + ${default.scala.version} + + + + + + + + + + scala-2.11 + + 2.11 + ${scala_2.11.version} + + + + scala-2.12 + + 2.12 + ${scala_2.12.version} + + + + + + spark-2.4 + + ${spark-24.version} + + + + spark-3.1 + + ${spark-31.version} + + + +
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
similarity index 99%
rename from examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
index 886271c0..1f68b7c5 100644
--- a/examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileTool.scala
@@ -77,5 +77,6 @@ object CreateInfoFileTool {
     ControlMeasureUtils.writeControlMeasureInfoFileToHadoopFs(cm, path, JsonType.Pretty)(fs)
 
     val strJson = cm.asJsonPretty
+    log.info(strJson)
   }
 }
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileToolCSV.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileToolCSV.scala
similarity index 100%
rename from examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileToolCSV.scala
rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/CreateInfoFileToolCSV.scala
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
similarity index 91%
rename from examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
index 339c405f..65b8ce3c 100644
--- a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala
@@ -18,16 +18,21 @@ package za.co.absa.atum.examples
 
 import java.nio.file.{Files, Paths}
 
 import org.apache.hadoop.fs.FileSystem
+import org.apache.log4j.LogManager
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.scalatest.concurrent.Eventually
 import za.co.absa.atum.AtumImplicits._
-import scala.concurrent.duration.DurationInt // using basic Atum without extensions
+import scala.concurrent.duration.DurationInt
 
 object SampleMeasurements1 extends Eventually {
+
+  private val log = LogManager.getLogger(this.getClass)
+
   def main(args: Array[String]) {
     val sparkBuilder = SparkSession.builder().appName("Sample Measurements 1 Job")
     val spark = sparkBuilder
+      // .master("local") // use this when running locally
       .getOrCreate()
 
     import spark.implicits._
@@ -39,7 +44,6 @@
       .setControlMeasuresWorkflow("Job 1")
 
     // A business logic of a spark job ...
-
     spark.read
       .option("header", "true")
       .option("inferSchema", "true")
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
similarity index 73%
rename from examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
index f0f70f37..d3721453 100644
--- a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
+++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala
@@ -15,17 +15,30 @@
 
 package za.co.absa.atum.examples
 
+import java.nio.file.{Files, Paths}
+
 import org.apache.hadoop.fs.FileSystem
+import org.apache.log4j.LogManager
 import org.apache.spark.sql.{SaveMode, SparkSession}
-import za.co.absa.atum.AtumImplicits._ // using basic Atum without extensions
+import org.scalatest.concurrent.Eventually
+import za.co.absa.atum.AtumImplicits._
+import za.co.absa.atum.examples.SampleMeasurements1.{eventually, interval, scaled, timeout}
+
+import scala.concurrent.duration.DurationInt
+
+object SampleMeasurements2 extends Eventually {
+
+  private val log = LogManager.getLogger(this.getClass)
 
-object SampleMeasurements2 {
   def main(args: Array[String]) {
     // This example is intended to run AFTER SampleMeasurements1, otherwise it will fail on input file absence
     val sparkBuilder = SparkSession.builder().appName("Sample Measurements 2 Job")
-    val spark = sparkBuilder.getOrCreate()
+    val spark = sparkBuilder
+      // .master("local") // use this when running locally
+      .getOrCreate()
+
     import spark.implicits._
 
     val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
@@ -51,6 +64,12 @@
       .write.mode(SaveMode.Overwrite)
.parquet("data/output/stage2_job_results") + eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) { + if (!Files.exists(Paths.get("data/output/stage2_job_results/_INFO"))) { + throw new Exception("_INFO file not found at data/output/stage2_job_results") + } + } + spark.disableControlMeasuresTracking() } } diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala similarity index 65% rename from examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala rename to examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala index a6c8ff02..df9e9d4f 100644 --- a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala +++ b/examples/atum-examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements3.scala @@ -15,22 +15,29 @@ package za.co.absa.atum.examples +import java.nio.file.{Files, Paths} + import org.apache.hadoop.fs.FileSystem +import org.apache.log4j.LogManager import org.apache.spark.sql.{SaveMode, SparkSession} import org.scalatest.concurrent.Eventually import za.co.absa.atum.AtumImplicits._ +import za.co.absa.atum.examples.SampleMeasurements2.{eventually, interval, scaled, timeout} import za.co.absa.atum.model.ControlMeasure import za.co.absa.atum.utils.{BuildProperties, FileUtils, SerializationUtils} -import java.nio.file.{Files, Paths} -import scala.concurrent.duration.DurationInt // using basic Atum without extensions - -case class MyBuildProperties(projectName: String, buildVersion: String) extends BuildProperties +import scala.concurrent.duration.DurationInt object SampleMeasurements3 extends Eventually { + case class MyBuildProperties(projectName: String, buildVersion: String) extends BuildProperties + + private val log = LogManager.getLogger(this.getClass) + def main(args: Array[String]) { val sparkBuilder = SparkSession.builder().appName("Sample Measurements 3 Job") - val spark = sparkBuilder.getOrCreate() + val spark = sparkBuilder + // .master("local") // use this when running locally + .getOrCreate() import spark.implicits._ val hadoopConfiguration = spark.sparkContext.hadoopConfiguration @@ -41,7 +48,6 @@ object SampleMeasurements3 extends Eventually { .setControlMeasuresWorkflow("Job 1") // A business logic of a spark job ... 
-
     spark.read
       .option("header", "true")
       .option("inferSchema", "true")
@@ -52,17 +58,21 @@
       .write.mode(SaveMode.Overwrite)
       .parquet("data/output/stage3_job_results")
 
-    eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
-      if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO")))
+    eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) { // scaling will help on slow environments
+      if (!Files.exists(Paths.get("data/output/stage3_job_results/_INFO"))) {
         throw new Exception("_INFO file not found at data/output/stage3_job_results")
-    }
+      }
+
+      val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO")
+      val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile)
+      val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head
 
-    val jsonInfoFile = FileUtils.readFileToString("data/output/stage3_job_results/_INFO")
-    val measureObject1: ControlMeasure = SerializationUtils.fromJson[ControlMeasure](jsonInfoFile)
-    val checkpoint = measureObject1.checkpoints.filter(_.name == "checkpoint1").head
+      if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) {
+        throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}")
+      } else {
+        log.info("_INFO file correctly contained custom SW Name and version.")
+      }
 
-    if (!checkpoint.software.contains("MySoftware") || !checkpoint.version.contains("v007")) {
-      throw new Exception(s"Software or Version was not set properly. Got name ${checkpoint.software} and version ${checkpoint.version}")
-    }
+    }
 
     spark.disableControlMeasuresTracking()
diff --git a/examples/src/test/scala/za/co/absa/atum/HdfsInfoIntegrationSuite.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/HdfsInfoIntegrationSuite.scala
similarity index 100%
rename from examples/src/test/scala/za/co/absa/atum/HdfsInfoIntegrationSuite.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/HdfsInfoIntegrationSuite.scala
diff --git a/examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala
similarity index 100%
rename from examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/LocalFsTestUtils.scala
diff --git a/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
similarity index 94%
rename from examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
index fc68d1c2..cb39aad8 100644
--- a/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
+++ b/examples/atum-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
@@ -22,7 +22,7 @@ class SampleMeasurementsHdfsRunnerSpec extends AnyFunSuite
   with SparkJobRunnerMethods
   with SparkLocalMaster {
 
-  // SampleMeasurement2 depends on SampleMeasurements1's output, so they must be run in this order
+  // SampleMeasurements2 depends on SampleMeasurements1's output (and analogously SampleMeasurements3 on SampleMeasurements2's), so they should be run in this order
   runSparkJobAsTest[SampleMeasurements1.type]
   runSparkJobAsTest[SampleMeasurements2.type]
   runSparkJobAsTest[SampleMeasurements3.type]
diff --git a/examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
similarity index 100%
rename from examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
index 315b0057..4931af3b 100644
--- a/examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
+++ b/examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
@@ -16,8 +16,8 @@
 package za.co.absa.atum.utils
 
 import org.scalatest.funsuite.AnyFunSuiteLike
-
 import scala.language.reflectiveCalls
+
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe
diff --git a/examples/src/main/scala/za/co/absa/atum/utils/SparkLocalMaster.scala b/examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
similarity index 100%
rename from examples/src/main/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
rename to examples/atum-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
diff --git a/examples/pom.xml b/examples/pom.xml
deleted file mode 100644
index a0ea70d0..00000000
--- a/examples/pom.xml
+++ /dev/null
@@ -1,120 +0,0 @@
- - - - - 4.0.0 - - atum-examples_2.11 - jar - - - za.co.absa - atum-parent_2.11 - 4.0.0-SNAPSHOT - - - - ${project.parent.basedir}/scalastyle-config.xml - - - - - - org.scala-lang - scala-library - ${scala.version} - provided - - - - - za.co.absa - atum_${scala.binary.version} - ${project.version} - - - - - za.co.absa.commons - commons_${scala.binary.version} - ${commons.version} - test - - - - - org.scalatest - scalatest_${scala.binary.version} - ${scalatest.version} - provided - - - - - ${project.basedir}/target/${scala.binary.version} - ${project.build.directory}/classes - ${project.build.directory}/test-classes - - src/main/scala - src/test/scala - - - org.scalatest - scalatest-maven-plugin - ${scalatest.maven.version} - - false - - - - - org.apache.maven.plugins - maven-shade-plugin - ${maven.shade.version} - - - package - - shade - - - - - - org.scalastyle - scalastyle-maven-plugin - - - - - - - scala-2.11 - - 2.11 - ${scala_2.11.version} - - - - scala-2.12 - - 2.12 - ${scala_2.12.version} - - - -
diff --git a/examples/s3-sdk-extension-examples/README.md b/examples/s3-sdk-extension-examples/README.md
new file mode 100644
index 00000000..dc50cd43
--- /dev/null
+++ b/examples/s3-sdk-extension-examples/README.md
@@ -0,0 +1,60 @@
+# SDK-S3 Atum Spark Job Application Example
+
+This is a set of Atum Apache Spark Applications (using the SDK S3 Atum Extension) that can be used as inspiration for creating other
+Spark projects. It includes all dependencies in a 'fat' jar so the job can be run locally and on a cluster.
+
+- `SampleSdkS3Measurements{1|2}` - Example apps using the Atum SDK S3 Extension to show the Atum initialization,
+checkpoint setup and the resulting control measure handling (in the form of an `_INFO` file originating from and landing in AWS S3)
+
+## Usage
+
+The example applications live in the `za.co.absa.atum.examples` package. The project contains build files for `Maven`.
+
+## Maven
+**To build an uber jar to run on cluster**
+```shell script
+mvn package -DskipTests=true
+```
+
+## Scala and Spark version switching
+Like Atum itself, the example project supports building against different Scala and Spark versions:
+
+Switching the Scala version (2.11 or 2.12) can be done via
+```shell script
+mvn scala-cross-build:change-version -Pscala-2.11 # this is the default
+# or
+mvn scala-cross-build:change-version -Pscala-2.12
+```
+
+To choose the Spark version to build against, use the `spark-2.4` or `spark-3.1` profile:
+```shell script
+mvn clean install -Pspark-2.4 # this is the default
+mvn clean install -Pspark-3.1
+```
+
+## Running Requirements
+Since these example apps demonstrate cooperation with S3 resources, a number of environment prerequisites must be met
+for the code to be truly runnable. Namely:
+ - having an AWS profile named `saml` in `~/.aws/credentials`
+ - having your bucket defined in `TOOLING_BUCKET_NAME` and your KMS Key ID in `TOOLING_KMS_KEY_ID`
+ (the example is written to enforce AWS-KMS server-side encryption; see the sketch below)
+
+## Running via spark-submit
+
+After the project is packaged, you can copy `target/2.11/atum-examples-s3-sdk-extension_2.11-0.0.1-SNAPSHOT.jar`
+to an edge node of a cluster and use `spark-submit` to run the job. Here is an example of running on Yarn:
+
+```shell script
+spark-submit --master yarn --deploy-mode client --class za.co.absa.atum.examples.SampleSdkS3Measurements1 atum-examples-s3-sdk-extension_2.11-0.0.1-SNAPSHOT.jar
+```
+
+### Running Spark Applications in local mode from an IDE
+If you try to run the example from an IDE, you'll likely get the following exception:
+```Exception in thread "main" java.lang.NoClassDefFoundError: scala/Option```
+
+This is because the jar is created with all Scala and Spark dependencies removed (using the shade plugin). This is done so that the uber jar for `spark-submit` is not too big.
+
+There are multiple options to deal with it, namely:
+ - use the test runner class; for the SampleSdkS3Measurements jobs it is `SampleMeasurementsS3RunnerExampleSpec` (provided dependencies are loaded for tests)
+ - use the _Include dependencies with "Provided" scope_ option in the Run Configuration in IDEA, or the equivalent in your IDE
+ - change the scope of the `provided` dependencies to `compile` in the POM file and run the Spark Applications as normal JVM apps
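For illustration, a minimal, hypothetical sketch of resolving the prerequisites above at startup. Only the `saml` profile and the two environment variable names come from the README; the object name, the bucket-path layout and the wiring are illustrative, not part of the S3 SDK extension's API:

```scala
package za.co.absa.atum.examples

// Minimal sketch: fail fast if the S3 prerequisites described above are missing.
// TOOLING_BUCKET_NAME / TOOLING_KMS_KEY_ID are the variables the examples expect;
// everything else here (names, paths) is purely illustrative.
object S3PrerequisitesCheck {
  private def requiredEnv(name: String): String =
    sys.env.getOrElse(name, throw new IllegalStateException(s"Environment variable $name is not set"))

  def main(args: Array[String]): Unit = {
    val bucket   = requiredEnv("TOOLING_BUCKET_NAME")
    val kmsKeyId = requiredEnv("TOOLING_KMS_KEY_ID")

    // A hypothetical destination for the _INFO file written through the S3 SDK
    // extension, with AWS-KMS server-side encryption enforced via the key above.
    val infoFileLocation = s"s3://$bucket/atum/output/_INFO"
    println(s"Would write $infoFileLocation using SSE-KMS key $kmsKeyId (AWS profile 'saml')")
  }
}
```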
diff --git a/examples/data/input/wikidata.csv b/examples/s3-sdk-extension-examples/data/input/wikidata.csv
similarity index 100%
rename from examples/data/input/wikidata.csv
rename to examples/s3-sdk-extension-examples/data/input/wikidata.csv
diff --git a/examples/s3-sdk-extension-examples/pom.xml b/examples/s3-sdk-extension-examples/pom.xml
new file mode 100644
index 00000000..0b51c951
--- /dev/null
+++ b/examples/s3-sdk-extension-examples/pom.xml
@@ -0,0 +1,218 @@
+ + + + + 4.0.0 + + za.co.absa + atum-examples-s3-sdk-extension_2.11 + 0.0.1-SNAPSHOT + + jar + + + 4.0.0-SNAPSHOT + UTF-8 + + + 2.11 + ${scala_2.11.version} + + 2.11.12 + 2.12.12 + + + 2.4.6 + 3.1.2 + + ${spark-24.version} + + + 2.11.12 + 2.11 + + + 2.0.2 + 2.3 + 3.2.0 + 3.0.1 + 0.2.1 + + + 0.0.27 + 3.2.2 + + + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + + + za.co.absa + atum-s3-sdk-extension_${scala.binary.version} + ${atum.version} + + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + + + za.co.absa.commons + commons_${scala.binary.version} + ${commons.version} + provided + + + + + org.scalatest + scalatest_${scala.binary.version} + ${scalatest.version} + test + + + + + ${project.basedir}/target/${scala.binary.version} + ${project.build.directory}/classes + ${project.build.directory}/test-classes + + src/main/scala + src/test/scala + + + + net.alchim31.maven + scala-maven-plugin + ${scala.maven.plugin.version} + + false + incremental + + -Xfatal-warnings + -unchecked + -deprecation + -feature + + + + + + compile + testCompile + + + + + + org.scalatest + scalatest-maven-plugin + ${scalatest.maven.plugin.version} + + true + + + + test + + test + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven.shade.plugin.version} + + + package + + shade + + + + + + + + com.github.wajda + scala-cross-build-maven-plugin + ${scala.cross.build.maven.plugin.version} + + ${default.scala.binary.version} + ${default.scala.version} + + + + + + + + + + scala-2.11 + + 2.11 + ${scala_2.11.version} + + + + scala-2.12 + + 2.12 + ${scala_2.12.version} + + + + + + spark-2.4 + + ${spark-24.version} + + + + spark-3.1 + + ${spark-31.version} + + + +
diff --git a/examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala b/examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
similarity index 97%
rename from examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
rename to examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
index 0a0e8c84..c74a0289 100644
--- a/examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
+++ b/examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements1.scala
@@ -27,7 +27,7 @@ object SampleSdkS3Measurements1 {
   def main(args: Array[String]) {
     val sparkBuilder = SparkSession.builder().appName("Sample S3 Measurements 1 Job")
     val spark = sparkBuilder
-      // .master("local")
+      // .master("local") // use this when running locally
       .getOrCreate()
 
     import spark.implicits._
@@ -58,5 +58,6 @@
       .parquet("data/output_s3/stage1_job_results")
 
     spark.disableControlMeasuresTracking()
+    spark.stop()
   }
 }
diff --git a/examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala b/examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
similarity index 91%
rename from examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
rename to examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
index 2d831e16..52d7b300 100644
--- a/examples-s3-sdk-extension/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
+++ b/examples/s3-sdk-extension-examples/src/main/scala/za/co/absa/atum/examples/SampleSdkS3Measurements2.scala
@@ -34,8 +34,10 @@
     // This example is intended to run AFTER SampleMeasurements1, otherwise it will fail on input file absence
     val sparkBuilder = SparkSession.builder().appName("Sample Measurements 2 Job")
 
-    //val spark = sparkBuilder.master("local").getOrCreate()
-    val spark = sparkBuilder.getOrCreate()
+    val spark = sparkBuilder
+      // .master("local") // use this when running locally
+      .getOrCreate()
+
     import spark.implicits._
 
     val hadoopConfiguration = spark.sparkContext.hadoopConfiguration
@@ -77,6 +79,7 @@
       .parquet("data/output_s3/stage2_job_results")
 
     spark.disableControlMeasuresTracking()
+    spark.stop()
 
     // checking info file presence via wrapped AWS SDK s3
     val loader = ControlMeasuresSdkS3LoaderJsonFile(infoFileOutputLocation)
@@ -85,7 +88,11 @@
     val expectedCheckpointRecordCounts = Seq(
       "Source" -> 4964, "Raw" -> 4964, "checkpoint1" -> 3072, "checkpoint2" -> 1651)
     val extractedCounts = extractCheckpointsRecordCounts(controlMeasure)
-    assert(extractedCounts == expectedCheckpointRecordCounts, s"expecting control measure counts to be: $expectedCheckpointRecordCounts, but $extractedCounts found.")
+    if (extractedCounts == expectedCheckpointRecordCounts) {
+      log.info("Expected control measure counts check succeeded.")
+    } else {
+      throw new Exception(s"expecting control measure counts to be: $expectedCheckpointRecordCounts, but $extractedCounts found.")
+    }
   }
 
   private def extractCheckpointsRecordCounts(controlMeasure: ControlMeasure): Seq[(String, Int)] = {
diff --git a/examples-s3-sdk-extension/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerExampleSpec.scala b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerExampleSpec.scala
similarity index 100%
rename from examples-s3-sdk-extension/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerExampleSpec.scala
rename to examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerExampleSpec.scala
diff --git a/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
new file mode 100644
index 00000000..4931af3b
--- /dev/null
+++ b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.utils
+
+import org.scalatest.funsuite.AnyFunSuiteLike
+import scala.language.reflectiveCalls
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe
+
+trait SparkJobRunnerMethods {
+  this: AnyFunSuiteLike =>
+
+  private def runSparkJob[T](implicit ct: ClassTag[T]): Unit = {
+    type MainClass = {def main(args: Array[String]): Unit}
+
+    val jobClass = ct.runtimeClass
+    val jobClassSymbol = universe.runtimeMirror(jobClass.getClassLoader).classSymbol(jobClass)
+    val jobInstance =
+      if (jobClassSymbol.isModuleClass) {
+        jobClass.getField("MODULE$").get(jobClass)
+      } else {
+        jobClass.getDeclaredConstructor().newInstance()
+      }
+
+    jobInstance.asInstanceOf[MainClass].main(Array.empty)
+  }
+
+  def runSparkJobAsTest[T](implicit ct: ClassTag[T]): Unit = {
+    val sampleName = ct.runtimeClass.getSimpleName
+    test(sampleName)(runSparkJob[T](ct))
+  }
+}
diff --git a/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
new file mode 100644
index 00000000..2e0885c6
--- /dev/null
+++ b/examples/s3-sdk-extension-examples/src/test/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.utils
+
+trait SparkLocalMaster {
+  System.getProperties.setProperty("spark.master", "local[*]")
+
+  // needed in order to run the SampleMeasurements jobs as tests; otherwise
+  // java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200... is thrown
+  System.getProperties.setProperty("spark.testing.memory", (1024*1024*1024).toString) // 1g
+}
diff --git a/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala b/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala
index d009ffb4..06bc6c16 100644
--- a/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala
+++ b/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2019 ABSA Group Limited
+ * Copyright 2018 ABSA Group Limited
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/pom.xml b/pom.xml
index 11431e4b..79b1af47 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,11 +25,9 @@
     pom
-    atum
     model
-    examples
+    atum
     atum-s3-sdk-extension
-    examples-s3-sdk-extension
     Atum
@@ -94,10 +92,14 @@
     yyyy-MM-dd'T'HH:mm:ssX
     1.6
     1.6
-    3.2.0
-    2.3
-    3.0.1
-    2.18.1
+    3.2.0
+    2.3
+    3.0.1
+    2.18.1
+    2.0.2
+
+    2.4.2
+    0.2.1
 
     atum-scm-server
@@ -121,7 +123,6 @@
     3.5.3
     2.10.4
-    2.0.2
     3.2.2
     1.7.25
     2.5
@@ -241,7 +242,7 @@
             net.alchim31.maven
             scala-maven-plugin
-            ${maven.scala.version}
+            ${scala.maven.plugin.version}
                 false
                 incremental
@@ -275,7 +276,7 @@
             maven-surefire-plugin
-            ${maven.surefire.version}
+            ${maven.surefire.plugin.version}
                 true
@@ -284,7 +285,7 @@
             org.scalatest
             scalatest-maven-plugin
-            ${scalatest.maven.version}
+            ${scalatest.maven.plugin.version}
                 ${project.build.directory}/surefire-reports
                 .
@@ -303,7 +304,7 @@
             org.apache.maven.plugins
             maven-source-plugin
-            ${maven.sources.version}
+            ${maven.source.plugin.version}
                     attach-sources
@@ -316,7 +317,7 @@
             org.apache.maven.plugins
             maven-release-plugin
-            2.4.2
+            ${maven.release.plugin.version}
                 v@{project.version}
                 true
@@ -332,7 +333,7 @@
             com.github.wajda
             scala-cross-build-maven-plugin
-            0.2.1
+            ${scala.cross.build.maven.plugin.version}
                 ${default.scala.binary.version}
                 ${default.scala.version}
@@ -410,8 +411,10 @@
                 **/_INFO
                 **/_SUCCESS
                 **/org.apache.spark.sql.sources.DataSourceRegister
-                dependency-reduced-pom.xml
+                **/dependency-reduced-pom.xml
                 **/org.mockito.plugins.MockMaker
+                **/.idea/**
+                **/*.iml
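The `eventually`-based `_INFO` polling that `SampleMeasurements2` and `SampleMeasurements3` add in this change is worth a standalone look. A minimal sketch of the pattern, mirroring the examples' timings (the helper object and the directory are illustrative):

```scala
import java.nio.file.{Files, Paths}

import org.scalatest.concurrent.Eventually

import scala.concurrent.duration.DurationInt

// Illustrative helper: retry until Atum's asynchronously written _INFO file
// appears, failing after a (scaled) 10s timeout with (scaled) 500ms polling.
// scaled(...) stretches the spans when a span scale factor is configured,
// which is what the CI step above feeds in via -Dspan.scale.factor=10.
object InfoFileAwait extends Eventually {

  def awaitInfoFile(outputDir: String): Unit =
    eventually(timeout(scaled(10.seconds)), interval(scaled(500.millis))) {
      if (!Files.exists(Paths.get(s"$outputDir/_INFO"))) {
        throw new Exception(s"_INFO file not found at $outputDir")
      }
    }

  def main(args: Array[String]): Unit =
    awaitInfoFile("data/output/stage2_job_results")
}
```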