diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7f3914e..7912a96 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,13 +28,13 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v4.0.0 + uses: actions/setup-java@v4.2.1 with: distribution: corretto java-version: '17' check-latest: true - name: Cache Dependencies - uses: coursier/cache-action@v6.4.4 + uses: coursier/cache-action@v6.4.5 - name: Check all code compiles run: sbt +Test/compile - name: Check artifacts build process @@ -53,13 +53,13 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v4.0.0 + uses: actions/setup-java@v4.2.1 with: distribution: corretto java-version: '17' check-latest: true - name: Cache Dependencies - uses: coursier/cache-action@v6.4.4 + uses: coursier/cache-action@v6.4.5 - name: Check if the site workflow is up to date run: sbt ciCheckGithubWorkflow - name: Lint @@ -78,19 +78,19 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v4.0.0 + uses: actions/setup-java@v4.2.1 with: distribution: corretto java-version: ${{ matrix.java }} check-latest: true - name: Cache Dependencies - uses: coursier/cache-action@v6.4.4 + uses: coursier/cache-action@v6.4.5 - name: Git Checkout uses: actions/checkout@v4.1.1 with: fetch-depth: '0' - name: Start up pubsub - run: docker compose up -d + run: docker compose up -d && until curl -s http://localhost:8085; do printf 'waiting for pubsub...'; sleep 1; done && echo "pubsub ready" - name: Test run: sbt +test update-readme: @@ -106,13 +106,13 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v4.0.0 + uses: actions/setup-java@v4.2.1 with: distribution: corretto java-version: '17' check-latest: true - name: Cache Dependencies - uses: coursier/cache-action@v6.4.4 + uses: coursier/cache-action@v6.4.5 - name: Generate Readme run: sbt docs/generateReadme - name: Commit Changes @@ -129,7 +129,7 @@ jobs: app_private_key: ${{ secrets.APP_PRIVATE_KEY }} - name: Create Pull Request id: cpr - uses: peter-evans/create-pull-request@v5.0.2 + uses: peter-evans/create-pull-request@v6.0.2 with: body: |- Autogenerated changes after running the `sbt docs/generateReadme` command of the [zio-sbt-website](https://zio.dev/zio-sbt) plugin. @@ -180,13 +180,13 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v4.0.0 + uses: actions/setup-java@v4.2.1 with: distribution: corretto java-version: '17' check-latest: true - name: Cache Dependencies - uses: coursier/cache-action@v6.4.4 + uses: coursier/cache-action@v6.4.5 - name: Release run: sbt ci-release env: @@ -209,13 +209,13 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v4.0.0 + uses: actions/setup-java@v4.2.1 with: distribution: corretto java-version: '17' check-latest: true - name: Cache Dependencies - uses: coursier/cache-action@v6.4.4 + uses: coursier/cache-action@v6.4.5 - name: Setup NodeJs uses: actions/setup-node@v4 with: diff --git a/.scalafix.conf b/.scalafix.conf index d2ecdc9..b363c6f 100644 --- a/.scalafix.conf +++ b/.scalafix.conf @@ -1,5 +1,4 @@ rules = [ - Disable DisableSyntax ExplicitResultTypes LeakingImplicitClassVal @@ -11,19 +10,6 @@ rules = [ MissingFinal ] -Disable { - ifSynthetic = [ - "scala/Option.option2Iterable" - "scala/Predef.any2stringadd" - ] - - symbols = [ - "scala.Option.get", "scala.Some.get", "scala.None.get", - "scala.util.Either.LeftProjection.get", "scala.util.Either.RightProjection.get", - "scala.util.Try.get", "scala.util.Failure.get", "scala.util.Success.get" - ] -} - OrganizeImports { removeUnused = true targetDialect = Scala3 diff --git a/build.sbt b/build.sbt index 4bd0b21..e64a70d 100644 --- a/build.sbt +++ b/build.sbt @@ -1,20 +1,30 @@ import zio.sbt.githubactions.{Job, Step} enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin) +lazy val scala2Version = "2.13.14" + +lazy val scala3Version = "3.3.3" + inThisBuild( List( name := "ZIO Google Cloud Pub/Sub", - zioVersion := "2.0.22", + zioVersion := "2.1.3", organization := "com.anymindgroup", licenses := Seq(License.Apache2), homepage := Some(url("https://anymindgroup.com")), - crossScalaVersions := Seq("2.13.14", "3.3.3"), + scalaVersion := scala2Version, + crossScalaVersions := Seq(scala2Version, scala3Version), ciEnabledBranches := Seq("master"), ciJvmOptions ++= Seq("-Xms2G", "-Xmx2G", "-Xss4M", "-XX:+UseG1GC"), ciTargetJavaVersions := Seq("17", "21"), ciTestJobs := ciTestJobs.value.map { case j if j.id == "test" => - val startPubsub = Step.SingleStep(name = "Start up pubsub", run = Some("docker compose up -d")) + val startPubsub = Step.SingleStep( + name = "Start up pubsub", + run = Some( + "docker compose up -d && until curl -s http://localhost:8085; do printf 'waiting for pubsub...'; sleep 1; done && echo \"pubsub ready\"" + ), + ) j.copy(steps = j.steps.flatMap { case s: Step.SingleStep if s.name.contains("Git Checkout") => Seq(s, startPubsub) case s => Seq(s) @@ -46,7 +56,6 @@ lazy val commonSettings = List( Compile / scalacOptions --= sys.env.get("CI").fold(Seq("-Xfatal-warnings"))(_ => Nil), Test / scalafixConfig := Some(new File(".scalafix_test.conf")), Test / scalacOptions --= Seq("-Xfatal-warnings"), - version ~= { v => if (v.contains('+')) s"${v.replace('+', '-')}-SNAPSHOT" else v }, credentials += { for { username <- sys.env.get("ARTIFACT_REGISTRY_USERNAME") @@ -115,7 +124,7 @@ lazy val zioPubsubSerdeVulcan = (project in file("zio-gc-pubsub-serde-vulcan")) ) ) -val circeVersion = "0.14.6" +val circeVersion = "0.14.7" lazy val zioPubsubSerdeCirce = crossProject(JVMPlatform, NativePlatform) .in(file("zio-gc-pubsub-serde-circe")) .settings(moduleName := "zio-gc-pubsub-serde-circe") @@ -130,7 +139,7 @@ lazy val zioPubsubSerdeCirce = crossProject(JVMPlatform, NativePlatform) ) ) -val googleCloudPubsubVersion = "1.129.3" +val googleCloudPubsubVersion = "1.130.1" lazy val zioPubsubGoogle = (project in file("zio-gc-pubsub-google")) .settings(moduleName := "zio-gc-pubsub-google") .dependsOn(zioPubsub.jvm) diff --git a/docker-compose.yaml b/docker-compose.yaml index da7b0fc..25b3c0a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,8 +1,7 @@ -version: "3.3" services: pubsub: # https://console.cloud.google.com/gcr/images/google.com:cloudsdktool/GLOBAL/cloud-sdk - image: gcr.io/google.com/cloudsdktool/cloud-sdk:461.0.0-emulators + image: gcr.io/google.com/cloudsdktool/cloud-sdk:480.0.0-emulators ports: - "8085:8085" command: gcloud beta emulators pubsub start --project=any --host-port=0.0.0.0:8085 diff --git a/project/build.properties b/project/build.properties index 49214c4..be54e77 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.9.9 +sbt.version = 1.10.0 diff --git a/project/plugins.sbt b/project/plugins.sbt index a037e5f..9a6efc8 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,14 +1,12 @@ -val zioSbtVersion = "0.4.0-alpha.23" +val zioSbtVersion = "0.4.0-alpha.25" addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion) addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion) addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion) -addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0") +addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") -addSbtPlugin("com.github.sbt" % "sbt-dynver" % "5.0.1") - addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.12") diff --git a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala index 1a10822..c1e9d49 100644 --- a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala +++ b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala @@ -118,9 +118,9 @@ object AvroPublisherSpec extends ZIOSpecDefault { ) _ <- stream.via(Pipeline.processPipeline(e => consumedRef.getAndUpdate(_ :+ e.data))).runDrain.forkScoped testEvents = testEventsData.map(d => PublishMessage[TestEvent](d, None, Map.empty[String, String])) - _ <- ZIO.foreachDiscard(testEvents)(e => p.publish(e)) *> ZIO.sleep(200.millis) - consumed <- consumedRef.get - } yield assert(consumed)(equalTo(testEventsData.toVector)) + _ <- ZIO.foreachDiscard(testEvents)(e => p.publish(e)) + consumed <- consumedRef.get.repeatUntil(_.length == testEventsData.length).timeout(5.seconds) + } yield assert(consumed)(equalTo(Some(testEventsData.toVector))) } } ).provideSomeShared[Scope]( diff --git a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala index 377e35f..72985cc 100644 --- a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala +++ b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala @@ -19,6 +19,7 @@ import zio.test.{ assert, assertCompletes, assertTrue, + assertZIO, check, } import zio.{Promise, Queue, Random, Ref, Schedule, Scope, ZIO, durationInt} @@ -141,8 +142,7 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { .exit processedAckIds <- processedRef.get ackedAndNackedIds = ackedRef.get ++ nackedRef.get - queueEmpty <- ackQueue.isEmpty - _ <- assertTrue(queueEmpty) + _ <- assertZIO(ackQueue.size)(equalTo(0)) _ <- assertTrue(processedAckIds.size >= interruptAfterCount) _ <- assertTrue(ackedAndNackedIds.size >= interruptAfterCount) _ <- assert(processedAckIds)(hasSameElements(ackedAndNackedIds)) diff --git a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala index 20e59aa..49dd6a4 100644 --- a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala +++ b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala @@ -16,7 +16,7 @@ import com.google.pubsub.v1.{ } import zio.stream.{ZStream, ZStreamAspect} -import zio.{Cause, Chunk, Exit, Promise, Queue, RIO, Schedule, Scope, UIO, ZIO} +import zio.{Cause, Chunk, Promise, Queue, RIO, Schedule, Scope, UIO, ZIO} private[pubsub] object StreamingPullSubscriber { private def settingsFromConfig( @@ -106,16 +106,17 @@ private[pubsub] object StreamingPullSubscriber { _ <- ZStream .scoped[Any]( for { - sc <- ZIO.service[Scope] - _ <- sc.addFinalizerExit { - case Exit.Success(_) => - for { - // cancel receiving stream before processing the rest of the queue - _ <- ZIO.succeed(bidiStream.cancel()) - _ <- processAckQueue(ackQueue, bidiStream, None) - } yield () - case _ => - ZIO.unit // no finalizers needed on failures as we expect the bidi stream to be recovered + _ <- ZIO.serviceWithZIO[Scope] { + _.addFinalizerExit { + case e if e.isSuccess || e.isInterrupted => + for { + // cancel receiving stream before processing the rest of the queue + _ <- ZIO.succeed(bidiStream.cancel()) + _ <- processAckQueue(ackQueue, bidiStream, None) + } yield () + case _ => + ZIO.unit // no finalizers needed on failures as we expect the bidi stream to be recovered + } } _ <- ackStream .runForeachScoped(_ => ZIO.unit) diff --git a/zio-gc-pubsub-serde-circe/shared/src/main/scala/com/anymindgroup/pubsub/serde/CirceSerde.scala b/zio-gc-pubsub-serde-circe/shared/src/main/scala/com/anymindgroup/pubsub/serde/CirceSerde.scala index 2be63a5..27e55ba 100644 --- a/zio-gc-pubsub-serde-circe/shared/src/main/scala/com/anymindgroup/pubsub/serde/CirceSerde.scala +++ b/zio-gc-pubsub-serde-circe/shared/src/main/scala/com/anymindgroup/pubsub/serde/CirceSerde.scala @@ -8,9 +8,11 @@ import zio.{RIO, ZIO} object CirceSerde { def fromCirceCodec[T](codec: Codec[T]): Serde[Any, T] = new Serde[Any, T] { + private implicit val c: Codec[T] = codec + override def serialize(data: T): RIO[Any, Array[Byte]] = ZIO.succeed(codec(data).noSpacesSortKeys.getBytes) override def deserialize(message: ReceivedMessage.Raw): RIO[Any, T] = - ZIO.fromEither(decode[T](new String(message.data))(codec)) + ZIO.fromEither(decode[T](new String(message.data))) } } diff --git a/zio-gc-pubsub-serde-vulcan/src/main/scala/com/anymindgroup/pubsub/serde/VulcanSerde.scala b/zio-gc-pubsub-serde-vulcan/src/main/scala/com/anymindgroup/pubsub/serde/VulcanSerde.scala index 713ff4c..cb2adb5 100644 --- a/zio-gc-pubsub-serde-vulcan/src/main/scala/com/anymindgroup/pubsub/serde/VulcanSerde.scala +++ b/zio-gc-pubsub-serde-vulcan/src/main/scala/com/anymindgroup/pubsub/serde/VulcanSerde.scala @@ -8,13 +8,15 @@ import zio.{RIO, ZIO} object VulcanSerde { def fromAvroCodec[T](codec: Codec[T], encoding: Encoding): Serde[Any, T] = new Serde[Any, T] { + private implicit val c: Codec[T] = codec + override def serialize(data: T): RIO[Any, Array[Byte]] = ZIO .fromEither(encoding match { case Encoding.Binary => - Codec.toBinary(data)(codec) + Codec.toBinary(data) case Encoding.Json => - Codec.toJson(data)(codec).map(_.getBytes) + Codec.toJson(data).map(_.getBytes) }) .mapError(_.throwable) @@ -22,9 +24,9 @@ object VulcanSerde { ZIO .fromEither(encoding match { case Encoding.Binary => - codec.schema.flatMap(sc => Codec.fromBinary(message.data, sc)(codec)) + codec.schema.flatMap(sc => Codec.fromBinary(message.data, sc)) case Encoding.Json => - codec.schema.flatMap(sc => Codec.fromJson(new String(message.data), sc)(codec)) + codec.schema.flatMap(sc => Codec.fromJson(new String(message.data), sc)) }) .mapError(_.throwable) }