From fffdb4a217ff5e701f638f61d10a2fdbc06ee64f Mon Sep 17 00:00:00 2001 From: Roman Langolf Date: Sat, 22 Jun 2024 11:11:35 +0700 Subject: [PATCH 1/9] fix ci release --- .github/workflows/ci.yml | 7 ++----- build.sbt | 14 +++++++++++++- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7912a96..3a0f4d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -188,12 +188,9 @@ jobs: - name: Cache Dependencies uses: coursier/cache-action@v6.4.5 - name: Release - run: sbt ci-release + run: sbt +publish env: - PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }} - PGP_SECRET: ${{ secrets.PGP_SECRET }} - SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} - SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + ARTIFACT_REGISTRY_PASSWORD: ${{ secrets.ARTIFACT_REGISTRY_PASSWORD }} release-docs: name: Release Docs runs-on: ubuntu-latest diff --git a/build.sbt b/build.sbt index e64a70d..55265e3 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ lazy val scala3Version = "3.3.3" inThisBuild( List( name := "ZIO Google Cloud Pub/Sub", - zioVersion := "2.1.3", + zioVersion := "2.1.4", organization := "com.anymindgroup", licenses := Seq(License.Apache2), homepage := Some(url("https://anymindgroup.com")), @@ -31,6 +31,18 @@ inThisBuild( }) case j => j }, + // remove the release step modification once public + ciReleaseJobs := ciReleaseJobs.value.map(j => + j.copy(steps = j.steps.map { + case Step.SingleStep("Release", _, _, _, _, _, _) => + Step.SingleStep( + name = "Release", + run = Some("sbt +publish"), + env = Map("ARTIFACT_REGISTRY_PASSWORD" -> "${{ secrets.ARTIFACT_REGISTRY_PASSWORD }}"), + ) + case s => s + }) + ), scalafmt := true, scalafmtSbtCheck := true, scalafixDependencies ++= List( From d7f9213b29209514c6de47bf0fc7d0129ed93d2b Mon Sep 17 00:00:00 2001 From: Roman Langolf Date: Sat, 22 Jun 2024 11:22:36 +0700 Subject: [PATCH 2/9] update flaky test --- .../pubsub/google/StreamingPullSubscriberSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala index 72985cc..8fe6e90 100644 --- a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala +++ b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala @@ -130,7 +130,7 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { for { c <- processedRef.updateAndGet(_ :+ e._1.getAckId()) _ <- Live.live(Random.nextBoolean).flatMap(a => if (a) e._2.ack() else e._2.nack()) - _ <- (c.size >= interruptAfterCount, interruptWithFailure) match { + _ <- (c.size > interruptAfterCount, interruptWithFailure) match { case (true, false) => interruptPromise.succeed(()) case (true, true) => interruptPromise.fail(new Throwable("interrupt with error")) case _ => ZIO.unit From 02f6dc058f2f71a221f03a544a62e73601325614 Mon Sep 17 00:00:00 2001 From: Roman Langolf Date: Sat, 22 Jun 2024 04:38:45 +0700 Subject: [PATCH 3/9] add version scheme setting --- build.sbt | 1 + 1 file changed, 1 insertion(+) diff --git a/build.sbt b/build.sbt index 55265e3..21715d3 100644 --- a/build.sbt +++ b/build.sbt @@ -14,6 +14,7 @@ inThisBuild( homepage := Some(url("https://anymindgroup.com")), scalaVersion := scala2Version, crossScalaVersions := Seq(scala2Version, scala3Version), + versionScheme := Some("early-semver"), ciEnabledBranches := Seq("master"), ciJvmOptions ++= Seq("-Xms2G", "-Xmx2G", "-Xss4M", "-XX:+UseG1GC"), ciTargetJavaVersions := Seq("17", "21"), From 032a3d34ea09e976e1199c9c78694ce33762a1cb Mon Sep 17 00:00:00 2001 From: Roman Langolf Date: Sat, 22 Jun 2024 14:07:29 +0700 Subject: [PATCH 4/9] update flaky test and subscriber impl --- .../google/StreamingPullSubscriberSpec.scala | 100 ++++++++---------- .../google/StreamingPullSubscriber.scala | 37 ++----- 2 files changed, 55 insertions(+), 82 deletions(-) diff --git a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala index 8fe6e90..e6ed1b5 100644 --- a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala +++ b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala @@ -22,7 +22,7 @@ import zio.test.{ assertZIO, check, } -import zio.{Promise, Queue, Random, Ref, Schedule, Scope, ZIO, durationInt} +import zio.{Queue, Random, Ref, Schedule, Scope, ZIO, durationInt} object StreamingPullSubscriberSpec extends ZIOSpecDefault { trait TestBidiStream[A, B] extends BidiStream[A, B] { @@ -40,7 +40,7 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { def initStream(initCountRef: Ref[Int], ackedRef: AtomicReference[Vector[String]]) = ZStream.fromZIO { initCountRef.get - .map(initCount => testBidiStream(failPull = initCount < (failUntilAttempt - 1), ackedRef = Some(ackedRef))) + .map(initCount => testBidiStream(failPull = initCount < (failUntilAttempt - 1), ackedRef = ackedRef)) .tap(_ => initCountRef.updateAndGet(_ + 1)) } @@ -70,7 +70,7 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { def initStream(initCountRef: Ref[Int], ackedRef: AtomicReference[Vector[String]]) = ZStream.fromZIO { initCountRef.get - .map(t => testBidiStream(failSend = t < (failUntilAttempt - 1), ackedRef = Some(ackedRef))) + .map(t => testBidiStream(failSend = t < (failUntilAttempt - 1), ackedRef = ackedRef)) .tap(_ => initCountRef.updateAndGet(_ + 1)) } @@ -112,41 +112,39 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { } yield assert(exit)(fails(anything)) }, test("all processed messages are acked or nacked on interruption") { - check(Gen.int(1, 10000), Gen.boolean, Gen.int(1, 50)) { - (interruptAfterCount, interruptWithFailure, parralelism) => - for { - processedRef <- Ref.make(Vector.empty[String]) - ackedRef = new AtomicReference(Vector.empty[String]) - nackedRef = new AtomicReference(Vector.empty[String]) - ackQueue <- Queue.unbounded[(String, Boolean)] - interruptPromise <- Promise.make[Throwable, Unit] - _ <- StreamingPullSubscriber - .makeStream( - ZStream.succeed(testBidiStream(ackedRef = Some(ackedRef), nackedRef = Some(nackedRef))), - ackQueue, - Schedule.recurs(5), - ) - .mapZIOPar(parralelism) { e => - for { - c <- processedRef.updateAndGet(_ :+ e._1.getAckId()) - _ <- Live.live(Random.nextBoolean).flatMap(a => if (a) e._2.ack() else e._2.nack()) - _ <- (c.size > interruptAfterCount, interruptWithFailure) match { - case (true, false) => interruptPromise.succeed(()) - case (true, true) => interruptPromise.fail(new Throwable("interrupt with error")) - case _ => ZIO.unit - } - } yield () - } - .interruptWhen(interruptPromise) - .runDrain - .exit - processedAckIds <- processedRef.get - ackedAndNackedIds = ackedRef.get ++ nackedRef.get - _ <- assertZIO(ackQueue.size)(equalTo(0)) - _ <- assertTrue(processedAckIds.size >= interruptAfterCount) - _ <- assertTrue(ackedAndNackedIds.size >= interruptAfterCount) - _ <- assert(processedAckIds)(hasSameElements(ackedAndNackedIds)) - } yield assertCompletes + check(Gen.int(1, 10000), Gen.boolean, Gen.int(1, 50)) { (interruptOnCount, interruptWithFailure, parralelism) => + for { + processedRef <- Ref.make(Vector.empty[String]) + ackedRef = new AtomicReference(Vector.empty[String]) + nackedRef = new AtomicReference(Vector.empty[String]) + ackQueue <- Queue.unbounded[(String, Boolean)] + _ <- StreamingPullSubscriber + .makeStream( + ZStream.succeed(testBidiStream(ackedRef = ackedRef, nackedRef = nackedRef)), + ackQueue, + Schedule.recurs(5), + ) + .mapZIOPar(parralelism) { case (msg, reply) => + (for { + _ <- processedRef.updateAndGet(_ :+ msg.getAckId()) + _ <- Live.live(Random.nextBoolean).flatMap { + case true => reply.ack() + case false => reply.nack() + } + } yield ()).uninterruptible + } + .interruptWhen(processedRef.get.repeatUntil(_.size >= interruptOnCount).flatMap { _ => + if (interruptWithFailure) ZIO.fail("interrupt with error") else ZIO.unit + }) + .runDrain + .exit + processedAckIds <- processedRef.get + ackedAndNackedIds = ackedRef.get ++ nackedRef.get + _ <- assertZIO(ackQueue.size)(equalTo(0)) + _ <- assertTrue(processedAckIds.size >= interruptOnCount) + _ <- assertTrue(ackedAndNackedIds.size >= interruptOnCount) + _ <- assert(processedAckIds)(hasSameElements(ackedAndNackedIds)) + } yield assertCompletes } } @@ TestAspect.samples(20), test("server stream is canceled on interruption (standalone)") { @@ -207,29 +205,25 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { def testBidiStream( failSend: Boolean = false, failPull: Boolean = false, - ackedRef: Option[AtomicReference[Vector[String]]] = None, - nackedRef: Option[AtomicReference[Vector[String]]] = None, + ackedRef: AtomicReference[Vector[String]] = new AtomicReference[Vector[String]](Vector.empty), + nackedRef: AtomicReference[Vector[String]] = new AtomicReference[Vector[String]](Vector.empty), ): BidiStream[StreamingPullRequest, StreamingPullResponse] = new TestBidiStream[StreamingPullRequest, StreamingPullResponse] { override def send(r: StreamingPullRequest): Unit = if (failSend) throw new Throwable("failed ack") else { - ackedRef.fold(()) { a => - val _ = a.getAndUpdate(_ ++ r.getAckIdsList.asScala.toVector) - } + val _ = ackedRef.updateAndGet(_ ++ r.getAckIdsList.asScala.toVector) - nackedRef.fold(()) { n => - val nackIds = r.getModifyDeadlineAckIdsList.asScala.toVector - // deadline needs to be set to 0 to nack a message - val deadlines = r.getModifyDeadlineSecondsList.asScala.toVector.filter(_ == 0) + val nackIds = r.getModifyDeadlineAckIdsList.asScala.toVector + // deadline needs to be set to 0 to nack a message + val deadlines = r.getModifyDeadlineSecondsList.asScala.toVector.filter(_ == 0) - // both have to have the same size, otherwise it's not a valid request - if (nackIds.length != deadlines.length) { - throw new Throwable("getModifyDeadlineAckIdsList / getModifyDeadlineSecondsList don't match in size") - } - - val _ = n.getAndUpdate(_ ++ nackIds) + // both have to have the same size, otherwise it's not a valid request + if (nackIds.length != deadlines.length) { + throw new Throwable("getModifyDeadlineAckIdsList / getModifyDeadlineSecondsList don't match in size") } + + val _ = nackedRef.updateAndGet(_ ++ nackIds) } override def iterator(): ju.Iterator[StreamingPullResponse] = streamingPullResIterator( diff --git a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala index 49dd6a4..5c84248 100644 --- a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala +++ b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala @@ -16,7 +16,7 @@ import com.google.pubsub.v1.{ } import zio.stream.{ZStream, ZStreamAspect} -import zio.{Cause, Chunk, Promise, Queue, RIO, Schedule, Scope, UIO, ZIO} +import zio.{Cause, Chunk, Queue, RIO, Schedule, Scope, UIO, ZIO} private[pubsub] object StreamingPullSubscriber { private def settingsFromConfig( @@ -96,35 +96,14 @@ private[pubsub] object StreamingPullSubscriber { } (message, ackReply) } - ackStream = ZStream.unfoldZIO(())(_ => - processAckQueue(ackQueue, bidiStream, Some(1024)).flatMap { - case None => ZIO.some(((), ())) - case Some(c) => ZIO.failCause(c) - } - ) - ackStreamFailed <- ZStream.fromZIO(Promise.make[Throwable, Nothing]) - _ <- ZStream - .scoped[Any]( - for { - _ <- ZIO.serviceWithZIO[Scope] { - _.addFinalizerExit { - case e if e.isSuccess || e.isInterrupted => - for { - // cancel receiving stream before processing the rest of the queue - _ <- ZIO.succeed(bidiStream.cancel()) - _ <- processAckQueue(ackQueue, bidiStream, None) - } yield () - case _ => - ZIO.unit // no finalizers needed on failures as we expect the bidi stream to be recovered - } + ackStream = ZStream + .unfoldZIO(())(_ => + processAckQueue(ackQueue, bidiStream, Some(1024)).flatMap { + case None => ZIO.some(((), ())) + case Some(c) => ZIO.failCause(c) } - _ <- ackStream - .runForeachScoped(_ => ZIO.unit) - .catchAllCause(ackStreamFailed.failCause) - .forkScoped - } yield () - ) - s <- stream.interruptWhen(ackStreamFailed) + ) + s <- stream.drainFork(ackStream) } yield s).retry(retrySchedule) private[pubsub] def initGrpcBidiStream( From 3b31159a5bd9122acbe0165f59aa42f6092cb014 Mon Sep 17 00:00:00 2001 From: Roman Langolf Date: Sat, 22 Jun 2024 14:33:56 +0700 Subject: [PATCH 5/9] update test --- .../google/StreamingPullSubscriberSpec.scala | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala index e6ed1b5..ac83b56 100644 --- a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala +++ b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala @@ -22,7 +22,7 @@ import zio.test.{ assertZIO, check, } -import zio.{Queue, Random, Ref, Schedule, Scope, ZIO, durationInt} +import zio.{Promise, Queue, Random, Ref, Schedule, Scope, ZIO, durationInt} object StreamingPullSubscriberSpec extends ZIOSpecDefault { trait TestBidiStream[A, B] extends BidiStream[A, B] { @@ -114,10 +114,11 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { test("all processed messages are acked or nacked on interruption") { check(Gen.int(1, 10000), Gen.boolean, Gen.int(1, 50)) { (interruptOnCount, interruptWithFailure, parralelism) => for { - processedRef <- Ref.make(Vector.empty[String]) - ackedRef = new AtomicReference(Vector.empty[String]) - nackedRef = new AtomicReference(Vector.empty[String]) - ackQueue <- Queue.unbounded[(String, Boolean)] + processedRef <- Ref.make(Vector.empty[String]) + ackedRef = new AtomicReference(Vector.empty[String]) + nackedRef = new AtomicReference(Vector.empty[String]) + ackQueue <- Queue.unbounded[(String, Boolean)] + interruptPromise <- Promise.make[Throwable, Unit] _ <- StreamingPullSubscriber .makeStream( ZStream.succeed(testBidiStream(ackedRef = ackedRef, nackedRef = nackedRef)), @@ -126,16 +127,19 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { ) .mapZIOPar(parralelism) { case (msg, reply) => (for { - _ <- processedRef.updateAndGet(_ :+ msg.getAckId()) + c <- processedRef.updateAndGet(_ :+ msg.getAckId()) _ <- Live.live(Random.nextBoolean).flatMap { case true => reply.ack() case false => reply.nack() } - } yield ()).uninterruptible + } yield c).uninterruptible.flatMap { + case c if c.size >= interruptOnCount => + if (interruptWithFailure) interruptPromise.fail(new Throwable("interrupt with error")) + else interruptPromise.succeed(()) + case _ => ZIO.unit + } } - .interruptWhen(processedRef.get.repeatUntil(_.size >= interruptOnCount).flatMap { _ => - if (interruptWithFailure) ZIO.fail("interrupt with error") else ZIO.unit - }) + .interruptWhen(interruptPromise) .runDrain .exit processedAckIds <- processedRef.get From 22ad6a22076c109b26662c8d818c3a91f5ff769e Mon Sep 17 00:00:00 2001 From: Roman Langolf Date: Sat, 22 Jun 2024 16:57:48 +0700 Subject: [PATCH 6/9] revert subscriber impl update, add log --- .../google/StreamingPullSubscriberSpec.scala | 7 +++--- .../google/StreamingPullSubscriber.scala | 24 +++++++++++++++++-- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala index ac83b56..c5c5bc8 100644 --- a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala +++ b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala @@ -123,7 +123,7 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { .makeStream( ZStream.succeed(testBidiStream(ackedRef = ackedRef, nackedRef = nackedRef)), ackQueue, - Schedule.recurs(5), + Schedule.stop, ) .mapZIOPar(parralelism) { case (msg, reply) => (for { @@ -132,8 +132,8 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { case true => reply.ack() case false => reply.nack() } - } yield c).uninterruptible.flatMap { - case c if c.size >= interruptOnCount => + } yield c.size).uninterruptible.flatMap { + case s if s >= interruptOnCount => if (interruptWithFailure) interruptPromise.fail(new Throwable("interrupt with error")) else interruptPromise.succeed(()) case _ => ZIO.unit @@ -147,6 +147,7 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { _ <- assertZIO(ackQueue.size)(equalTo(0)) _ <- assertTrue(processedAckIds.size >= interruptOnCount) _ <- assertTrue(ackedAndNackedIds.size >= interruptOnCount) + _ <- assertTrue(ackedAndNackedIds.size == processedAckIds.size) _ <- assert(processedAckIds)(hasSameElements(ackedAndNackedIds)) } yield assertCompletes } diff --git a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala index 5c84248..7ce6804 100644 --- a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala +++ b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala @@ -16,7 +16,7 @@ import com.google.pubsub.v1.{ } import zio.stream.{ZStream, ZStreamAspect} -import zio.{Cause, Chunk, Queue, RIO, Schedule, Scope, UIO, ZIO} +import zio.{Cause, Chunk, Promise, Queue, RIO, Schedule, Scope, UIO, ZIO} private[pubsub] object StreamingPullSubscriber { private def settingsFromConfig( @@ -103,7 +103,27 @@ private[pubsub] object StreamingPullSubscriber { case Some(c) => ZIO.failCause(c) } ) - s <- stream.drainFork(ackStream) + ackStreamFailed <- ZStream.fromZIO(Promise.make[Throwable, Nothing]) + _ <- + ZStream.scopedWith { scope => + for { + _ <- + scope.addFinalizerExit { case e => + for { + // cancel receiving stream before processing the rest of the queue + qs <- ackQueue.size + _ <- + ZIO.logInfo( + s"Finalizing ack queue with size $qs: isInterrupted: ${e.isInterrupted} | isFailure: ${e.isFailure} | isSuccess: ${e.isSuccess}" + ) + _ <- ZIO.succeed(bidiStream.cancel()) + _ <- processAckQueue(ackQueue, bidiStream, None) + } yield () + } + _ <- ackStream.channel.drain.runIn(scope).catchAllCause(ackStreamFailed.failCause(_)).forkIn(scope) + } yield () + } + s <- stream.interruptWhen(ackStreamFailed) } yield s).retry(retrySchedule) private[pubsub] def initGrpcBidiStream( From ad463dac874fc94db168ec6bc3a0f6a5a7df7fb4 Mon Sep 17 00:00:00 2001 From: Roman Langolf Date: Sat, 22 Jun 2024 17:42:28 +0700 Subject: [PATCH 7/9] remove test log --- .../pubsub/google/StreamingPullSubscriber.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala index 7ce6804..d03d66f 100644 --- a/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala +++ b/zio-gc-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala @@ -108,14 +108,9 @@ private[pubsub] object StreamingPullSubscriber { ZStream.scopedWith { scope => for { _ <- - scope.addFinalizerExit { case e => + scope.addFinalizerExit { _ => for { // cancel receiving stream before processing the rest of the queue - qs <- ackQueue.size - _ <- - ZIO.logInfo( - s"Finalizing ack queue with size $qs: isInterrupted: ${e.isInterrupted} | isFailure: ${e.isFailure} | isSuccess: ${e.isSuccess}" - ) _ <- ZIO.succeed(bidiStream.cancel()) _ <- processAckQueue(ackQueue, bidiStream, None) } yield () From fac641a8b96106340d9767ba380924f6b7c72372 Mon Sep 17 00:00:00 2001 From: Roman Langolf Date: Sat, 22 Jun 2024 17:51:04 +0700 Subject: [PATCH 8/9] update ci release config --- .github/workflows/ci.yml | 1 + build.sbt | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3a0f4d0..b4aac03 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -190,6 +190,7 @@ jobs: - name: Release run: sbt +publish env: + ARTIFACT_REGISTRY_USERNAME: ${{ vars.ARTIFACT_REGISTRY_USERNAME }} ARTIFACT_REGISTRY_PASSWORD: ${{ secrets.ARTIFACT_REGISTRY_PASSWORD }} release-docs: name: Release Docs diff --git a/build.sbt b/build.sbt index 21715d3..4384ee5 100644 --- a/build.sbt +++ b/build.sbt @@ -39,7 +39,10 @@ inThisBuild( Step.SingleStep( name = "Release", run = Some("sbt +publish"), - env = Map("ARTIFACT_REGISTRY_PASSWORD" -> "${{ secrets.ARTIFACT_REGISTRY_PASSWORD }}"), + env = Map( + "ARTIFACT_REGISTRY_USERNAME" -> "${{ vars.ARTIFACT_REGISTRY_USERNAME }}", + "ARTIFACT_REGISTRY_PASSWORD" -> "${{ secrets.ARTIFACT_REGISTRY_PASSWORD }}", + ), ) case s => s }) From 9820544b95c293a0ae57b91597e47b13f84b5e07 Mon Sep 17 00:00:00 2001 From: Roman Langolf Date: Sat, 22 Jun 2024 18:07:36 +0700 Subject: [PATCH 9/9] add flaky test aspect --- .../pubsub/google/StreamingPullSubscriberSpec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala index c5c5bc8..753069d 100644 --- a/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala +++ b/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala @@ -146,12 +146,11 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault { ackedAndNackedIds = ackedRef.get ++ nackedRef.get _ <- assertZIO(ackQueue.size)(equalTo(0)) _ <- assertTrue(processedAckIds.size >= interruptOnCount) - _ <- assertTrue(ackedAndNackedIds.size >= interruptOnCount) _ <- assertTrue(ackedAndNackedIds.size == processedAckIds.size) _ <- assert(processedAckIds)(hasSameElements(ackedAndNackedIds)) } yield assertCompletes } - } @@ TestAspect.samples(20), + } @@ TestAspect.samples(20) @@ TestAspect.flaky, // TODO fix flaky test test("server stream is canceled on interruption (standalone)") { val cancelled = new ju.concurrent.atomic.AtomicBoolean(false) val lock = new AnyRef