Skip to content

Commit

Permalink
minor docs update and examples reformatting
Browse files Browse the repository at this point in the history
  • Loading branch information
rolang committed Jun 23, 2024
1 parent 56aaab1 commit f3b53fa
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 39 deletions.
36 changes: 23 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
## Modules

- `zio-pubsub` Core components/interfaces/models
- `zio-pubsub-google` Provides subscriber, publisher and admin clients implementations using the [Google Java](https://cloud.google.com/java/docs/reference/google-cloud-pubsub/latest/overview) library
- `zio-pubsub-google` Provides publisher, admin and [StreamingPull API](https://cloud.google.com/pubsub/docs/pull#streamingpull_api) based subscriber client implementations using [Google's Java](https://cloud.google.com/java/docs/reference/google-cloud-pubsub/latest/overview) library
- `zio-pubsub-serde-circe` Provides Json Serializer/Deserializer using the [circe](https://circe.github.io/circe) codec
- `zio-pubsub-serde-vulcan` Provides Avro schema Serializer/Deserializer using the [vulcan](https://fd4s.github.io/vulcan) codec

Expand All @@ -36,7 +36,11 @@ object BasicSubscription extends ZIOAppDefault:
.subscribe(subscriptionName = "basic_example", des = Serde.int)
.mapZIO { (message, ackReply) =>
for {
_ <- printLine(s"Received message with id ${message.meta.messageId.value} and data ${message.data}")
_ <- printLine(
s"Received message" +
s" with id ${message.meta.messageId.value}" +
s" and data ${message.data}"
)
_ <- ackReply.ack()
} yield ()
}
Expand All @@ -49,7 +53,10 @@ object BasicSubscription extends ZIOAppDefault:

ZLayer.scoped(
G.Subscriber.makeStreamingPullSubscriber(
connection = G.PubsubConnectionConfig.Emulator(G.PubsubConnectionConfig.GcpProject("any"), "localhost:8085")
connection = G.PubsubConnectionConfig.Emulator(
G.PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
)
)
)
}
Expand All @@ -64,16 +71,16 @@ import zio.stream.ZStream, zio.*
object SamplesPublisher extends ZIOAppDefault:
def run = ZStream
.repeatZIOWithSchedule(Random.nextInt, Schedule.fixed(2.seconds))
.mapZIO { sampleData =>
.mapZIO { sample =>
for {
messageId <- Publisher.publish[Any, Int](
PublishMessage(
data = sampleData,
attributes = Map.empty,
orderingKey = None,
)
)
_ <- Console.printLine(s"Published data $sampleData with message id ${messageId.value}")
mId <- Publisher.publish[Any, Int](
PublishMessage(
data = sample,
attributes = Map.empty,
orderingKey = None,
)
)
_ <- Console.printLine(s"Published data $sample with message id ${mId.value}")
} yield ()
}
.runDrain
Expand All @@ -86,7 +93,10 @@ object SamplesPublisher extends ZIOAppDefault:
ZLayer.scoped(
G.Publisher.make(
config = G.PublisherConfig(
connection = G.PubsubConnectionConfig.Emulator(G.PubsubConnectionConfig.GcpProject("any"), "localhost:8085"),
connection = G.PubsubConnectionConfig.Emulator(
G.PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
),
topicName = "basic_example",
encoding = Encoding.Binary,
enableOrdering = false,
Expand Down
36 changes: 23 additions & 13 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ sidebar_label: "Getting Started"
## Modules

- `zio-pubsub` Core components/interfaces/models
- `zio-pubsub-google` Provides subscriber, publisher and admin clients implementations using the [Google Java](https://cloud.google.com/java/docs/reference/google-cloud-pubsub/latest/overview) library
- `zio-pubsub-google` Provides publisher, admin and [StreamingPull API](https://cloud.google.com/pubsub/docs/pull#streamingpull_api) based subscriber client implementations using [Google's Java](https://cloud.google.com/java/docs/reference/google-cloud-pubsub/latest/overview) library
- `zio-pubsub-serde-circe` Provides Json Serializer/Deserializer using the [circe](https://circe.github.io/circe) codec
- `zio-pubsub-serde-vulcan` Provides Avro schema Serializer/Deserializer using the [vulcan](https://fd4s.github.io/vulcan) codec

Expand All @@ -36,7 +36,11 @@ object BasicSubscription extends ZIOAppDefault:
.subscribe(subscriptionName = "basic_example", des = Serde.int)
.mapZIO { (message, ackReply) =>
for {
_ <- printLine(s"Received message with id ${message.meta.messageId.value} and data ${message.data}")
_ <- printLine(
s"Received message" +
s" with id ${message.meta.messageId.value}" +
s" and data ${message.data}"
)
_ <- ackReply.ack()
} yield ()
}
Expand All @@ -49,7 +53,10 @@ object BasicSubscription extends ZIOAppDefault:

ZLayer.scoped(
G.Subscriber.makeStreamingPullSubscriber(
connection = G.PubsubConnectionConfig.Emulator(G.PubsubConnectionConfig.GcpProject("any"), "localhost:8085")
connection = G.PubsubConnectionConfig.Emulator(
G.PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
)
)
)
}
Expand All @@ -64,16 +71,16 @@ import zio.stream.ZStream, zio.*
object SamplesPublisher extends ZIOAppDefault:
def run = ZStream
.repeatZIOWithSchedule(Random.nextInt, Schedule.fixed(2.seconds))
.mapZIO { sampleData =>
.mapZIO { sample =>
for {
messageId <- Publisher.publish[Any, Int](
PublishMessage(
data = sampleData,
attributes = Map.empty,
orderingKey = None,
)
)
_ <- Console.printLine(s"Published data $sampleData with message id ${messageId.value}")
mId <- Publisher.publish[Any, Int](
PublishMessage(
data = sample,
attributes = Map.empty,
orderingKey = None,
)
)
_ <- Console.printLine(s"Published data $sample with message id ${mId.value}")
} yield ()
}
.runDrain
Expand All @@ -86,7 +93,10 @@ object SamplesPublisher extends ZIOAppDefault:
ZLayer.scoped(
G.Publisher.make(
config = G.PublisherConfig(
connection = G.PubsubConnectionConfig.Emulator(G.PubsubConnectionConfig.GcpProject("any"), "localhost:8085"),
connection = G.PubsubConnectionConfig.Emulator(
G.PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
),
topicName = "basic_example",
encoding = Encoding.Binary,
enableOrdering = false,
Expand Down
11 changes: 9 additions & 2 deletions examples/google/src/main/scala/BasicSubscription.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ object BasicSubscription extends ZIOAppDefault:
.subscribe(subscriptionName = "basic_example", des = Serde.int)
.mapZIO { (message, ackReply) =>
for {
_ <- printLine(s"Received message with id ${message.meta.messageId.value} and data ${message.data}")
_ <- printLine(
s"Received message" +
s" with id ${message.meta.messageId.value}" +
s" and data ${message.data}"
)
_ <- ackReply.ack()
} yield ()
}
Expand All @@ -19,7 +23,10 @@ object BasicSubscription extends ZIOAppDefault:

ZLayer.scoped(
G.Subscriber.makeStreamingPullSubscriber(
connection = G.PubsubConnectionConfig.Emulator(G.PubsubConnectionConfig.GcpProject("any"), "localhost:8085")
connection = G.PubsubConnectionConfig.Emulator(
G.PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
)
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ object PubAndSubAndAdminExample extends ZIOAppDefault:
}

val pubsubConnection: G.PubsubConnectionConfig =
G.PubsubConnectionConfig.Emulator(G.PubsubConnectionConfig.GcpProject("any"), "localhost:8085")
G.PubsubConnectionConfig.Emulator(
G.PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
)

val publisherLayer: TaskLayer[Publisher[Any, Int]] = ZLayer.scoped(
G.Publisher.make(
Expand Down
23 changes: 13 additions & 10 deletions examples/google/src/main/scala/SamplesPublisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import zio.stream.ZStream, zio.*
object SamplesPublisher extends ZIOAppDefault:
def run = ZStream
.repeatZIOWithSchedule(Random.nextInt, Schedule.fixed(2.seconds))
.mapZIO { sampleData =>
.mapZIO { sample =>
for {
messageId <- Publisher.publish[Any, Int](
PublishMessage(
data = sampleData,
attributes = Map.empty,
orderingKey = None,
)
)
_ <- Console.printLine(s"Published data $sampleData with message id ${messageId.value}")
mId <- Publisher.publish[Any, Int](
PublishMessage(
data = sample,
attributes = Map.empty,
orderingKey = None,
)
)
_ <- Console.printLine(s"Published data $sample with message id ${mId.value}")
} yield ()
}
.runDrain
Expand All @@ -26,7 +26,10 @@ object SamplesPublisher extends ZIOAppDefault:
ZLayer.scoped(
G.Publisher.make(
config = G.PublisherConfig(
connection = G.PubsubConnectionConfig.Emulator(G.PubsubConnectionConfig.GcpProject("any"), "localhost:8085"),
connection = G.PubsubConnectionConfig.Emulator(
G.PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
),
topicName = "basic_example",
encoding = Encoding.Binary,
enableOrdering = false,
Expand Down

0 comments on commit f3b53fa

Please sign in to comment.