Skip to content

Commit

Permalink
dependency updates (#19)
Browse files Browse the repository at this point in the history
* update zio to 2.1.3
* update subscriber ack stream finalizer
  • Loading branch information
rolang authored Jun 16, 2024
1 parent da2e446 commit 2ce2eb0
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 62 deletions.
28 changes: 14 additions & 14 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v4.0.0
uses: actions/setup-java@v4.2.1
with:
distribution: corretto
java-version: '17'
check-latest: true
- name: Cache Dependencies
uses: coursier/cache-action@v6.4.4
uses: coursier/cache-action@v6.4.5
- name: Check all code compiles
run: sbt +Test/compile
- name: Check artifacts build process
Expand All @@ -53,13 +53,13 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v4.0.0
uses: actions/setup-java@v4.2.1
with:
distribution: corretto
java-version: '17'
check-latest: true
- name: Cache Dependencies
uses: coursier/cache-action@v6.4.4
uses: coursier/cache-action@v6.4.5
- name: Check if the site workflow is up to date
run: sbt ciCheckGithubWorkflow
- name: Lint
Expand All @@ -78,19 +78,19 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v4.0.0
uses: actions/setup-java@v4.2.1
with:
distribution: corretto
java-version: ${{ matrix.java }}
check-latest: true
- name: Cache Dependencies
uses: coursier/cache-action@v6.4.4
uses: coursier/cache-action@v6.4.5
- name: Git Checkout
uses: actions/checkout@v4.1.1
with:
fetch-depth: '0'
- name: Start up pubsub
run: docker compose up -d
run: docker compose up -d && until curl -s http://localhost:8085; do printf 'waiting for pubsub...'; sleep 1; done && echo "pubsub ready"
- name: Test
run: sbt +test
update-readme:
Expand All @@ -106,13 +106,13 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v4.0.0
uses: actions/setup-java@v4.2.1
with:
distribution: corretto
java-version: '17'
check-latest: true
- name: Cache Dependencies
uses: coursier/cache-action@v6.4.4
uses: coursier/cache-action@v6.4.5
- name: Generate Readme
run: sbt docs/generateReadme
- name: Commit Changes
Expand All @@ -129,7 +129,7 @@ jobs:
app_private_key: ${{ secrets.APP_PRIVATE_KEY }}
- name: Create Pull Request
id: cpr
uses: peter-evans/create-pull-request@v5.0.2
uses: peter-evans/create-pull-request@v6.0.2
with:
body: |-
Autogenerated changes after running the `sbt docs/generateReadme` command of the [zio-sbt-website](https://zio.dev/zio-sbt) plugin.
Expand Down Expand Up @@ -180,13 +180,13 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v4.0.0
uses: actions/setup-java@v4.2.1
with:
distribution: corretto
java-version: '17'
check-latest: true
- name: Cache Dependencies
uses: coursier/cache-action@v6.4.4
uses: coursier/cache-action@v6.4.5
- name: Release
run: sbt ci-release
env:
Expand All @@ -209,13 +209,13 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v4.0.0
uses: actions/setup-java@v4.2.1
with:
distribution: corretto
java-version: '17'
check-latest: true
- name: Cache Dependencies
uses: coursier/cache-action@v6.4.4
uses: coursier/cache-action@v6.4.5
- name: Setup NodeJs
uses: actions/setup-node@v4
with:
Expand Down
14 changes: 0 additions & 14 deletions .scalafix.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
rules = [
Disable
DisableSyntax
ExplicitResultTypes
LeakingImplicitClassVal
Expand All @@ -11,19 +10,6 @@ rules = [
MissingFinal
]

Disable {
ifSynthetic = [
"scala/Option.option2Iterable"
"scala/Predef.any2stringadd"
]

symbols = [
"scala.Option.get", "scala.Some.get", "scala.None.get",
"scala.util.Either.LeftProjection.get", "scala.util.Either.RightProjection.get",
"scala.util.Try.get", "scala.util.Failure.get", "scala.util.Success.get"
]
}

OrganizeImports {
removeUnused = true
targetDialect = Scala3
Expand Down
21 changes: 15 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
import zio.sbt.githubactions.{Job, Step}
enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

lazy val scala2Version = "2.13.14"

lazy val scala3Version = "3.3.3"

inThisBuild(
List(
name := "ZIO Google Cloud Pub/Sub",
zioVersion := "2.0.22",
zioVersion := "2.1.3",
organization := "com.anymindgroup",
licenses := Seq(License.Apache2),
homepage := Some(url("https://anymindgroup.com")),
crossScalaVersions := Seq("2.13.14", "3.3.3"),
scalaVersion := scala2Version,
crossScalaVersions := Seq(scala2Version, scala3Version),
ciEnabledBranches := Seq("master"),
ciJvmOptions ++= Seq("-Xms2G", "-Xmx2G", "-Xss4M", "-XX:+UseG1GC"),
ciTargetJavaVersions := Seq("17", "21"),
ciTestJobs := ciTestJobs.value.map {
case j if j.id == "test" =>
val startPubsub = Step.SingleStep(name = "Start up pubsub", run = Some("docker compose up -d"))
val startPubsub = Step.SingleStep(
name = "Start up pubsub",
run = Some(
"docker compose up -d && until curl -s http://localhost:8085; do printf 'waiting for pubsub...'; sleep 1; done && echo \"pubsub ready\""
),
)
j.copy(steps = j.steps.flatMap {
case s: Step.SingleStep if s.name.contains("Git Checkout") => Seq(s, startPubsub)
case s => Seq(s)
Expand Down Expand Up @@ -46,7 +56,6 @@ lazy val commonSettings = List(
Compile / scalacOptions --= sys.env.get("CI").fold(Seq("-Xfatal-warnings"))(_ => Nil),
Test / scalafixConfig := Some(new File(".scalafix_test.conf")),
Test / scalacOptions --= Seq("-Xfatal-warnings"),
version ~= { v => if (v.contains('+')) s"${v.replace('+', '-')}-SNAPSHOT" else v },
credentials += {
for {
username <- sys.env.get("ARTIFACT_REGISTRY_USERNAME")
Expand Down Expand Up @@ -115,7 +124,7 @@ lazy val zioPubsubSerdeVulcan = (project in file("zio-gc-pubsub-serde-vulcan"))
)
)

val circeVersion = "0.14.6"
val circeVersion = "0.14.7"
lazy val zioPubsubSerdeCirce = crossProject(JVMPlatform, NativePlatform)
.in(file("zio-gc-pubsub-serde-circe"))
.settings(moduleName := "zio-gc-pubsub-serde-circe")
Expand All @@ -130,7 +139,7 @@ lazy val zioPubsubSerdeCirce = crossProject(JVMPlatform, NativePlatform)
)
)

val googleCloudPubsubVersion = "1.129.3"
val googleCloudPubsubVersion = "1.130.1"
lazy val zioPubsubGoogle = (project in file("zio-gc-pubsub-google"))
.settings(moduleName := "zio-gc-pubsub-google")
.dependsOn(zioPubsub.jvm)
Expand Down
3 changes: 1 addition & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
version: "3.3"
services:
pubsub:
# https://console.cloud.google.com/gcr/images/google.com:cloudsdktool/GLOBAL/cloud-sdk
image: gcr.io/google.com/cloudsdktool/cloud-sdk:461.0.0-emulators
image: gcr.io/google.com/cloudsdktool/cloud-sdk:480.0.0-emulators
ports:
- "8085:8085"
command: gcloud beta emulators pubsub start --project=any --host-port=0.0.0.0:8085
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.9.9
sbt.version = 1.10.0
6 changes: 2 additions & 4 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
val zioSbtVersion = "0.4.0-alpha.23"
val zioSbtVersion = "0.4.0-alpha.25"
addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion)
addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion)
addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion)

addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1")

addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")

addSbtPlugin("com.github.sbt" % "sbt-dynver" % "5.0.1")

addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.12")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ object AvroPublisherSpec extends ZIOSpecDefault {
)
_ <- stream.via(Pipeline.processPipeline(e => consumedRef.getAndUpdate(_ :+ e.data))).runDrain.forkScoped
testEvents = testEventsData.map(d => PublishMessage[TestEvent](d, None, Map.empty[String, String]))
_ <- ZIO.foreachDiscard(testEvents)(e => p.publish(e)) *> ZIO.sleep(200.millis)
consumed <- consumedRef.get
} yield assert(consumed)(equalTo(testEventsData.toVector))
_ <- ZIO.foreachDiscard(testEvents)(e => p.publish(e))
consumed <- consumedRef.get.repeatUntil(_.length == testEventsData.length).timeout(5.seconds)
} yield assert(consumed)(equalTo(Some(testEventsData.toVector)))
}
}
).provideSomeShared[Scope](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import zio.test.{
assert,
assertCompletes,
assertTrue,
assertZIO,
check,
}
import zio.{Promise, Queue, Random, Ref, Schedule, Scope, ZIO, durationInt}
Expand Down Expand Up @@ -141,8 +142,7 @@ object StreamingPullSubscriberSpec extends ZIOSpecDefault {
.exit
processedAckIds <- processedRef.get
ackedAndNackedIds = ackedRef.get ++ nackedRef.get
queueEmpty <- ackQueue.isEmpty
_ <- assertTrue(queueEmpty)
_ <- assertZIO(ackQueue.size)(equalTo(0))
_ <- assertTrue(processedAckIds.size >= interruptAfterCount)
_ <- assertTrue(ackedAndNackedIds.size >= interruptAfterCount)
_ <- assert(processedAckIds)(hasSameElements(ackedAndNackedIds))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import com.google.pubsub.v1.{
}

import zio.stream.{ZStream, ZStreamAspect}
import zio.{Cause, Chunk, Exit, Promise, Queue, RIO, Schedule, Scope, UIO, ZIO}
import zio.{Cause, Chunk, Promise, Queue, RIO, Schedule, Scope, UIO, ZIO}

private[pubsub] object StreamingPullSubscriber {
private def settingsFromConfig(
Expand Down Expand Up @@ -106,16 +106,17 @@ private[pubsub] object StreamingPullSubscriber {
_ <- ZStream
.scoped[Any](
for {
sc <- ZIO.service[Scope]
_ <- sc.addFinalizerExit {
case Exit.Success(_) =>
for {
// cancel receiving stream before processing the rest of the queue
_ <- ZIO.succeed(bidiStream.cancel())
_ <- processAckQueue(ackQueue, bidiStream, None)
} yield ()
case _ =>
ZIO.unit // no finalizers needed on failures as we expect the bidi stream to be recovered
_ <- ZIO.serviceWithZIO[Scope] {
_.addFinalizerExit {
case e if e.isSuccess || e.isInterrupted =>
for {
// cancel receiving stream before processing the rest of the queue
_ <- ZIO.succeed(bidiStream.cancel())
_ <- processAckQueue(ackQueue, bidiStream, None)
} yield ()
case _ =>
ZIO.unit // no finalizers needed on failures as we expect the bidi stream to be recovered
}
}
_ <- ackStream
.runForeachScoped(_ => ZIO.unit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import zio.{RIO, ZIO}

object CirceSerde {
def fromCirceCodec[T](codec: Codec[T]): Serde[Any, T] = new Serde[Any, T] {
private implicit val c: Codec[T] = codec

override def serialize(data: T): RIO[Any, Array[Byte]] = ZIO.succeed(codec(data).noSpacesSortKeys.getBytes)

override def deserialize(message: ReceivedMessage.Raw): RIO[Any, T] =
ZIO.fromEither(decode[T](new String(message.data))(codec))
ZIO.fromEither(decode[T](new String(message.data)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,25 @@ import zio.{RIO, ZIO}

object VulcanSerde {
def fromAvroCodec[T](codec: Codec[T], encoding: Encoding): Serde[Any, T] = new Serde[Any, T] {
private implicit val c: Codec[T] = codec

override def serialize(data: T): RIO[Any, Array[Byte]] =
ZIO
.fromEither(encoding match {
case Encoding.Binary =>
Codec.toBinary(data)(codec)
Codec.toBinary(data)
case Encoding.Json =>
Codec.toJson(data)(codec).map(_.getBytes)
Codec.toJson(data).map(_.getBytes)
})
.mapError(_.throwable)

override def deserialize(message: ReceivedMessage.Raw): RIO[Any, T] =
ZIO
.fromEither(encoding match {
case Encoding.Binary =>
codec.schema.flatMap(sc => Codec.fromBinary(message.data, sc)(codec))
codec.schema.flatMap(sc => Codec.fromBinary(message.data, sc))
case Encoding.Json =>
codec.schema.flatMap(sc => Codec.fromJson(new String(message.data), sc)(codec))
codec.schema.flatMap(sc => Codec.fromJson(new String(message.data), sc))
})
.mapError(_.throwable)
}
Expand Down

0 comments on commit 2ce2eb0

Please sign in to comment.