Skip to content

Commit d89b6f8

Browse files
committed
feat: default stream
1 parent c920ae2 commit d89b6f8

File tree

6 files changed

+147
-10
lines changed

6 files changed

+147
-10
lines changed

cmd/cmd.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go-micro.dev/v5/debug/profile/http"
2424
"go-micro.dev/v5/debug/profile/pprof"
2525
"go-micro.dev/v5/debug/trace"
26+
"go-micro.dev/v5/events"
2627
"go-micro.dev/v5/logger"
2728
mprofile "go-micro.dev/v5/profile"
2829
"go-micro.dev/v5/registry"
@@ -293,6 +294,7 @@ var (
293294
DefaultCaches = map[string]func(...cache.Option) cache.Cache{
294295
"redis": redis.NewRedisCache,
295296
}
297+
DefaultStreams = map[string]func(...events.Option) (events.Stream, error){}
296298
)
297299

298300
func init() {
@@ -313,6 +315,7 @@ func newCmd(opts ...Option) Cmd {
313315
DebugProfile: &profile.DefaultProfile,
314316
Config: &config.DefaultConfig,
315317
Cache: &cache.DefaultCache,
318+
Stream: &events.DefaultStream,
316319

317320
Brokers: DefaultBrokers,
318321
Clients: DefaultClients,
@@ -376,7 +379,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
376379
if profileName != "" {
377380
switch profileName {
378381
case "local":
379-
imported := mprofile.LocalProfile()
382+
imported, ierr := mprofile.LocalProfile()
383+
if ierr != nil {
384+
return fmt.Errorf("failed to load local profile: %v", ierr)
385+
}
380386
*c.opts.Registry = imported.Registry
381387
registry.DefaultRegistry = imported.Registry
382388
*c.opts.Broker = imported.Broker
@@ -386,7 +392,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
386392
*c.opts.Transport = imported.Transport
387393
transport.DefaultTransport = imported.Transport
388394
case "nats":
389-
imported := mprofile.NatsProfile()
395+
imported, ierr := mprofile.NatsProfile()
396+
if ierr != nil {
397+
return fmt.Errorf("failed to load nats profile: %v", ierr)
398+
}
390399
// Set the registry
391400
sopts, clopts := c.setRegistry(imported.Registry)
392401
serverOpts = append(serverOpts, sopts...)
@@ -407,6 +416,11 @@ func (c *cmd) Before(ctx *cli.Context) error {
407416
serverOpts = append(serverOpts, sopts...)
408417
clientOpts = append(clientOpts, clopts...)
409418

419+
// Set the stream
420+
sopts, clopts = c.setStream(imported.Stream)
421+
serverOpts = append(serverOpts, sopts...)
422+
clientOpts = append(clientOpts, clopts...)
423+
410424
// Add more profiles as needed
411425
default:
412426
return fmt.Errorf("unsupported profile: %s", profileName)
@@ -701,6 +715,17 @@ func (c *cmd) setRegistry(r registry.Registry) ([]server.Option, []client.Option
701715
registry.DefaultRegistry = *c.opts.Registry
702716
return serverOpts, clientOpts
703717
}
718+
func (c *cmd) setStream(s events.Stream) ([]server.Option, []client.Option) {
719+
var serverOpts []server.Option
720+
var clientOpts []client.Option
721+
*c.opts.Stream = s
722+
// TODO: do server and client need a Stream?
723+
// serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
724+
// clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
725+
726+
events.DefaultStream = *c.opts.Stream
727+
return serverOpts, clientOpts
728+
}
704729

705730
func (c *cmd) setBroker(b broker.Broker) ([]server.Option, []client.Option) {
706731
var serverOpts []server.Option

cmd/options.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"go-micro.dev/v5/config"
1111
"go-micro.dev/v5/debug/profile"
1212
"go-micro.dev/v5/debug/trace"
13+
"go-micro.dev/v5/events"
1314
"go-micro.dev/v5/registry"
1415
"go-micro.dev/v5/selector"
1516
"go-micro.dev/v5/server"
@@ -42,13 +43,15 @@ type Options struct {
4243
Broker *broker.Broker
4344
Auths map[string]func(...auth.Option) auth.Auth
4445
Store *store.Store
46+
Stream *events.Stream
4547
Configs map[string]func(...config.Option) (config.Config, error)
4648
Clients map[string]func(...client.Option) client.Client
4749
Registries map[string]func(...registry.Option) registry.Registry
4850
Selectors map[string]func(...selector.Option) selector.Selector
4951
Servers map[string]func(...server.Option) server.Server
5052
Transports map[string]func(...transport.Option) transport.Transport
5153
Stores map[string]func(...store.Option) store.Store
54+
Streams map[string]func(...events.Option) events.Stream
5255
Tracers map[string]func(...trace.Option) trace.Tracer
5356
Version string
5457

@@ -141,6 +144,13 @@ func Store(s *store.Store) Option {
141144
}
142145
}
143146

147+
func Stream(s *events.Stream) Option {
148+
return func(o *Options) {
149+
o.Stream = s
150+
events.DefaultStream = *s
151+
}
152+
}
153+
144154
func Tracer(t *trace.Tracer) Option {
145155
return func(o *Options) {
146156
o.Tracer = t
@@ -169,6 +179,13 @@ func NewBroker(name string, b func(...broker.Option) broker.Broker) Option {
169179
}
170180
}
171181

182+
// New stream func.
183+
func NewStream(name string, b func(...events.Option) events.Stream) Option {
184+
return func(o *Options) {
185+
o.Streams[name] = b
186+
}
187+
}
188+
172189
// New cache func.
173190
func NewCache(name string, c func(...cache.Option) cache.Cache) Option {
174191
return func(o *Options) {

events/natsjs/README.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,48 @@
11
# NATS JetStream
22

33
This plugin uses NATS with JetStream to send and receive events.
4+
5+
## Create a stream
6+
7+
```go
8+
ev, err := natsjs.NewStream(
9+
natsjs.Address("nats://10.0.1.46:4222"),
10+
natsjs.MaxAge(24*160*time.Minute),
11+
)
12+
```
13+
14+
## Consume a stream
15+
16+
```go
17+
ee, err := events.Consume("test",
18+
events.WithAutoAck(false, time.Second*30),
19+
events.WithGroup("testgroup"),
20+
)
21+
if err != nil {
22+
panic(err)
23+
}
24+
go func() {
25+
for {
26+
msg := <-ee
27+
// Process the message
28+
logger.Info("Received message:", string(msg.Payload))
29+
err := msg.Ack()
30+
if err != nil {
31+
logger.Error("Error acknowledging message:", err)
32+
} else {
33+
logger.Info("Message acknowledged")
34+
}
35+
}
36+
}()
37+
38+
```
39+
40+
## Publish an Event to the stream
41+
42+
```go
43+
err = ev.Publish("test", []byte("hello world"))
44+
if err != nil {
45+
panic(err)
46+
}
47+
```
48+

events/natsjs/nats.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,15 +179,23 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve
179179
// not acknowledging the message is the way to indicate an error occurred
180180
return
181181
}
182-
183-
if !options.AutoAck {
182+
if options.AutoAck {
184183
// set up the ack funcs
185184
evt.SetAckFunc(func() error {
186185
return msg.Ack()
187186
})
187+
188188
evt.SetNackFunc(func() error {
189189
return msg.Nak()
190190
})
191+
} else {
192+
// set up the ack funcs
193+
evt.SetAckFunc(func() error {
194+
return nil
195+
})
196+
evt.SetNackFunc(func() error {
197+
return nil
198+
})
191199
}
192200

193201
// push onto the channel and wait for the consumer to take the event off before we acknowledge it.
@@ -198,6 +206,7 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve
198206
}
199207

200208
if err := msg.Ack(nats.Context(ctx)); err != nil {
209+
201210
log.Logf(logger.ErrorLevel, "Error acknowledging message: %v", err)
202211
}
203212
}
@@ -208,6 +217,12 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve
208217
cfg := &nats.StreamConfig{
209218
Name: topic,
210219
}
220+
if s.opts.RetentionPolicy != 0 {
221+
cfg.Retention = nats.RetentionPolicy(s.opts.RetentionPolicy)
222+
}
223+
if s.opts.MaxAge > 0 {
224+
cfg.MaxAge = s.opts.MaxAge
225+
}
211226

212227
_, err = s.natsJetStreamCtx.AddStream(cfg)
213228
if err != nil {
@@ -223,7 +238,7 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve
223238
}
224239

225240
if options.AutoAck {
226-
subOpts = append(subOpts, nats.AckNone())
241+
subOpts = append(subOpts, nats.AckAll())
227242
} else {
228243
subOpts = append(subOpts, nats.AckExplicit())
229244
}

events/natsjs/options.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package natsjs
22

33
import (
44
"crypto/tls"
5+
"time"
56

67
"go-micro.dev/v5/logger"
78
)
@@ -19,6 +20,9 @@ type Options struct {
1920
DisableDurableStreams bool
2021
Username string
2122
Password string
23+
RetentionPolicy int
24+
MaxAge time.Duration
25+
MaxMsgSize int
2226
}
2327

2428
// Option is a function which configures options.
@@ -94,3 +98,19 @@ func Authenticate(username, password string) Option {
9498
o.Password = password
9599
}
96100
}
101+
func RetentionPolicy(rp int) Option {
102+
return func(o *Options) {
103+
o.RetentionPolicy = rp
104+
}
105+
}
106+
107+
func MaxMsgSize(size int) Option {
108+
return func(o *Options) {
109+
o.MaxMsgSize = size
110+
}
111+
}
112+
func MaxAge(age time.Duration) Option {
113+
return func(o *Options) {
114+
o.MaxAge = age
115+
}
116+
}

profile/profile.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
natslib "github.com/nats-io/nats.go"
99
"go-micro.dev/v5/broker"
1010
"go-micro.dev/v5/broker/nats"
11+
"go-micro.dev/v5/events"
12+
nevents "go-micro.dev/v5/events/natsjs"
1113
"go-micro.dev/v5/registry"
1214
nreg "go-micro.dev/v5/registry/nats"
1315
"go-micro.dev/v5/store"
@@ -22,34 +24,46 @@ type Profile struct {
2224
Broker broker.Broker
2325
Store store.Store
2426
Transport transport.Transport
27+
Stream events.Stream
2528
}
2629

2730
// LocalProfile returns a profile with local mDNS as the registry, HTTP as the broker, file as the store, and HTTP as the transport
2831
// It is used for local development and testing
29-
func LocalProfile() Profile {
32+
func LocalProfile() (Profile, error) {
33+
stream, err := events.NewStream()
3034
return Profile{
3135
Registry: registry.NewMDNSRegistry(),
3236
Broker: broker.NewHttpBroker(),
3337
Store: store.NewFileStore(),
3438
Transport: transport.NewHTTPTransport(),
35-
}
39+
Stream: stream,
40+
}, err
3641
}
3742

3843
// NatsProfile returns a profile with NATS as the registry, broker, store, and transport
3944
// It uses the environment variable MICR_NATS_ADDRESS to set the NATS server address
4045
// If the variable is not set, it defaults to nats://0.0.0.0:4222 which will connect to a local NATS server
41-
func NatsProfile() Profile {
46+
func NatsProfile() (Profile, error) {
4247
addr := os.Getenv("MICRO_NATS_ADDRESS")
4348
if addr == "" {
4449
addr = "nats://0.0.0.0:4222"
4550
}
4651
// Split the address by comma, trim whitespace, and convert to a slice of strings
4752
addrs := splitNatsAdressList(addr)
53+
4854
reg := nreg.NewNatsRegistry(registry.Addrs(addrs...))
49-
brok := nats.NewNatsBroker(broker.Addrs(addrs...))
55+
56+
nopts := natslib.GetDefaultOptions()
57+
nopts.Servers = addrs
58+
brok := nats.NewNatsBroker(broker.Addrs(addrs...), nats.Options(nopts))
59+
5060
st := nstore.NewStore(nstore.NatsOptions(natslib.Options{Servers: addrs}))
5161
tx := ntx.NewTransport(ntx.Options(natslib.Options{Servers: addrs}))
5262

63+
stream, err := nevents.NewStream(
64+
nevents.Address(addr),
65+
)
66+
5367
registry.DefaultRegistry = reg
5468
broker.DefaultBroker = brok
5569
store.DefaultStore = st
@@ -59,7 +73,8 @@ func NatsProfile() Profile {
5973
Broker: brok,
6074
Store: st,
6175
Transport: tx,
62-
}
76+
Stream: stream,
77+
}, err
6378
}
6479

6580
func splitNatsAdressList(addr string) []string {

0 commit comments

Comments
 (0)