diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7912a96..b4aac03 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -188,12 +188,10 @@ jobs:
       - name: Cache Dependencies
         uses: coursier/cache-action@v6.4.5
       - name: Release
-        run: sbt ci-release
+        run: sbt +publish
         env:
-          PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }}
-          PGP_SECRET: ${{ secrets.PGP_SECRET }}
-          SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
-          SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
+          ARTIFACT_REGISTRY_USERNAME: ${{ vars.ARTIFACT_REGISTRY_USERNAME }}
+          ARTIFACT_REGISTRY_PASSWORD: ${{ secrets.ARTIFACT_REGISTRY_PASSWORD }}
   release-docs:
     name: Release Docs
     runs-on: ubuntu-latest
diff --git a/build.sbt b/build.sbt
index e64a70d..4384ee5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -8,12 +8,13 @@ lazy val scala3Version = "3.3.3"
 inThisBuild(
   List(
     name := "ZIO Google Cloud Pub/Sub",
-    zioVersion := "2.1.3",
+    zioVersion := "2.1.4",
     organization := "com.anymindgroup",
     licenses := Seq(License.Apache2),
     homepage := Some(url("https://anymindgroup.com")),
     scalaVersion := scala2Version,
     crossScalaVersions := Seq(scala2Version, scala3Version),
+    versionScheme := Some("early-semver"),
     ciEnabledBranches := Seq("master"),
     ciJvmOptions ++= Seq("-Xms2G", "-Xmx2G", "-Xss4M", "-XX:+UseG1GC"),
     ciTargetJavaVersions := Seq("17", "21"),
@@ -31,6 +32,21 @@ inThisBuild(
         })
       case j => j
     },
+    // remove the release step modification once public
+    ciReleaseJobs := ciReleaseJobs.value.map(j =>
+      j.copy(steps = j.steps.map {
+        case Step.SingleStep("Release", _, _, _, _, _, _) =>
+          Step.SingleStep(
+            name = "Release",
+            run = Some("sbt +publish"),
+            env = Map(
+              "ARTIFACT_REGISTRY_USERNAME" -> "${{ vars.ARTIFACT_REGISTRY_USERNAME }}",
+              "ARTIFACT_REGISTRY_PASSWORD" -> "${{ secrets.ARTIFACT_REGISTRY_PASSWORD }}",
+            ),
+          )
+        case s => s
+      })
+    ),
     scalafmt := true,
     scalafmtSbtCheck := true,
     scalafixDependencies ++= List(
diff --git a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala
index 72985cc..753069d 100644
--- a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala
+++ b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala
@@ -40,7 +40,7 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault {
 
         def initStream(initCountRef: Ref[Int], ackedRef: AtomicReference[Vector[String]]) = ZStream.fromZIO {
           initCountRef.get
-            .map(initCount => testBidiStream(failPull = initCount < (failUntilAttempt - 1), ackedRef = Some(ackedRef)))
+            .map(initCount => testBidiStream(failPull = initCount < (failUntilAttempt - 1), ackedRef = ackedRef))
             .tap(_ => initCountRef.updateAndGet(_ + 1))
         }
 
@@ -70,7 +70,7 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault {
 
         def initStream(initCountRef: Ref[Int], ackedRef: AtomicReference[Vector[String]]) = ZStream.fromZIO {
           initCountRef.get
-            .map(t => testBidiStream(failSend = t < (failUntilAttempt - 1), ackedRef = Some(ackedRef)))
+            .map(t => testBidiStream(failSend = t < (failUntilAttempt - 1), ackedRef = ackedRef))
             .tap(_ => initCountRef.updateAndGet(_ + 1))
         }
 
@@ -112,43 +112,45 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault {
         } yield assert(exit)(fails(anything))
       },
       test("all processed messages are acked or nacked on interruption") {
-        check(Gen.int(1, 10000), Gen.boolean, Gen.int(1, 50)) {
-          (interruptAfterCount, interruptWithFailure, parralelism) =>
-            for {
-              processedRef <- Ref.make(Vector.empty[String])
-              ackedRef = new AtomicReference(Vector.empty[String])
-              nackedRef = new AtomicReference(Vector.empty[String])
-              ackQueue <- Queue.unbounded[(String, Boolean)]
-              interruptPromise <- Promise.make[Throwable, Unit]
-              _ <- StreamingPullSubscriber
-                     .makeStream(
-                       ZStream.succeed(testBidiStream(ackedRef = Some(ackedRef), nackedRef = Some(nackedRef))),
-                       ackQueue,
-                       Schedule.recurs(5),
-                     )
-                     .mapZIOPar(parralelism) { e =>
-                       for {
-                         c <- processedRef.updateAndGet(_ :+ e._1.getAckId())
-                         _ <- Live.live(Random.nextBoolean).flatMap(a => if (a) e._2.ack() else e._2.nack())
-                         _ <- (c.size >= interruptAfterCount, interruptWithFailure) match {
-                                case (true, false) => interruptPromise.succeed(())
-                                case (true, true) => interruptPromise.fail(new Throwable("interrupt with error"))
-                                case _ => ZIO.unit
-                              }
-                       } yield ()
+        check(Gen.int(1, 10000), Gen.boolean, Gen.int(1, 50)) { (interruptOnCount, interruptWithFailure, parallelism) =>
+          for {
+            processedRef <- Ref.make(Vector.empty[String])
+            ackedRef = new AtomicReference(Vector.empty[String])
+            nackedRef = new AtomicReference(Vector.empty[String])
+            ackQueue <- Queue.unbounded[(String, Boolean)]
+            interruptPromise <- Promise.make[Throwable, Unit]
+            _ <- StreamingPullSubscriber
+                   .makeStream(
+                     ZStream.succeed(testBidiStream(ackedRef = ackedRef, nackedRef = nackedRef)),
+                     ackQueue,
+                     Schedule.stop,
+                   )
+                   .mapZIOPar(parallelism) { case (msg, reply) =>
+                     (for {
+                       c <- processedRef.updateAndGet(_ :+ msg.getAckId())
+                       _ <- Live.live(Random.nextBoolean).flatMap {
+                              case true => reply.ack()
+                              case false => reply.nack()
+                            }
+                     } yield c.size).uninterruptible.flatMap {
+                       case s if s >= interruptOnCount =>
+                         if (interruptWithFailure) interruptPromise.fail(new Throwable("interrupt with error"))
+                         else interruptPromise.succeed(())
+                       case _ => ZIO.unit
                      }
-                     .interruptWhen(interruptPromise)
-                     .runDrain
-                     .exit
-              processedAckIds <- processedRef.get
-              ackedAndNackedIds = ackedRef.get ++ nackedRef.get
-              _ <- assertZIO(ackQueue.size)(equalTo(0))
-              _ <- assertTrue(processedAckIds.size >= interruptAfterCount)
-              _ <- assertTrue(ackedAndNackedIds.size >= interruptAfterCount)
-              _ <- assert(processedAckIds)(hasSameElements(ackedAndNackedIds))
-            } yield assertCompletes
+                   }
+                   .interruptWhen(interruptPromise)
+                   .runDrain
+                   .exit
+            processedAckIds <- processedRef.get
+            ackedAndNackedIds = ackedRef.get ++ nackedRef.get
+            _ <- assertZIO(ackQueue.size)(equalTo(0))
+            _ <- assertTrue(processedAckIds.size >= interruptOnCount)
+            _ <- assertTrue(ackedAndNackedIds.size == processedAckIds.size)
+            _ <- assert(processedAckIds)(hasSameElements(ackedAndNackedIds))
+          } yield assertCompletes
         }
-      } @@ TestAspect.samples(20),
+      } @@ TestAspect.samples(20) @@ TestAspect.flaky, // TODO fix flaky test
       test("server stream is canceled on interruption (standalone)") {
         val cancelled = new ju.concurrent.atomic.AtomicBoolean(false)
         val lock = new AnyRef
@@ -207,29 +209,25 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault {
   def testBidiStream(
     failSend: Boolean = false,
     failPull: Boolean = false,
-    ackedRef: Option[AtomicReference[Vector[String]]] = None,
-    nackedRef: Option[AtomicReference[Vector[String]]] = None,
+    ackedRef: AtomicReference[Vector[String]] = new AtomicReference[Vector[String]](Vector.empty),
+    nackedRef: AtomicReference[Vector[String]] = new AtomicReference[Vector[String]](Vector.empty),
   ): BidiStream[StreamingPullRequest, StreamingPullResponse] = new TestBidiStream[StreamingPullRequest, StreamingPullResponse] {
 
     override def send(r: StreamingPullRequest): Unit =
       if (failSend) throw new Throwable("failed ack")
       else {
-        ackedRef.fold(()) { a =>
-          val _ = a.getAndUpdate(_ ++ r.getAckIdsList.asScala.toVector)
-        }
+        val _ = ackedRef.updateAndGet(_ ++ r.getAckIdsList.asScala.toVector)
 
-        nackedRef.fold(()) { n =>
-          val nackIds = r.getModifyDeadlineAckIdsList.asScala.toVector
-          // deadline needs to be set to 0 to nack a message
-          val deadlines = r.getModifyDeadlineSecondsList.asScala.toVector.filter(_ == 0)
+        val nackIds = r.getModifyDeadlineAckIdsList.asScala.toVector
+        // deadline needs to be set to 0 to nack a message
+        val deadlines = r.getModifyDeadlineSecondsList.asScala.toVector.filter(_ == 0)
 
-          // both have to have the same size, otherwise it's not a valid request
-          if (nackIds.length != deadlines.length) {
-            throw new Throwable("getModifyDeadlineAckIdsList / getModifyDeadlineSecondsList don't match in size")
-          }
-
-          val _ = n.getAndUpdate(_ ++ nackIds)
+        // both have to have the same size, otherwise it's not a valid request
+        if (nackIds.length != deadlines.length) {
+          throw new Throwable("getModifyDeadlineAckIdsList / getModifyDeadlineSecondsList don't match in size")
         }
+
+        val _ = nackedRef.updateAndGet(_ ++ nackIds)
       }
 
     override def iterator(): ju.Iterator[StreamingPullResponse] = streamingPullResIterator(
diff --git a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala
index 49dd6a4..d03d66f 100644
--- a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala
+++ b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala
@@ -96,34 +96,28 @@ private[pubsub] object StreamingPullSubscriber {
             }
             (message, ackReply)
           }
-      ackStream = ZStream.unfoldZIO(())(_ =>
-                    processAckQueue(ackQueue, bidiStream, Some(1024)).flatMap {
-                      case None => ZIO.some(((), ()))
-                      case Some(c) => ZIO.failCause(c)
-                    }
-                  )
-      ackStreamFailed <- ZStream.fromZIO(Promise.make[Throwable, Nothing])
-      _ <- ZStream
-             .scoped[Any](
-               for {
-                 _ <- ZIO.serviceWithZIO[Scope] {
-                        _.addFinalizerExit {
-                          case e if e.isSuccess || e.isInterrupted =>
-                            for {
-                              // cancel receiving stream before processing the rest of the queue
-                              _ <- ZIO.succeed(bidiStream.cancel())
-                              _ <- processAckQueue(ackQueue, bidiStream, None)
-                            } yield ()
-                          case _ =>
-                            ZIO.unit // no finalizers needed on failures as we expect the bidi stream to be recovered
-                        }
+      ackStream = ZStream
+                    .unfoldZIO(())(_ =>
+                      processAckQueue(ackQueue, bidiStream, Some(1024)).flatMap {
+                        case None => ZIO.some(((), ()))
+                        case Some(c) => ZIO.failCause(c)
                       }
-                 _ <- ackStream
-                        .runForeachScoped(_ => ZIO.unit)
-                        .catchAllCause(ackStreamFailed.failCause)
-                        .forkScoped
-               } yield ()
-             )
+                    )
+      ackStreamFailed <- ZStream.fromZIO(Promise.make[Throwable, Nothing])
+      _ <-
+        ZStream.scopedWith { scope =>
+          for {
+            _ <-
+              scope.addFinalizerExit { _ =>
+                for {
+                  // cancel receiving stream before processing the rest of the queue
+                  _ <- ZIO.succeed(bidiStream.cancel())
+                  _ <- processAckQueue(ackQueue, bidiStream, None)
+                } yield ()
+              }
+            _ <- ackStream.channel.drain.runIn(scope).catchAllCause(ackStreamFailed.failCause(_)).forkIn(scope)
+          } yield ()
+        }
       s <- stream.interruptWhen(ackStreamFailed)
     } yield s).retry(retrySchedule)