From 516028ce879de5ba5d47b4ef9fd100487bc38327 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Tue, 8 Oct 2024 10:26:20 +0200 Subject: [PATCH 01/11] Implementing reader module --- .../absa/atum/agent/reader/ParitioningReader.scala | 6 ++++++ .../atum/agent/reader/ParitioningReaderImpl.scala | 12 ++++++++++++ 2 files changed, 18 insertions(+) create mode 100644 agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala create mode 100644 agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala diff --git a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala b/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala new file mode 100644 index 00000000..56c9989a --- /dev/null +++ b/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala @@ -0,0 +1,6 @@ +package za.co.absa.atum.agent.reader + +trait PartitionReader { + def getAdditionalData: Option[Any] + def getCheckpoints: List[String] +} diff --git a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala b/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala new file mode 100644 index 00000000..9e942a95 --- /dev/null +++ b/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala @@ -0,0 +1,12 @@ +package za.co.absa.atum.agent.reader + +class ParitioningReaderImpl (partitionData: Map[String, Any], checkpoints: List[String]) extends PartitionReader { + override def getAdditionalData: Option[Any] = { + partitionData.get("additionalData") + } + + override def getCheckpoints: List[String] = { + checkpoints + } + +} From f15be99325456cb478310fe130b5baf3cb28edf5 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Tue, 8 Oct 2024 10:30:57 +0200 Subject: [PATCH 02/11] Adding License --- .../atum/agent/reader/ParitioningReader.scala | 16 ++++++++++++++++ .../agent/reader/ParitioningReaderImpl.scala | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala b/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala index 56c9989a..826828d2 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package za.co.absa.atum.agent.reader trait PartitionReader { diff --git a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala b/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala index 9e942a95..67118f0c 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package za.co.absa.atum.agent.reader class ParitioningReaderImpl (partitionData: Map[String, Any], checkpoints: List[String]) extends PartitionReader { From bec16a84695fffebff1fe47d577dff82492d4cb7 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Tue, 8 Oct 2024 14:31:48 +0200 Subject: [PATCH 03/11] Fixing return types --- ...ritioningReader.scala => PartitioningReader.scala} | 8 +++++--- ...gReaderImpl.scala => PartitioningReaderImpl.scala} | 11 +++++++---- 2 files changed, 12 insertions(+), 7 deletions(-) rename agent/src/main/scala/za/co/absa/atum/agent/reader/{ParitioningReader.scala => PartitioningReader.scala} (76%) rename agent/src/main/scala/za/co/absa/atum/agent/reader/{ParitioningReaderImpl.scala => PartitioningReaderImpl.scala} (64%) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala b/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReader.scala similarity index 76% rename from agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala rename to agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReader.scala index 826828d2..9074978b 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReader.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReader.scala @@ -16,7 +16,9 @@ package za.co.absa.atum.agent.reader -trait PartitionReader { - def getAdditionalData: Option[Any] - def getCheckpoints: List[String] +import za.co.absa.atum.model.dto.{AdditionalDataDTO, CheckpointV2DTO} + +trait PartitioningReader { + def getAdditionalData: Option[AdditionalDataDTO] + def getCheckpoints: List[CheckpointV2DTO] } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala b/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala similarity index 64% rename from agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala rename to agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala index 67118f0c..717e3552 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/reader/ParitioningReaderImpl.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala @@ -16,12 +16,15 @@ package za.co.absa.atum.agent.reader -class ParitioningReaderImpl (partitionData: Map[String, Any], checkpoints: List[String]) extends PartitionReader { - override def getAdditionalData: Option[Any] = { - partitionData.get("additionalData") +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO} + +class PartitioningReaderImpl(partitionData: Option[AdditionalDataDTO], checkpoints: List[CheckpointV2DTO]) + extends PartitioningReader { + override def getAdditionalData: Option[AdditionalDataDTO] = { + partitionData } - override def getCheckpoints: List[String] = { + override def getCheckpoints: List[CheckpointV2DTO] = { checkpoints } From 202ab196ac748af7079c793fff7612df4555590f Mon Sep 17 00:00:00 2001 From: AB019TC Date: Thu, 10 Oct 2024 10:23:39 +0200 Subject: [PATCH 04/11] Addition unit test --- .../agent/reader/PartitioningReaderImpl.scala | 2 +- .../PartitioningReaderImplUnitTest.scala | 47 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala diff --git a/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala b/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala index 717e3552..2dfa144d 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.agent.reader -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, CheckpointV2DTO} class PartitioningReaderImpl(partitionData: Option[AdditionalDataDTO], checkpoints: List[CheckpointV2DTO]) extends PartitioningReader { diff --git a/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala new file mode 100644 index 00000000..3e739f56 --- /dev/null +++ b/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala @@ -0,0 +1,47 @@ + + +package za.co.absa.atum.agent.reader + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO} + +import java.util.UUID +import java.time.ZonedDateTime + +class PartitioningReaderImplUnitTest extends AnyFlatSpec with Matchers { + + "PartitioningReaderImpl" should "return the correct additional data" in { + val additionalDataItem = AdditionalDataItemDTO(Some("value"), "author") + val additionalData = Some(AdditionalDataDTO(Map("key" -> Some(additionalDataItem)))) + val checkpoints = List(CheckpointV2DTO( + UUID.randomUUID(), + "checkpoint1", + "author1", + measuredByAtumAgent = true, + ZonedDateTime.now(), + Some(ZonedDateTime.now().plusHours(1)), + Set.empty + )) + val reader = new PartitioningReaderImpl(additionalData, checkpoints) + + reader.getAdditionalData shouldEqual additionalData + } + + it should "return the correct checkpoints" in { + val additionalDataItem = AdditionalDataItemDTO(Some("value"), "author") + val additionalData = Some(AdditionalDataDTO(Map("key" -> Some(additionalDataItem)))) + val checkpoints = List(CheckpointV2DTO( + UUID.randomUUID(), + "checkpoint1", + "author1", + measuredByAtumAgent = true, + ZonedDateTime.now(), + Some(ZonedDateTime.now().plusHours(1)), + Set.empty + )) + val reader = new PartitioningReaderImpl(additionalData, checkpoints) + + reader.getCheckpoints shouldEqual checkpoints + } +} From 619fc6545f547d825d83192726474311f4bf1010 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Thu, 10 Oct 2024 13:04:15 +0200 Subject: [PATCH 05/11] Adding license --- .../reader/PartitioningReaderImplUnitTest.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala index 3e739f56..3d29ceb3 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala @@ -1,4 +1,18 @@ - +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package za.co.absa.atum.agent.reader From 9580fc8739502026ecc71321cd7fbd856a1b70a1 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Thu, 10 Oct 2024 13:41:58 +0200 Subject: [PATCH 06/11] Fixing file name --- ...eaderImplUnitTest.scala => PartitioningReaderUnitTest.scala} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename agent/src/test/scala/za/co/absa/atum/agent/reader/{PartitioningReaderImplUnitTest.scala => PartitioningReaderUnitTest.scala} (96%) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTest.scala similarity index 96% rename from agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala rename to agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTest.scala index 3d29ceb3..eaf0a46b 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderImplUnitTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTest.scala @@ -23,7 +23,7 @@ import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, Chec import java.util.UUID import java.time.ZonedDateTime -class PartitioningReaderImplUnitTest extends AnyFlatSpec with Matchers { +class PartitioningReaderUnitTest extends AnyFlatSpec with Matchers { "PartitioningReaderImpl" should "return the correct additional data" in { val additionalDataItem = AdditionalDataItemDTO(Some("value"), "author") From 9643b48b144bbddd4b5c37c341bcf88880381498 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Thu, 10 Oct 2024 13:48:53 +0200 Subject: [PATCH 07/11] Fixing file name --- ...ngReaderUnitTest.scala => PartitioningReaderUnitTests.scala} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename agent/src/test/scala/za/co/absa/atum/agent/reader/{PartitioningReaderUnitTest.scala => PartitioningReaderUnitTests.scala} (96%) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTests.scala similarity index 96% rename from agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTest.scala rename to agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTests.scala index eaf0a46b..f9976f09 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTests.scala @@ -23,7 +23,7 @@ import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, Chec import java.util.UUID import java.time.ZonedDateTime -class PartitioningReaderUnitTest extends AnyFlatSpec with Matchers { +class PartitioningReaderUnitTests extends AnyFlatSpec with Matchers { "PartitioningReaderImpl" should "return the correct additional data" in { val additionalDataItem = AdditionalDataItemDTO(Some("value"), "author") From 7c891e54c693eafe3112e134a408e244b8b555bf Mon Sep 17 00:00:00 2001 From: AB019TC Date: Thu, 17 Oct 2024 17:12:59 +0200 Subject: [PATCH 08/11] Fixing file name --- .../dispatcher/CapturingDispatcher.scala | 24 ++++++++ .../atum/agent/dispatcher/Dispatcher.scala | 23 ++++--- .../agent/dispatcher/HttpDispatcher.scala | 32 ++++++++++ .../agent/reader/PartitioningReader.scala | 24 -------- .../agent/reader/PartitioningReaderImpl.scala | 31 ---------- .../reader/PartitioningReaderUnitTests.scala | 61 ------------------- build.sbt | 1 + .../absa/atum/reader/PartitioningReader.scala | 44 +++++++++++-- .../reader/PartitioningReaderUnitTests.scala | 8 +-- 9 files changed, 116 insertions(+), 132 deletions(-) delete mode 100644 agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReader.scala delete mode 100644 agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala delete mode 100644 agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTests.scala diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala index 9a01c642..5a4dd26b 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala @@ -143,6 +143,30 @@ class CapturingDispatcher(config: Config) extends Dispatcher(config) { val result = AtumContextDTO(partitioning.partitioning) captureFunctionCall(partitioning, result) } + + /** + * This method is used to get the Partitioning Additional data from the server. + * + * @param partitioning : Partitioning to obtain ID for. + * @return AdditionalDataDTO. + */ + override protected[agent] def getAdditionalData(partitioning: PartitioningDTO): AdditionalDataDTO = { + val result = AdditionalDataDTO( + Map( + "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "user1")), + "key2" -> Some(AdditionalDataItemDTO(Some("value2"), "user2")) + ) + ) + captureFunctionCall(partitioning, result) + } + + /** + * This method is used to get the partitioning ID from the server. + * + * @param partitioning : Partitioning to obtain ID for. + * @return Long ID of the partitioning. + */ + override protected[agent] def getCheckpoints(partitioning: PartitioningDTO): Seq[CheckpointV2DTO] = ??? } object CapturingDispatcher { diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala index 0580f844..eb118a48 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala @@ -17,14 +17,7 @@ package za.co.absa.atum.agent.dispatcher import com.typesafe.config.Config -import za.co.absa.atum.model.dto.{ - AdditionalDataDTO, - AdditionalDataPatchDTO, - AtumContextDTO, - CheckpointDTO, - PartitioningDTO, - PartitioningSubmitDTO -} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, AtumContextDTO, CheckpointDTO, CheckpointV2DTO, PartitioningDTO, PartitioningSubmitDTO} /** * This class provides a contract for different dispatchers. It has a constructor foe eventual creation via reflection. @@ -56,4 +49,18 @@ abstract class Dispatcher(config: Config) { partitioning: PartitioningDTO, additionalDataPatchDTO: AdditionalDataPatchDTO ): AdditionalDataDTO + + /** + * This method is used to get the Additional data from the server. + * @param partitioning: Partitioning to obtain ID for. + * @return AdditionalDataDTO. + */ + protected[agent] def getAdditionalData(partitioning: PartitioningDTO): AdditionalDataDTO + + /** + * This method is used to get the partitioning ID from the server. + * @param partitioning: Partitioning to obtain ID for. + * @return Long ID of the partitioning. + */ + protected[agent] def getCheckpoints(partitioning: PartitioningDTO): Seq[CheckpointV2DTO] } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala index e2202ba9..d93e4760 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala @@ -44,6 +44,8 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging { private val getPartitioningIdEndpoint = Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath") private def createAdditionalDataEndpoint(partitioningId: Long): Uri = Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath/$partitioningId/additional-data") + private def getAdditionalDataEndpoint(partitioningId: Long): Uri = + Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath/$partitioningId/additional-data") private val commonAtumRequest = basicRequest .header("Content-Type", "application/json") @@ -105,6 +107,36 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging { handleResponseBody(response).as[SingleSuccessResponse[AdditionalDataDTO]].data } + override def getAdditionalData(partitioning: PartitioningDTO): AdditionalDataDTO = { + val partitioningId = getPartitioningId(partitioning) + log.debug(s"Got partitioning ID: '$partitioningId'") + + val request = commonAtumRequest + .get(getAdditionalDataEndpoint(partitioningId)) + + val response = backend.send(request) + + handleResponseBody(response).as[SingleSuccessResponse[AdditionalDataDTO]].data + } + + /** + * Fetches checkpoints for the given partitioning. + * + * @param partitioning The partitioning for which to fetch checkpoints. + * @return List of CheckpointDTO containing the checkpoints. + */ + override def getCheckpoints(partitioning: PartitioningDTO): List[CheckpointV2DTO] = { + val partitioningId = getPartitioningId(partitioning) + log.debug(s"Got partitioning ID: '$partitioningId'") + + val request = commonAtumRequest + .get(getAdditionalDataEndpoint(partitioningId)) + + val response = backend.send(request) + + handleResponseBody(response).as[SingleSuccessResponse[List[CheckpointV2DTO]]].data + } + private def handleResponseBody(response: Response[Either[String, String]]): String = { response.body match { case Left(body) => throw HttpException(response.code.code, body) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReader.scala b/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReader.scala deleted file mode 100644 index 9074978b..00000000 --- a/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReader.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.agent.reader - -import za.co.absa.atum.model.dto.{AdditionalDataDTO, CheckpointV2DTO} - -trait PartitioningReader { - def getAdditionalData: Option[AdditionalDataDTO] - def getCheckpoints: List[CheckpointV2DTO] -} diff --git a/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala b/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala deleted file mode 100644 index 2dfa144d..00000000 --- a/agent/src/main/scala/za/co/absa/atum/agent/reader/PartitioningReaderImpl.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.agent.reader - -import za.co.absa.atum.model.dto.{AdditionalDataDTO, CheckpointV2DTO} - -class PartitioningReaderImpl(partitionData: Option[AdditionalDataDTO], checkpoints: List[CheckpointV2DTO]) - extends PartitioningReader { - override def getAdditionalData: Option[AdditionalDataDTO] = { - partitionData - } - - override def getCheckpoints: List[CheckpointV2DTO] = { - checkpoints - } - -} diff --git a/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTests.scala b/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTests.scala deleted file mode 100644 index f9976f09..00000000 --- a/agent/src/test/scala/za/co/absa/atum/agent/reader/PartitioningReaderUnitTests.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.agent.reader - -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO} - -import java.util.UUID -import java.time.ZonedDateTime - -class PartitioningReaderUnitTests extends AnyFlatSpec with Matchers { - - "PartitioningReaderImpl" should "return the correct additional data" in { - val additionalDataItem = AdditionalDataItemDTO(Some("value"), "author") - val additionalData = Some(AdditionalDataDTO(Map("key" -> Some(additionalDataItem)))) - val checkpoints = List(CheckpointV2DTO( - UUID.randomUUID(), - "checkpoint1", - "author1", - measuredByAtumAgent = true, - ZonedDateTime.now(), - Some(ZonedDateTime.now().plusHours(1)), - Set.empty - )) - val reader = new PartitioningReaderImpl(additionalData, checkpoints) - - reader.getAdditionalData shouldEqual additionalData - } - - it should "return the correct checkpoints" in { - val additionalDataItem = AdditionalDataItemDTO(Some("value"), "author") - val additionalData = Some(AdditionalDataDTO(Map("key" -> Some(additionalDataItem)))) - val checkpoints = List(CheckpointV2DTO( - UUID.randomUUID(), - "checkpoint1", - "author1", - measuredByAtumAgent = true, - ZonedDateTime.now(), - Some(ZonedDateTime.now().plusHours(1)), - Set.empty - )) - val reader = new PartitioningReaderImpl(additionalData, checkpoints) - - reader.getCheckpoints shouldEqual checkpoints - } -} diff --git a/build.sbt b/build.sbt index 0c2f6b1e..67bd1406 100644 --- a/build.sbt +++ b/build.sbt @@ -121,3 +121,4 @@ lazy val reader = (projectMatrix in file("reader")) ) .addScalaCrossBuild(Setup.clientSupportedScalaVersions, Dependencies.readerDependencies) .dependsOn(model) + .dependsOn(agent) diff --git a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala index d1153e4b..9cae666a 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala @@ -16,9 +16,45 @@ package za.co.absa.atum.reader -class PartitioningReader { - def foo(): String = { - // just to have some testable content - "bar" +import com.typesafe.config.Config +import io.circe.syntax.EncoderOps +//import za.co.absa.atum.agent.AtumAgent.dispatcher +import za.co.absa.atum.agent.dispatcher.HttpDispatcher +import za.co.absa.atum.model.dto.{CheckpointV2DTO, PartitioningDTO} + +class PartitioningReader(config: Config) { + + private val dispatcher: HttpDispatcher = new HttpDispatcher(config) + + /** + * Fetches additional data for the given partitioning. + * @param partitioning The partitioning for which to fetch additional data. + * @return AdditionalDataDTO containing the additional data. + */ + def getAdditionalData(partitioning: PartitioningDTO): String = { + dispatcher.getAdditionalData(partitioning).asJson.noSpaces + } + + /** + * Fetches checkpoints for the given partitioning. + * @param partitioning The partitioning for which to fetch checkpoints. + * @return List of CheckpointDTO containing the checkpoints. + */ + def getCheckpoints(partitioning: PartitioningDTO): List[CheckpointV2DTO] = { + dispatcher.getCheckpoints(partitioning).applyOrElse(partitioning, _ => List.empty) + } + +} + +object PartitioningReader { + def apply(config: Config): PartitioningReader = new PartitioningReader(config) + + def apply(): PartitioningReader = new PartitioningReader(null) + + def apply(config: Config, partitioning: PartitioningDTO): PartitioningReader = { + val reader = new PartitioningReader(config) + reader.getAdditionalData(partitioning) + reader.getCheckpoints(partitioning) + reader } } diff --git a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala index c4221d8c..977efca3 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala @@ -19,8 +19,8 @@ package za.co.absa.atum.reader import org.scalatest.funsuite.AnyFunSuiteLike class PartitioningReaderUnitTests extends AnyFunSuiteLike { - test("foo") { - val expected = new PartitioningReader().foo() - assert(expected == "bar") - } +// test("foo") { +// val expected = new PartitioningReader().foo() +// assert(expected == "bar") +// } } From 5edac2e7fb03eb9c99d193b8616fdf03d4497862 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Fri, 18 Oct 2024 15:22:56 +0200 Subject: [PATCH 09/11] Remove get additioinalData and checkpoint implementation from agent --- .../dispatcher/CapturingDispatcher.scala | 24 ----------- .../atum/agent/dispatcher/Dispatcher.scala | 22 ++++------ .../agent/dispatcher/HttpDispatcher.scala | 32 --------------- build.sbt | 2 +- .../co/absa/atum/model/types/BasicTypes.scala | 8 ++++ .../co/absa/atum/model/types/Checkpoint.scala | 9 ++++ .../za/co/absa/atum/reader/Dispatcher.scala | 37 +++++++++++++++++ .../absa/atum/reader/PartitioningReader.scala | 41 ++++++++----------- .../server/GenericServerConnection.scala | 10 +++++ 9 files changed, 89 insertions(+), 96 deletions(-) create mode 100644 model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala create mode 100644 model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala create mode 100644 reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala index 5a4dd26b..9a01c642 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala @@ -143,30 +143,6 @@ class CapturingDispatcher(config: Config) extends Dispatcher(config) { val result = AtumContextDTO(partitioning.partitioning) captureFunctionCall(partitioning, result) } - - /** - * This method is used to get the Partitioning Additional data from the server. - * - * @param partitioning : Partitioning to obtain ID for. - * @return AdditionalDataDTO. - */ - override protected[agent] def getAdditionalData(partitioning: PartitioningDTO): AdditionalDataDTO = { - val result = AdditionalDataDTO( - Map( - "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "user1")), - "key2" -> Some(AdditionalDataItemDTO(Some("value2"), "user2")) - ) - ) - captureFunctionCall(partitioning, result) - } - - /** - * This method is used to get the partitioning ID from the server. - * - * @param partitioning : Partitioning to obtain ID for. - * @return Long ID of the partitioning. - */ - override protected[agent] def getCheckpoints(partitioning: PartitioningDTO): Seq[CheckpointV2DTO] = ??? } object CapturingDispatcher { diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala index eb118a48..7b3a929a 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala @@ -17,7 +17,14 @@ package za.co.absa.atum.agent.dispatcher import com.typesafe.config.Config -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, AtumContextDTO, CheckpointDTO, CheckpointV2DTO, PartitioningDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto.{ + AdditionalDataDTO, + AdditionalDataPatchDTO, + AtumContextDTO, + CheckpointDTO, + PartitioningDTO, + PartitioningSubmitDTO +} /** * This class provides a contract for different dispatchers. It has a constructor foe eventual creation via reflection. @@ -50,17 +57,4 @@ abstract class Dispatcher(config: Config) { additionalDataPatchDTO: AdditionalDataPatchDTO ): AdditionalDataDTO - /** - * This method is used to get the Additional data from the server. - * @param partitioning: Partitioning to obtain ID for. - * @return AdditionalDataDTO. - */ - protected[agent] def getAdditionalData(partitioning: PartitioningDTO): AdditionalDataDTO - - /** - * This method is used to get the partitioning ID from the server. - * @param partitioning: Partitioning to obtain ID for. - * @return Long ID of the partitioning. - */ - protected[agent] def getCheckpoints(partitioning: PartitioningDTO): Seq[CheckpointV2DTO] } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala index d93e4760..e2202ba9 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala @@ -44,8 +44,6 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging { private val getPartitioningIdEndpoint = Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath") private def createAdditionalDataEndpoint(partitioningId: Long): Uri = Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath/$partitioningId/additional-data") - private def getAdditionalDataEndpoint(partitioningId: Long): Uri = - Uri.unsafeParse(s"$serverUrl$apiV2/$partitioningsPath/$partitioningId/additional-data") private val commonAtumRequest = basicRequest .header("Content-Type", "application/json") @@ -107,36 +105,6 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging { handleResponseBody(response).as[SingleSuccessResponse[AdditionalDataDTO]].data } - override def getAdditionalData(partitioning: PartitioningDTO): AdditionalDataDTO = { - val partitioningId = getPartitioningId(partitioning) - log.debug(s"Got partitioning ID: '$partitioningId'") - - val request = commonAtumRequest - .get(getAdditionalDataEndpoint(partitioningId)) - - val response = backend.send(request) - - handleResponseBody(response).as[SingleSuccessResponse[AdditionalDataDTO]].data - } - - /** - * Fetches checkpoints for the given partitioning. - * - * @param partitioning The partitioning for which to fetch checkpoints. - * @return List of CheckpointDTO containing the checkpoints. - */ - override def getCheckpoints(partitioning: PartitioningDTO): List[CheckpointV2DTO] = { - val partitioningId = getPartitioningId(partitioning) - log.debug(s"Got partitioning ID: '$partitioningId'") - - val request = commonAtumRequest - .get(getAdditionalDataEndpoint(partitioningId)) - - val response = backend.send(request) - - handleResponseBody(response).as[SingleSuccessResponse[List[CheckpointV2DTO]]].data - } - private def handleResponseBody(response: Response[Either[String, String]]): String = { response.body match { case Left(body) => throw HttpException(response.code.code, body) diff --git a/build.sbt b/build.sbt index 67bd1406..2a638abd 100644 --- a/build.sbt +++ b/build.sbt @@ -121,4 +121,4 @@ lazy val reader = (projectMatrix in file("reader")) ) .addScalaCrossBuild(Setup.clientSupportedScalaVersions, Dependencies.readerDependencies) .dependsOn(model) - .dependsOn(agent) +// .dependsOn(agent) diff --git a/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala b/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala new file mode 100644 index 00000000..2d6ad6ec --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala @@ -0,0 +1,8 @@ +package za.co.absa.atum.model.types + +import scala.collection.immutable.ListMap + +object BasicTypes { + type AtumPartitions = ListMap[String, String] + type AdditionalData = Map[String, Option[String]] +} diff --git a/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala b/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala new file mode 100644 index 00000000..4003730e --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala @@ -0,0 +1,9 @@ +package za.co.absa.atum.model.types + +import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions} + +case class Checkpoint ( + id: String, + partitioning: AtumPartitions, + additionalData: AdditionalData, + ) diff --git a/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala b/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala new file mode 100644 index 00000000..8f43ac25 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala @@ -0,0 +1,37 @@ +package za.co.absa.atum.reader + +import za.co.absa.atum.model.dto.AdditionalDataDTO +import za.co.absa.atum.model.types.Checkpoint + +class Dispatcher { + + /** + * This method is used to get the Additional data from the server. + * + * @param partitioning : Partitioning to obtain ID for. + * @return AdditionalDataDTO. + */ + def getAdditionalData(partitioning: Partitioning): AdditionalDataDTO = ??? + + /** + * override protected[agent] def getAdditionalData(partitioning: PartitioningDTO): AdditionalDataDTO = { + * val partitioningId = getPartitioningId(partitioning) + * log.debug(s"Got partitioning ID: '$partitioningId'") + * + * val request = commonAtumRequest + * .get(getAdditionalDataEndpoint(partitioningId)) + * + * val response = backend.send(request) + * + * handleResponseBody(response).as[SingleSuccessResponse[AdditionalDataDTO]].data + * } + */ + + /** + * This method is used to get the partitioning ID from the server. + * + * @param partitioning : Partitioning to obtain ID for. + * @return Long ID of the partitioning. + */ + def getCheckpoints(partitioning: Partitioning): Seq[Checkpoint] = ??? +} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala index 9cae666a..df019b73 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala @@ -16,23 +16,26 @@ package za.co.absa.atum.reader -import com.typesafe.config.Config -import io.circe.syntax.EncoderOps -//import za.co.absa.atum.agent.AtumAgent.dispatcher +import cats.Monad +import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions} +import za.co.absa.atum.model.types.Checkpoint +import za.co.absa.atum.reader.server.GenericServerConnection +import za.co.absa.atum.agent.AtumAgent.dispatcher import za.co.absa.atum.agent.dispatcher.HttpDispatcher -import za.co.absa.atum.model.dto.{CheckpointV2DTO, PartitioningDTO} -class PartitioningReader(config: Config) { - - private val dispatcher: HttpDispatcher = new HttpDispatcher(config) +class PartitioningReader[F[_] : Monad](partitioning: AtumPartitions)(implicit serverConnection : GenericServerConnection[F]) { /** * Fetches additional data for the given partitioning. * @param partitioning The partitioning for which to fetch additional data. - * @return AdditionalDataDTO containing the additional data. + * @return AdditionalData containing the additional data. */ - def getAdditionalData(partitioning: PartitioningDTO): String = { - dispatcher.getAdditionalData(partitioning).asJson.noSpaces + def getAdditionalData(): F[AdditionalData] = { + // dispatcher.getAdditionalData(partitioning).asJson.noSpaces + // call uri to get id + // call the get partitioningAdditionalData + // transform envelope additionalDataDTO into AdditionalData +// serverConnection.getAdditionalData(partitioning) } /** @@ -40,21 +43,9 @@ class PartitioningReader(config: Config) { * @param partitioning The partitioning for which to fetch checkpoints. * @return List of CheckpointDTO containing the checkpoints. */ - def getCheckpoints(partitioning: PartitioningDTO): List[CheckpointV2DTO] = { - dispatcher.getCheckpoints(partitioning).applyOrElse(partitioning, _ => List.empty) + def getCheckpoints(): List[Checkpoint] = { + // Add optional parameters here. +// dispatcher.getCheckpoints(partitioning).applyOrElse(partitioning, _ => List.empty) } } - -object PartitioningReader { - def apply(config: Config): PartitioningReader = new PartitioningReader(config) - - def apply(): PartitioningReader = new PartitioningReader(null) - - def apply(config: Config, partitioning: PartitioningDTO): PartitioningReader = { - val reader = new PartitioningReader(config) - reader.getAdditionalData(partitioning) - reader.getCheckpoints(partitioning) - reader - } -} diff --git a/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala b/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala new file mode 100644 index 00000000..71fda5f7 --- /dev/null +++ b/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala @@ -0,0 +1,10 @@ +package za.co.absa.atum.reader.server + +import cats.Monad +import io.circe.Decoder + +import scala.util.Try + +abstract class GenericServerConnection[F[_] : Monad](val serverUrl: String) { + def query[R: Decoder](endpointUri: String): F[Try[R]] +} From c9053f2307e8e24d39bf6c920162a6fc7c75a0de Mon Sep 17 00:00:00 2001 From: AB019TC Date: Mon, 21 Oct 2024 12:22:37 +0200 Subject: [PATCH 10/11] Adding some mock and unit test. --- .../co/absa/atum/model/types/Checkpoint.scala | 14 ++- .../za/co/absa/atum/reader/Dispatcher.scala | 67 ++++++++++----- .../absa/atum/reader/PartitioningReader.scala | 39 ++++++--- .../reader/PartitioningReaderUnitTests.scala | 86 +++++++++++++++++-- 4 files changed, 163 insertions(+), 43 deletions(-) diff --git a/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala b/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala index 4003730e..1de91481 100644 --- a/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala +++ b/model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala @@ -1,9 +1,15 @@ package za.co.absa.atum.model.types -import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions} +import za.co.absa.atum.model.dto.MeasurementDTO + +import java.time.ZonedDateTime case class Checkpoint ( id: String, - partitioning: AtumPartitions, - additionalData: AdditionalData, - ) + name: String, + author: String, + measuredByAtumAgent: Boolean = false, + processStartTime: ZonedDateTime, + processEndTime: Option[ZonedDateTime], + measurements: Set[MeasurementDTO] +) diff --git a/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala b/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala index 8f43ac25..a5206ed1 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/Dispatcher.scala @@ -1,37 +1,60 @@ package za.co.absa.atum.reader -import za.co.absa.atum.model.dto.AdditionalDataDTO -import za.co.absa.atum.model.types.Checkpoint +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO} +import za.co.absa.atum.model.types.BasicTypes.AtumPartitions + +import java.time.ZonedDateTime +import java.util.UUID class Dispatcher { /** * This method is used to get the Additional data from the server. - * + * Mock method to return AdditionalDataDTO * @param partitioning : Partitioning to obtain ID for. * @return AdditionalDataDTO. */ - def getAdditionalData(partitioning: Partitioning): AdditionalDataDTO = ??? + def getAdditionalData(partitioning: AtumPartitions): AdditionalDataDTO = { + AdditionalDataDTO( + data = Map( + "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "author1")), + "key2" -> None + ) + ) + } /** - * override protected[agent] def getAdditionalData(partitioning: PartitioningDTO): AdditionalDataDTO = { - * val partitioningId = getPartitioningId(partitioning) - * log.debug(s"Got partitioning ID: '$partitioningId'") - * - * val request = commonAtumRequest - * .get(getAdditionalDataEndpoint(partitioningId)) + * This method is used to get the Checkpoints from the server. * - * val response = backend.send(request) - * - * handleResponseBody(response).as[SingleSuccessResponse[AdditionalDataDTO]].data - * } - */ - - /** - * This method is used to get the partitioning ID from the server. - * - * @param partitioning : Partitioning to obtain ID for. - * @return Long ID of the partitioning. + * @param partitioning : Partitioning to obtain checkpoints for. + * @param limit : Limit of checkpoints to return. + * @param offset : Offset of checkpoints to return. + * @param checkpointName : Name of the checkpoint to return. + * @return List of CheckpointV2DTO. */ - def getCheckpoints(partitioning: Partitioning): Seq[Checkpoint] = ??? + def getCheckpoints( + partitioning: AtumPartitions, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String]): Seq[CheckpointV2DTO] = { + Seq( + CheckpointV2DTO( + id = UUID.randomUUID(), + name = "checkpoint1", + author = "author1", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.now(), + processEndTime = Some(ZonedDateTime.now().plusHours(1)), + measurements = Set.empty + ), + CheckpointV2DTO( + id = UUID.randomUUID(), + name = "checkpoint2", + author = "author2", + processStartTime = ZonedDateTime.now().minusDays(1), + processEndTime = None, + measurements = Set.empty + ) + ) + } } diff --git a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala index df019b73..4232f417 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala @@ -20,22 +20,22 @@ import cats.Monad import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions} import za.co.absa.atum.model.types.Checkpoint import za.co.absa.atum.reader.server.GenericServerConnection -import za.co.absa.atum.agent.AtumAgent.dispatcher -import za.co.absa.atum.agent.dispatcher.HttpDispatcher -class PartitioningReader[F[_] : Monad](partitioning: AtumPartitions)(implicit serverConnection : GenericServerConnection[F]) { +import scala.language.higherKinds + +class PartitioningReader[F[_]: Monad](partitioning: AtumPartitions)( + implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher) { /** * Fetches additional data for the given partitioning. * @param partitioning The partitioning for which to fetch additional data. * @return AdditionalData containing the additional data. */ - def getAdditionalData(): F[AdditionalData] = { - // dispatcher.getAdditionalData(partitioning).asJson.noSpaces - // call uri to get id - // call the get partitioningAdditionalData - // transform envelope additionalDataDTO into AdditionalData -// serverConnection.getAdditionalData(partitioning) + def getAdditionalData: F[AdditionalData] = { + Monad[F].pure(dispatcher.getAdditionalData(partitioning).data.map { + case (key, Some(itemDTO)) => key -> Some(itemDTO.value) + case (key, None) => key -> None + }) } /** @@ -43,9 +43,24 @@ class PartitioningReader[F[_] : Monad](partitioning: AtumPartitions)(implicit se * @param partitioning The partitioning for which to fetch checkpoints. * @return List of CheckpointDTO containing the checkpoints. */ - def getCheckpoints(): List[Checkpoint] = { - // Add optional parameters here. -// dispatcher.getCheckpoints(partitioning).applyOrElse(partitioning, _ => List.empty) + def getCheckpoints(limit: Option[Int], offset: Option[Long], checkpointName: Option[String]): F[List[Checkpoint]] = { + Monad[F].pure(dispatcher.getCheckpoints(partitioning, limit, offset, checkpointName).map { dto => + Checkpoint( + id = dto.id.toString, + name = dto.name, + author = dto.author, + measuredByAtumAgent = dto.measuredByAtumAgent, + processStartTime = dto.processStartTime, + processEndTime = dto.processEndTime, + measurements = dto.measurements + ) + }.toList) } } + +object PartitioningReader { + def apply[F[_]: Monad](partitioning: AtumPartitions)( + implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher): PartitioningReader[F] = + new PartitioningReader[F](partitioning) +} diff --git a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala index 977efca3..75ed3020 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala @@ -16,11 +16,87 @@ package za.co.absa.atum.reader +import cats.Id +import org.mockito.MockitoSugar import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO} +import za.co.absa.atum.model.types.BasicTypes.{AtumPartitions, AdditionalData} +import za.co.absa.atum.model.types.Checkpoint +import za.co.absa.atum.reader.server.GenericServerConnection -class PartitioningReaderUnitTests extends AnyFunSuiteLike { -// test("foo") { -// val expected = new PartitioningReader().foo() -// assert(expected == "bar") -// } +import java.time.ZonedDateTime +import java.util.UUID + +class PartitioningReaderUnitTests extends AnyFunSuiteLike with Matchers with MockitoSugar { + + trait TestContext { + val partitioning: AtumPartitions = mock[AtumPartitions] + val serverConnection: GenericServerConnection[Id] = mock[GenericServerConnection[Id]] + val dispatcher: Dispatcher = mock[Dispatcher] + val reader: PartitioningReader[Id] = new PartitioningReader(partitioning)(serverConnection, dispatcher) + } + + test("getAdditionalData should fetch and transform additional data correctly") { + new TestContext { + val additionalDataDTO: AdditionalDataDTO = AdditionalDataDTO( + data = Map( + "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "author1")), + "key2" -> None + ) + ) + + when(dispatcher.getAdditionalData(partitioning)).thenReturn(additionalDataDTO) + + val result: Id[AdditionalData] = reader.getAdditionalData + + result shouldEqual AdditionalData( + data = Map( + "key1" -> Some("value1"), + "key2" -> None + ) + ) + } + } + + test("getCheckpoints should fetch and transform checkpoints correctly") { + new TestContext { + val checkpointsDTO: Seq[CheckpointV2DTO] = Seq( + CheckpointV2DTO( + id = UUID.randomUUID(), + name = "checkpoint1", + author = "author1", + measuredByAtumAgent = true, + processStartTime = ZonedDateTime.now(), + processEndTime = Some(ZonedDateTime.now().plusHours(1)), + measurements = Set.empty + ), + CheckpointV2DTO( + id = UUID.randomUUID(), + name = "checkpoint2", + author = "author2", + measuredByAtumAgent = false, + processStartTime = ZonedDateTime.now().minusDays(1), + processEndTime = None, + measurements = Set.empty + ) + ) + + when(dispatcher.getCheckpoints(partitioning, Some(10), Some(0L), Some("checkpoint1"))).thenReturn(checkpointsDTO) + + val result: Id[List[Checkpoint]] = reader.getCheckpoints(Some(10), Some(0L), Some("checkpoint1")) + + result shouldEqual checkpointsDTO.map { dto => + Checkpoint( + id = dto.id.toString, + name = dto.name, + author = dto.author, + measuredByAtumAgent = dto.measuredByAtumAgent, + processStartTime = dto.processStartTime, + processEndTime = dto.processEndTime, + measurements = dto.measurements + ) + }.toList + } + } } From cecf988ea49014d284fe08a73dfc67b1a1736050 Mon Sep 17 00:00:00 2001 From: AB019TC Date: Fri, 25 Oct 2024 16:16:04 +0200 Subject: [PATCH 11/11] Re-implemented get additional data --- .../co/absa/atum/model/types/BasicTypes.scala | 29 +++++++ .../absa/atum/reader/PartitioningReader.scala | 81 +++++++++++++++---- .../server/GenericServerConnection.scala | 4 +- .../reader/PartitioningReaderUnitTests.scala | 43 +++++----- 4 files changed, 117 insertions(+), 40 deletions(-) diff --git a/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala b/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala index 2d6ad6ec..2fdf9cfa 100644 --- a/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala +++ b/model/src/main/scala/za/co/absa/atum/model/types/BasicTypes.scala @@ -1,8 +1,37 @@ package za.co.absa.atum.model.types +import za.co.absa.atum.model.dto.{AdditionalDataDTO, PartitionDTO, PartitioningDTO} + import scala.collection.immutable.ListMap object BasicTypes { type AtumPartitions = ListMap[String, String] type AdditionalData = Map[String, Option[String]] + + /** + * Object contains helper methods to work with Atum partitions. + */ + object AtumPartitions { + def apply(elems: (String, String)): AtumPartitions = { + ListMap(elems) + } + + def apply(elems: List[(String, String)]): AtumPartitions = { + ListMap(elems: _*) + } + + def toSeqPartitionDTO(atumPartitions: AtumPartitions): PartitioningDTO = { + atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq + } + + def fromPartitioning(partitioning: PartitioningDTO): AtumPartitions = { + AtumPartitions(partitioning.map(partition => Tuple2(partition.key, partition.value)).toList) + } + } + + object AdditionalData { + def transformAdditionalDataDTO(additionalDataDTO: AdditionalDataDTO): AdditionalData = { + additionalDataDTO.data.map{ case (k, v) => (k, v.flatMap(_.value)) } + } + } } diff --git a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala index 4232f417..7a0bcae5 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/PartitioningReader.scala @@ -17,32 +17,80 @@ package za.co.absa.atum.reader import cats.Monad +import cats.implicits.{toFlatMapOps, toFunctorOps} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, PartitioningWithIdDTO} +import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.model.types.BasicTypes.AdditionalData.transformAdditionalDataDTO import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions} import za.co.absa.atum.model.types.Checkpoint +import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax import za.co.absa.atum.reader.server.GenericServerConnection -import scala.language.higherKinds +//import scala.language.higherKinds +// +//class PartitioningReader[F[_]: Monad](partitioning: AtumPartitions)( +// implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher) { +// +// /** +// * Fetches additional data for the given partitioning. +// * @param partitioning The partitioning for which to fetch additional data. +// * @return AdditionalData containing the additional data. +// */ +// def getAdditionalData: F[AdditionalData] = { +// Monad[F].pure(dispatcher.getAdditionalData(partitioning).data.map { +// case (key, Some(itemDTO)) => key -> Some(itemDTO.value.get) +// case (key, None) => key -> None +// }) +// } +// +// /** +// * Fetches checkpoints for the given partitioning. +// * @param partitioning The partitioning for which to fetch checkpoints. +// * @return List of CheckpointDTO containing the checkpoints. +// */ +// def getCheckpoints(limit: Option[Int], offset: Option[Long], checkpointName: Option[String]): F[List[Checkpoint]] = { +// Monad[F].pure(dispatcher.getCheckpoints(partitioning, limit, offset, checkpointName).map { dto => +// Checkpoint( +// id = dto.id.toString, +// name = dto.name, +// author = dto.author, +// measuredByAtumAgent = dto.measuredByAtumAgent, +// processStartTime = dto.processStartTime, +// processEndTime = dto.processEndTime, +// measurements = dto.measurements +// ) +// }.toList) +// } +// +//} +// +//object PartitioningReader { +// def apply[F[_]: Monad](partitioning: AtumPartitions)( +// implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher): PartitioningReader[F] = +// new PartitioningReader[F](partitioning) +//} -class PartitioningReader[F[_]: Monad](partitioning: AtumPartitions)( + +class PartitioningReader[F[_]: Monad](atumPartitions: AtumPartitions)( implicit serverConnection: GenericServerConnection[F], dispatcher: Dispatcher) { - /** - * Fetches additional data for the given partitioning. - * @param partitioning The partitioning for which to fetch additional data. - * @return AdditionalData containing the additional data. - */ def getAdditionalData: F[AdditionalData] = { - Monad[F].pure(dispatcher.getAdditionalData(partitioning).data.map { - case (key, Some(itemDTO)) => key -> Some(itemDTO.value) - case (key, None) => key -> None - }) + val partitioningDTO = AtumPartitions.toSeqPartitionDTO(atumPartitions) + val encodedPartitioning = partitioningDTO.asBase64EncodedJsonString + + for { + partitioningIdEffect <- serverConnection.query[SingleSuccessResponse[PartitioningWithIdDTO]]( + s"/api/v2/partitionings/?partitioning=${encodedPartitioning}") + + partitioningId = partitioningIdEffect.data.id + + additionalDataEndpoint = s"/api/v2/partitionings/${partitioningId}/additional-data" + additionalDataEffect <- serverConnection.query[SingleSuccessResponse[AdditionalDataDTO]](additionalDataEndpoint) + + additionalData = transformAdditionalDataDTO(additionalDataEffect.data) + } yield additionalData } - /** - * Fetches checkpoints for the given partitioning. - * @param partitioning The partitioning for which to fetch checkpoints. - * @return List of CheckpointDTO containing the checkpoints. - */ def getCheckpoints(limit: Option[Int], offset: Option[Long], checkpointName: Option[String]): F[List[Checkpoint]] = { Monad[F].pure(dispatcher.getCheckpoints(partitioning, limit, offset, checkpointName).map { dto => Checkpoint( @@ -56,7 +104,6 @@ class PartitioningReader[F[_]: Monad](partitioning: AtumPartitions)( ) }.toList) } - } object PartitioningReader { diff --git a/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala b/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala index 71fda5f7..35425253 100644 --- a/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala +++ b/reader/src/main/scala/za/co/absa/atum/reader/server/GenericServerConnection.scala @@ -3,8 +3,6 @@ package za.co.absa.atum.reader.server import cats.Monad import io.circe.Decoder -import scala.util.Try - abstract class GenericServerConnection[F[_] : Monad](val serverUrl: String) { - def query[R: Decoder](endpointUri: String): F[Try[R]] + def query[R: Decoder](endpointUri: String): F[R] } diff --git a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala index 75ed3020..a4f47c6c 100644 --- a/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala +++ b/reader/src/test/scala/za/co/absa/atum/reader/PartitioningReaderUnitTests.scala @@ -21,41 +21,43 @@ import org.mockito.MockitoSugar import org.scalatest.funsuite.AnyFunSuiteLike import org.scalatest.matchers.should.Matchers import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, CheckpointV2DTO} -import za.co.absa.atum.model.types.BasicTypes.{AtumPartitions, AdditionalData} +import za.co.absa.atum.model.types.BasicTypes.{AdditionalData, AtumPartitions} import za.co.absa.atum.model.types.Checkpoint -import za.co.absa.atum.reader.server.GenericServerConnection import java.time.ZonedDateTime import java.util.UUID +import cats.Monad +import scala.collection.immutable.ListMap +//import za.co.absa.atum.reader.server.GenericServerConnection.Dispatcher class PartitioningReaderUnitTests extends AnyFunSuiteLike with Matchers with MockitoSugar { + private implicit val idMonad: Monad[Id] = Monad[Id] + trait TestContext { - val partitioning: AtumPartitions = mock[AtumPartitions] - val serverConnection: GenericServerConnection[Id] = mock[GenericServerConnection[Id]] - val dispatcher: Dispatcher = mock[Dispatcher] - val reader: PartitioningReader[Id] = new PartitioningReader(partitioning)(serverConnection, dispatcher) + val partitioning: AtumPartitions = ListMap("key1" -> "value1", "key2" -> "value2") + implicit val dispatcher: Dispatcher = mock[Dispatcher] + val reader: PartitioningReader[Id] = new PartitioningReader[Id](partitioning) } + protected val additionalDataDTO1: AdditionalDataDTO = AdditionalDataDTO( + Map( + "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "author")), + "key2" -> None, + "key3" -> Some(AdditionalDataItemDTO(Some("value3"), "author")) + )) + test("getAdditionalData should fetch and transform additional data correctly") { new TestContext { - val additionalDataDTO: AdditionalDataDTO = AdditionalDataDTO( - data = Map( - "key1" -> Some(AdditionalDataItemDTO(Some("value1"), "author1")), - "key2" -> None - ) - ) - - when(dispatcher.getAdditionalData(partitioning)).thenReturn(additionalDataDTO) + when(dispatcher.getAdditionalData(partitioning)).thenReturn(additionalDataDTO1) val result: Id[AdditionalData] = reader.getAdditionalData - result shouldEqual AdditionalData( - data = Map( - "key1" -> Some("value1"), - "key2" -> None - ) - ) + Map( + "key1" -> Some("value1"), + "key2" -> None, + "key3" -> Some("value3") + ) shouldEqual result } } @@ -99,4 +101,5 @@ class PartitioningReaderUnitTests extends AnyFunSuiteLike with Matchers with Moc }.toList } } + }