Skip to content

Commit 3d97dd2

Browse files
committed
merge master!
2 parents 81aaec6 + 97275d3 commit 3d97dd2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+4587
-189
lines changed

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ in the plugins repo. State and persistence becomes a core requirement beyond pro
4545

4646
## Getting Started
4747

48-
To make use of Go Micro import it
48+
To make use of Go Micro
4949

5050
```golang
51-
import "go-micro.dev/v5"
51+
go get "go-micro.dev/v5"
5252
```
5353

5454
Create a service and register a handler
@@ -105,3 +105,7 @@ curl -XPOST \
105105
-d '{"name": "alice"}' \
106106
http://localhost:8080
107107
```
108+
109+
## Adopters
110+
111+
- [Sourse](https://sourse.eu) - Work in the field of earth observation, including embedded Kubernetes running onboard aircraft, and we’ve built a mission management SaaS platform using Go Micro.

broker/broker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type Subscriber interface {
4141

4242
var (
4343
// DefaultBroker is the default Broker.
44-
DefaultBroker = NewBroker()
44+
DefaultBroker = NewMemoryBroker()
4545
)
4646

4747
func Init(opts ...Option) error {

broker/http.go renamed to broker/http/http.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
// Package broker provides a http based message broker
2-
package broker
1+
// Package http provides a http based message broker
2+
package http
33

44
import (
55
"bytes"
@@ -16,6 +16,7 @@ import (
1616
"time"
1717

1818
"github.com/google/uuid"
19+
"go-micro.dev/v5/broker"
1920
"go-micro.dev/v5/codec/json"
2021
merr "go-micro.dev/v5/errors"
2122
"go-micro.dev/v5/registry"
@@ -29,7 +30,7 @@ import (
2930

3031
// HTTP Broker is a point to point async broker.
3132
type httpBroker struct {
32-
opts Options
33+
opts broker.Options
3334

3435
r registry.Registry
3536

@@ -51,8 +52,8 @@ type httpBroker struct {
5152
}
5253

5354
type httpSubscriber struct {
54-
opts SubscribeOptions
55-
fn Handler
55+
opts broker.SubscribeOptions
56+
fn broker.Handler
5657
svc *registry.Service
5758
hb *httpBroker
5859
id string
@@ -61,7 +62,7 @@ type httpSubscriber struct {
6162

6263
type httpEvent struct {
6364
err error
64-
m *Message
65+
m *broker.Message
6566
t string
6667
}
6768

@@ -108,8 +109,8 @@ func newTransport(config *tls.Config) *http.Transport {
108109
return t
109110
}
110111

111-
func newHttpBroker(opts ...Option) Broker {
112-
options := *NewOptions(opts...)
112+
func newHttpBroker(opts ...broker.Option) broker.Broker {
113+
options := *broker.NewOptions(opts...)
113114

114115
options.Registry = registry.DefaultRegistry
115116
options.Codec = json.Marshaler{}
@@ -161,15 +162,15 @@ func (h *httpEvent) Error() error {
161162
return h.err
162163
}
163164

164-
func (h *httpEvent) Message() *Message {
165+
func (h *httpEvent) Message() *broker.Message {
165166
return h.m
166167
}
167168

168169
func (h *httpEvent) Topic() string {
169170
return h.t
170171
}
171172

172-
func (h *httpSubscriber) Options() SubscribeOptions {
173+
func (h *httpSubscriber) Options() broker.SubscribeOptions {
173174
return h.opts
174175
}
175176

@@ -308,7 +309,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
308309
return
309310
}
310311

311-
var m *Message
312+
var m *broker.Message
312313
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
313314
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
314315
w.WriteHeader(500)
@@ -330,7 +331,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
330331
id := req.Form.Get("id")
331332

332333
//nolint:prealloc
333-
var subs []Handler
334+
var subs []broker.Handler
334335

335336
h.RLock()
336337
for _, subscriber := range h.subscribers[topic] {
@@ -458,7 +459,7 @@ func (h *httpBroker) Disconnect() error {
458459
return err
459460
}
460461

461-
func (h *httpBroker) Init(opts ...Option) error {
462+
func (h *httpBroker) Init(opts ...broker.Option) error {
462463
h.RLock()
463464
if h.running {
464465
h.RUnlock()
@@ -505,13 +506,13 @@ func (h *httpBroker) Init(opts ...Option) error {
505506
return nil
506507
}
507508

508-
func (h *httpBroker) Options() Options {
509+
func (h *httpBroker) Options() broker.Options {
509510
return h.opts
510511
}
511512

512-
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
513+
func (h *httpBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
513514
// create the message first
514-
m := &Message{
515+
m := &broker.Message{
515516
Header: make(map[string]string),
516517
Body: msg.Body,
517518
}
@@ -637,10 +638,10 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
637638
return nil
638639
}
639640

640-
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
641+
func (h *httpBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
641642
var err error
642643
var host, port string
643-
options := NewSubscribeOptions(opts...)
644+
options := broker.NewSubscribeOptions(opts...)
644645

645646
// parse address for host, port
646647
host, port, err = net.SplitHostPort(h.Address())
@@ -705,7 +706,7 @@ func (h *httpBroker) String() string {
705706
return "http"
706707
}
707708

708-
// NewBroker returns a new http broker.
709-
func NewBroker(opts ...Option) Broker {
709+
// NewHttpBroker returns a new http broker.
710+
func NewHttpBroker(opts ...broker.Option) broker.Broker {
710711
return newHttpBroker(opts...)
711712
}

broker/http_test.go renamed to broker/http/http_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package broker_test
1+
package http_test
22

33
import (
44
"sync"
@@ -7,6 +7,7 @@ import (
77

88
"github.com/google/uuid"
99
"go-micro.dev/v5/broker"
10+
"go-micro.dev/v5/broker/http"
1011
"go-micro.dev/v5/registry"
1112
)
1213

@@ -60,7 +61,7 @@ func sub(b *testing.B, c int) {
6061
b.StopTimer()
6162
m := newTestRegistry()
6263

63-
brker := broker.NewBroker(broker.Registry(m))
64+
brker := http.NewHttpBroker(broker.Registry(m))
6465
topic := uuid.New().String()
6566

6667
if err := brker.Init(); err != nil {
@@ -121,7 +122,7 @@ func sub(b *testing.B, c int) {
121122
func pub(b *testing.B, c int) {
122123
b.StopTimer()
123124
m := newTestRegistry()
124-
brk := broker.NewBroker(broker.Registry(m))
125+
brk := http.NewHttpBroker(broker.Registry(m))
125126
topic := uuid.New().String()
126127

127128
if err := brk.Init(); err != nil {
@@ -190,7 +191,7 @@ func pub(b *testing.B, c int) {
190191

191192
func TestBroker(t *testing.T) {
192193
m := newTestRegistry()
193-
b := broker.NewBroker(broker.Registry(m))
194+
b := http.NewHttpBroker(broker.Registry(m))
194195

195196
if err := b.Init(); err != nil {
196197
t.Fatalf("Unexpected init error: %v", err)
@@ -239,7 +240,7 @@ func TestBroker(t *testing.T) {
239240

240241
func TestConcurrentSubBroker(t *testing.T) {
241242
m := newTestRegistry()
242-
b := broker.NewBroker(broker.Registry(m))
243+
b := http.NewHttpBroker(broker.Registry(m))
243244

244245
if err := b.Init(); err != nil {
245246
t.Fatalf("Unexpected init error: %v", err)
@@ -298,7 +299,7 @@ func TestConcurrentSubBroker(t *testing.T) {
298299

299300
func TestConcurrentPubBroker(t *testing.T) {
300301
m := newTestRegistry()
301-
b := broker.NewBroker(broker.Registry(m))
302+
b := http.NewHttpBroker(broker.Registry(m))
302303

303304
if err := b.Init(); err != nil {
304305
t.Fatalf("Unexpected init error: %v", err)

broker/nats/context.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package nats
2+
3+
import (
4+
"context"
5+
6+
"go-micro.dev/v5/broker"
7+
)
8+
9+
// setBrokerOption returns a function to setup a context with given value.
10+
func setBrokerOption(k, v interface{}) broker.Option {
11+
return func(o *broker.Options) {
12+
if o.Context == nil {
13+
o.Context = context.Background()
14+
}
15+
o.Context = context.WithValue(o.Context, k, v)
16+
}
17+
}

0 commit comments

Comments
 (0)