Skip to content

Commit

Permalink
wip: 2024-08-25 13:40:15
Browse files Browse the repository at this point in the history
  • Loading branch information
markusylisiurunen committed Aug 25, 2024
1 parent 724e002 commit 22ad6bd
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 101 deletions.
2 changes: 1 addition & 1 deletion bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package opinionatedevents
import "context"

type bridge interface {
take(ctx context.Context, msg *Message) *envelope
take(ctx context.Context, batch []*Message) *envelope
}
6 changes: 3 additions & 3 deletions bridge_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func newAsyncBridge(maxDeliveryAttempts int, waitBetweenAttempts int, destinatio
}
}

func (b *asyncBridge) take(ctx context.Context, msg *Message) *envelope {
env := newEnvelope(ctx, msg)
func (b *asyncBridge) take(ctx context.Context, batch []*Message) *envelope {
env := newEnvelope(ctx, batch)
go b.deliver(env)
return env
}
Expand All @@ -40,7 +40,7 @@ func (b *asyncBridge) deliver(envelope *envelope) {
// try delivering the message to all (pending) destinations
deliveredTo := []int{}
for i, destination := range destinations {
if err := destination.Deliver(envelope.ctx, envelope.msg); err == nil {
if err := destination.Deliver(envelope.ctx, envelope.batch); err == nil {
deliveredTo = append(deliveredTo, i)
}
}
Expand Down
20 changes: 10 additions & 10 deletions bridge_async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ func TestAsyncBridge(t *testing.T) {
bridge := newAsyncBridge(3, 0, destination)
// push the handler
attempts := 0
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
attempts += 1
return nil
})
// deliver the message & assert
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
assert.NoError(t, waitForSuccessEnvelope(envelope))
assert.Equal(t, 1, attempts)
})
Expand All @@ -37,7 +37,7 @@ func TestAsyncBridge(t *testing.T) {
attempts := 0
for i := 0; i < 2; i++ {
isFirstHandler := i == 0
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
attempts += 1
if isFirstHandler {
return fmt.Errorf("failed to deliver")
Expand All @@ -49,7 +49,7 @@ func TestAsyncBridge(t *testing.T) {
// deliver the message & assert
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
assert.NoError(t, waitForSuccessEnvelope(envelope))
assert.Equal(t, 2, attempts)
})
Expand All @@ -65,7 +65,7 @@ func TestAsyncBridge(t *testing.T) {
attemptIndex := i
shouldFail := i == 0
for u := 0; u < bridge.deliveryConfig.maxAttempts; u += 1 {
d.pushHandler(func(_ context.Context, _ *Message) error {
d.pushHandler(func(_ context.Context, _ []*Message) error {
attempts[attemptIndex] = attempts[attemptIndex] + 1
if shouldFail {
return fmt.Errorf("failed to deliver")
Expand All @@ -78,7 +78,7 @@ func TestAsyncBridge(t *testing.T) {
// deliver the message & assert
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
assert.Error(t, waitForSuccessEnvelope(envelope))
// the first one will fail (attempts should be 3)
assert.Equal(t, 3, attempts[0])
Expand All @@ -93,15 +93,15 @@ func TestAsyncBridge(t *testing.T) {
// push the handlers
attempts := 0
for i := 0; i < bridge.deliveryConfig.maxAttempts+1; i++ {
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
attempts += 1
return fmt.Errorf("delivery failed")
})
}
// deliver the message & assert
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
assert.Error(t, waitForSuccessEnvelope(envelope))
assert.Equal(t, bridge.deliveryConfig.maxAttempts, attempts)
assert.Len(t, destination.handlers, 1) // there should be one handler left
Expand All @@ -113,15 +113,15 @@ func TestAsyncBridge(t *testing.T) {
bridge := newAsyncBridge(3, 0, destination)
// push the handler
waitFor := 100
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
time.Sleep(time.Duration(waitFor) * time.Millisecond)
return nil
})
// deliver the message & assert
start := time.Now()
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
assert.NoError(t, waitForSuccessEnvelope(envelope))
duration := time.Since(start).Milliseconds()
assert.GreaterOrEqual(t, duration, int64(waitFor))
Expand Down
6 changes: 3 additions & 3 deletions bridge_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"
)

type testDestinationHandler = func(ctx context.Context, msg *Message) error
type testDestinationHandler = func(ctx context.Context, batch []*Message) error

type testDestination struct {
handlers []testDestinationHandler
Expand All @@ -17,11 +17,11 @@ func newTestDestination() *testDestination {
return &testDestination{handlers: []testDestinationHandler{}}
}

func (d *testDestination) Deliver(ctx context.Context, msg *Message) error {
func (d *testDestination) Deliver(ctx context.Context, batch []*Message) error {
if handler, err := d.nextHandler(); err != nil {
return err
} else {
return handler(ctx, msg)
return handler(ctx, batch)
}
}

Expand Down
6 changes: 3 additions & 3 deletions bridge_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ func newSyncBridge(destinations ...Destination) *syncBridge {
return &syncBridge{destinations: destinations}
}

func (b *syncBridge) take(ctx context.Context, msg *Message) *envelope {
env := newEnvelope(ctx, msg)
func (b *syncBridge) take(ctx context.Context, batch []*Message) *envelope {
env := newEnvelope(ctx, batch)
var possibleErr error = nil
for _, d := range b.destinations {
if err := d.Deliver(ctx, msg); err != nil {
if err := d.Deliver(ctx, batch); err != nil {
possibleErr = err
}
}
Expand Down
26 changes: 13 additions & 13 deletions bridge_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ func TestSyncBridge(t *testing.T) {
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
// pass it through the sync bridge
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
assert.Error(t, waitForSuccessEnvelope(envelope))
// try again with a handler defined
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
return nil
})
msg, err = NewMessage("test.test", nil)
assert.NoError(t, err)
envelope = bridge.take(ctx, msg)
envelope = bridge.take(ctx, []*Message{msg})
assert.NoError(t, waitForSuccessEnvelope(envelope))
})

Expand All @@ -40,14 +40,14 @@ func TestSyncBridge(t *testing.T) {
handled := 0
for i := 0; i < countToHandle; i++ {
// push the handler
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
handled += 1
return nil
})
// deliver the next message
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
assert.NoError(t, waitForSuccessEnvelope(envelope))
// check that it was handled
expected := i + 1
Expand All @@ -66,15 +66,15 @@ func TestSyncBridge(t *testing.T) {
bridge := newSyncBridge(destination)
// push the slow handler
waitFor := 250
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
time.Sleep(time.Duration(waitFor) * time.Millisecond)
return nil
})
// deliver the message
startAt := time.Now()
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
assert.NoError(t, waitForSuccessEnvelope(envelope))
overAt := time.Now()
if overAt.Sub(startAt).Milliseconds() < int64(waitFor) {
Expand All @@ -87,13 +87,13 @@ func TestSyncBridge(t *testing.T) {
destination := newTestDestination()
bridge := newSyncBridge(destination)
// push the failing handler
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
return errors.New("failed")
})
// deliver the message
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
assert.Error(t, waitForSuccessEnvelope(envelope))
})

Expand All @@ -102,13 +102,13 @@ func TestSyncBridge(t *testing.T) {
destination := newTestDestination()
bridge := newSyncBridge(destination)
// push the handler
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
return nil
})
// deliver the message
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
var success bool
select {
case <-envelope.onSuccess():
Expand All @@ -124,13 +124,13 @@ func TestSyncBridge(t *testing.T) {
destination := newTestDestination()
bridge := newSyncBridge(destination)
// push the failing handler
destination.pushHandler(func(_ context.Context, _ *Message) error {
destination.pushHandler(func(_ context.Context, _ []*Message) error {
return errors.New("failed to deliver")
})
// deliver the message
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
envelope := bridge.take(ctx, msg)
envelope := bridge.take(ctx, []*Message{msg})
var success bool
select {
case <-envelope.onSuccess():
Expand Down
2 changes: 1 addition & 1 deletion destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package opinionatedevents
import "context"

type Destination interface {
Deliver(ctx context.Context, msg *Message) error
Deliver(ctx context.Context, batch []*Message) error
}
4 changes: 2 additions & 2 deletions destination_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (d *httpDestination) setClient(client httpClient) {
d.client = client
}

func (d *httpDestination) Deliver(_ context.Context, msg *Message) error {
payload, err := json.Marshal(msg)
func (d *httpDestination) Deliver(_ context.Context, batch []*Message) error {
payload, err := json.Marshal(batch)
if err != nil {
return err
}
Expand Down
47 changes: 25 additions & 22 deletions destination_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestHTTPDestination(t *testing.T) {
destination.setClient(client)
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
err = destination.Deliver(ctx, msg)
err = destination.Deliver(ctx, []*Message{msg})
assert.Error(t, err)
})

Expand All @@ -41,7 +41,7 @@ func TestHTTPDestination(t *testing.T) {
// publish the message
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
err = destination.Deliver(ctx, msg)
err = destination.Deliver(ctx, []*Message{msg})
assert.NoError(t, err)
assert.Equal(t, 1, i)
})
Expand All @@ -62,7 +62,7 @@ func TestHTTPDestination(t *testing.T) {
// publish the message
msg, err := NewMessage("test.test", nil)
assert.NoError(t, err)
err = destination.Deliver(ctx, msg)
err = destination.Deliver(ctx, []*Message{msg})
assert.Error(t, err)
assert.Equal(t, 1, i)
})
Expand All @@ -87,31 +87,34 @@ func TestHTTPDestination(t *testing.T) {
if err != nil {
return nil, err
}
var payload map[string]interface{}
var payload []map[string]interface{}
if err := json.Unmarshal(body, &payload); err != nil {
return nil, err
}
meta, ok := payload["meta"].(map[string]interface{})
assert.True(t, ok)
// assert the data types
assert.IsType(t, "", payload["name"])
assert.IsType(t, "", payload["payload"])
assert.IsType(t, "", meta["published_at"])
// assert some fields
assert.Equal(t, "test.test", payload["name"])
assert.Equal(t, msg.GetPublishedAt().UTC().Format(time.RFC3339Nano), meta["published_at"])
// parse the message payload
payloadAsJson, err := base64.StdEncoding.DecodeString(payload["payload"].(string))
assert.NoError(t, err)
var data map[string]interface{}
assert.NoError(t, json.Unmarshal(payloadAsJson, &data))
assert.Equal(t, "world", data["hello"])
assert.Equal(t, true, data["ok"])
assert.Equal(t, 4.0, data["age"])
assert.Len(t, payload, 1)
for _, i := range payload {
meta, ok := i["meta"].(map[string]interface{})
assert.True(t, ok)
// assert the data types
assert.IsType(t, "", i["name"])
assert.IsType(t, "", i["payload"])
assert.IsType(t, "", meta["published_at"])
// assert some fields
assert.Equal(t, "test.test", i["name"])
assert.Equal(t, msg.GetPublishedAt().UTC().Format(time.RFC3339Nano), meta["published_at"])
// parse the message payload
payloadAsJson, err := base64.StdEncoding.DecodeString(i["payload"].(string))
assert.NoError(t, err)
var data map[string]interface{}
assert.NoError(t, json.Unmarshal(payloadAsJson, &data))
assert.Equal(t, "world", data["hello"])
assert.Equal(t, true, data["ok"])
assert.Equal(t, 4.0, data["age"])
}
return &http.Response{StatusCode: 200}, nil
})
// publish the message
deliveryErr := destination.Deliver(ctx, msg)
deliveryErr := destination.Deliver(ctx, []*Message{msg})
assert.NoError(t, deliveryErr)
})
}
Expand Down
Loading

0 comments on commit 22ad6bd

Please sign in to comment.