diff --git a/bridge.go b/bridge.go index b4c6e09..9586a7c 100644 --- a/bridge.go +++ b/bridge.go @@ -3,5 +3,5 @@ package opinionatedevents import "context" type bridge interface { - take(ctx context.Context, msg *Message) *envelope + take(ctx context.Context, batch []*Message) *envelope } diff --git a/bridge_async.go b/bridge_async.go index 9ab3282..2423bec 100644 --- a/bridge_async.go +++ b/bridge_async.go @@ -25,8 +25,8 @@ func newAsyncBridge(maxDeliveryAttempts int, waitBetweenAttempts int, destinatio } } -func (b *asyncBridge) take(ctx context.Context, msg *Message) *envelope { - env := newEnvelope(ctx, msg) +func (b *asyncBridge) take(ctx context.Context, batch []*Message) *envelope { + env := newEnvelope(ctx, batch) go b.deliver(env) return env } @@ -40,7 +40,7 @@ func (b *asyncBridge) deliver(envelope *envelope) { // try delivering the message to all (pending) destinations deliveredTo := []int{} for i, destination := range destinations { - if err := destination.Deliver(envelope.ctx, envelope.msg); err == nil { + if err := destination.Deliver(envelope.ctx, envelope.batch); err == nil { deliveredTo = append(deliveredTo, i) } } diff --git a/bridge_async_test.go b/bridge_async_test.go index b26174f..ecee137 100644 --- a/bridge_async_test.go +++ b/bridge_async_test.go @@ -17,14 +17,14 @@ func TestAsyncBridge(t *testing.T) { bridge := newAsyncBridge(3, 0, destination) // push the handler attempts := 0 - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { attempts += 1 return nil }) // deliver the message & assert msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) assert.NoError(t, waitForSuccessEnvelope(envelope)) assert.Equal(t, 1, attempts) }) @@ -37,7 +37,7 @@ func TestAsyncBridge(t *testing.T) { attempts := 0 for i := 0; i < 2; i++ { isFirstHandler := i == 0 - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { attempts += 1 if isFirstHandler { return fmt.Errorf("failed to deliver") @@ -49,7 +49,7 @@ func TestAsyncBridge(t *testing.T) { // deliver the message & assert msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) assert.NoError(t, waitForSuccessEnvelope(envelope)) assert.Equal(t, 2, attempts) }) @@ -65,7 +65,7 @@ func TestAsyncBridge(t *testing.T) { attemptIndex := i shouldFail := i == 0 for u := 0; u < bridge.deliveryConfig.maxAttempts; u += 1 { - d.pushHandler(func(_ context.Context, _ *Message) error { + d.pushHandler(func(_ context.Context, _ []*Message) error { attempts[attemptIndex] = attempts[attemptIndex] + 1 if shouldFail { return fmt.Errorf("failed to deliver") @@ -78,7 +78,7 @@ func TestAsyncBridge(t *testing.T) { // deliver the message & assert msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) assert.Error(t, waitForSuccessEnvelope(envelope)) // the first one will fail (attempts should be 3) assert.Equal(t, 3, attempts[0]) @@ -93,7 +93,7 @@ func TestAsyncBridge(t *testing.T) { // push the handlers attempts := 0 for i := 0; i < bridge.deliveryConfig.maxAttempts+1; i++ { - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { attempts += 1 return fmt.Errorf("delivery failed") }) @@ -101,7 +101,7 @@ func TestAsyncBridge(t *testing.T) { // deliver the message & assert msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) assert.Error(t, waitForSuccessEnvelope(envelope)) assert.Equal(t, bridge.deliveryConfig.maxAttempts, attempts) assert.Len(t, destination.handlers, 1) // there should be one handler left @@ -113,7 +113,7 @@ func TestAsyncBridge(t *testing.T) { bridge := newAsyncBridge(3, 0, destination) // push the handler waitFor := 100 - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { time.Sleep(time.Duration(waitFor) * time.Millisecond) return nil }) @@ -121,7 +121,7 @@ func TestAsyncBridge(t *testing.T) { start := time.Now() msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) assert.NoError(t, waitForSuccessEnvelope(envelope)) duration := time.Since(start).Milliseconds() assert.GreaterOrEqual(t, duration, int64(waitFor)) diff --git a/bridge_helpers.go b/bridge_helpers.go index 84e3c42..75bd43f 100644 --- a/bridge_helpers.go +++ b/bridge_helpers.go @@ -7,7 +7,7 @@ import ( "time" ) -type testDestinationHandler = func(ctx context.Context, msg *Message) error +type testDestinationHandler = func(ctx context.Context, batch []*Message) error type testDestination struct { handlers []testDestinationHandler @@ -17,11 +17,11 @@ func newTestDestination() *testDestination { return &testDestination{handlers: []testDestinationHandler{}} } -func (d *testDestination) Deliver(ctx context.Context, msg *Message) error { +func (d *testDestination) Deliver(ctx context.Context, batch []*Message) error { if handler, err := d.nextHandler(); err != nil { return err } else { - return handler(ctx, msg) + return handler(ctx, batch) } } diff --git a/bridge_sync.go b/bridge_sync.go index eba77da..97a18df 100644 --- a/bridge_sync.go +++ b/bridge_sync.go @@ -10,11 +10,11 @@ func newSyncBridge(destinations ...Destination) *syncBridge { return &syncBridge{destinations: destinations} } -func (b *syncBridge) take(ctx context.Context, msg *Message) *envelope { - env := newEnvelope(ctx, msg) +func (b *syncBridge) take(ctx context.Context, batch []*Message) *envelope { + env := newEnvelope(ctx, batch) var possibleErr error = nil for _, d := range b.destinations { - if err := d.Deliver(ctx, msg); err != nil { + if err := d.Deliver(ctx, batch); err != nil { possibleErr = err } } diff --git a/bridge_sync_test.go b/bridge_sync_test.go index 00a6302..f642d6b 100644 --- a/bridge_sync_test.go +++ b/bridge_sync_test.go @@ -18,15 +18,15 @@ func TestSyncBridge(t *testing.T) { msg, err := NewMessage("test.test", nil) assert.NoError(t, err) // pass it through the sync bridge - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) assert.Error(t, waitForSuccessEnvelope(envelope)) // try again with a handler defined - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { return nil }) msg, err = NewMessage("test.test", nil) assert.NoError(t, err) - envelope = bridge.take(ctx, msg) + envelope = bridge.take(ctx, []*Message{msg}) assert.NoError(t, waitForSuccessEnvelope(envelope)) }) @@ -40,14 +40,14 @@ func TestSyncBridge(t *testing.T) { handled := 0 for i := 0; i < countToHandle; i++ { // push the handler - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { handled += 1 return nil }) // deliver the next message msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) assert.NoError(t, waitForSuccessEnvelope(envelope)) // check that it was handled expected := i + 1 @@ -66,7 +66,7 @@ func TestSyncBridge(t *testing.T) { bridge := newSyncBridge(destination) // push the slow handler waitFor := 250 - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { time.Sleep(time.Duration(waitFor) * time.Millisecond) return nil }) @@ -74,7 +74,7 @@ func TestSyncBridge(t *testing.T) { startAt := time.Now() msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) assert.NoError(t, waitForSuccessEnvelope(envelope)) overAt := time.Now() if overAt.Sub(startAt).Milliseconds() < int64(waitFor) { @@ -87,13 +87,13 @@ func TestSyncBridge(t *testing.T) { destination := newTestDestination() bridge := newSyncBridge(destination) // push the failing handler - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { return errors.New("failed") }) // deliver the message msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) assert.Error(t, waitForSuccessEnvelope(envelope)) }) @@ -102,13 +102,13 @@ func TestSyncBridge(t *testing.T) { destination := newTestDestination() bridge := newSyncBridge(destination) // push the handler - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { return nil }) // deliver the message msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) var success bool select { case <-envelope.onSuccess(): @@ -124,13 +124,13 @@ func TestSyncBridge(t *testing.T) { destination := newTestDestination() bridge := newSyncBridge(destination) // push the failing handler - destination.pushHandler(func(_ context.Context, _ *Message) error { + destination.pushHandler(func(_ context.Context, _ []*Message) error { return errors.New("failed to deliver") }) // deliver the message msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - envelope := bridge.take(ctx, msg) + envelope := bridge.take(ctx, []*Message{msg}) var success bool select { case <-envelope.onSuccess(): diff --git a/destination.go b/destination.go index 70b4b14..05bd233 100644 --- a/destination.go +++ b/destination.go @@ -3,5 +3,5 @@ package opinionatedevents import "context" type Destination interface { - Deliver(ctx context.Context, msg *Message) error + Deliver(ctx context.Context, batch []*Message) error } diff --git a/destination_http.go b/destination_http.go index 60b0e0a..ce84022 100644 --- a/destination_http.go +++ b/destination_http.go @@ -28,8 +28,8 @@ func (d *httpDestination) setClient(client httpClient) { d.client = client } -func (d *httpDestination) Deliver(_ context.Context, msg *Message) error { - payload, err := json.Marshal(msg) +func (d *httpDestination) Deliver(_ context.Context, batch []*Message) error { + payload, err := json.Marshal(batch) if err != nil { return err } diff --git a/destination_http_test.go b/destination_http_test.go index d411f0a..1b375c5 100644 --- a/destination_http_test.go +++ b/destination_http_test.go @@ -21,7 +21,7 @@ func TestHTTPDestination(t *testing.T) { destination.setClient(client) msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - err = destination.Deliver(ctx, msg) + err = destination.Deliver(ctx, []*Message{msg}) assert.Error(t, err) }) @@ -41,7 +41,7 @@ func TestHTTPDestination(t *testing.T) { // publish the message msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - err = destination.Deliver(ctx, msg) + err = destination.Deliver(ctx, []*Message{msg}) assert.NoError(t, err) assert.Equal(t, 1, i) }) @@ -62,7 +62,7 @@ func TestHTTPDestination(t *testing.T) { // publish the message msg, err := NewMessage("test.test", nil) assert.NoError(t, err) - err = destination.Deliver(ctx, msg) + err = destination.Deliver(ctx, []*Message{msg}) assert.Error(t, err) assert.Equal(t, 1, i) }) @@ -87,31 +87,34 @@ func TestHTTPDestination(t *testing.T) { if err != nil { return nil, err } - var payload map[string]interface{} + var payload []map[string]interface{} if err := json.Unmarshal(body, &payload); err != nil { return nil, err } - meta, ok := payload["meta"].(map[string]interface{}) - assert.True(t, ok) - // assert the data types - assert.IsType(t, "", payload["name"]) - assert.IsType(t, "", payload["payload"]) - assert.IsType(t, "", meta["published_at"]) - // assert some fields - assert.Equal(t, "test.test", payload["name"]) - assert.Equal(t, msg.GetPublishedAt().UTC().Format(time.RFC3339Nano), meta["published_at"]) - // parse the message payload - payloadAsJson, err := base64.StdEncoding.DecodeString(payload["payload"].(string)) - assert.NoError(t, err) - var data map[string]interface{} - assert.NoError(t, json.Unmarshal(payloadAsJson, &data)) - assert.Equal(t, "world", data["hello"]) - assert.Equal(t, true, data["ok"]) - assert.Equal(t, 4.0, data["age"]) + assert.Len(t, payload, 1) + for _, i := range payload { + meta, ok := i["meta"].(map[string]interface{}) + assert.True(t, ok) + // assert the data types + assert.IsType(t, "", i["name"]) + assert.IsType(t, "", i["payload"]) + assert.IsType(t, "", meta["published_at"]) + // assert some fields + assert.Equal(t, "test.test", i["name"]) + assert.Equal(t, msg.GetPublishedAt().UTC().Format(time.RFC3339Nano), meta["published_at"]) + // parse the message payload + payloadAsJson, err := base64.StdEncoding.DecodeString(i["payload"].(string)) + assert.NoError(t, err) + var data map[string]interface{} + assert.NoError(t, json.Unmarshal(payloadAsJson, &data)) + assert.Equal(t, "world", data["hello"]) + assert.Equal(t, true, data["ok"]) + assert.Equal(t, 4.0, data["age"]) + } return &http.Response{StatusCode: 200}, nil }) // publish the message - deliveryErr := destination.Deliver(ctx, msg) + deliveryErr := destination.Deliver(ctx, []*Message{msg}) assert.NoError(t, deliveryErr) }) } diff --git a/destination_postgres.go b/destination_postgres.go index 094559e..f0f737f 100644 --- a/destination_postgres.go +++ b/destination_postgres.go @@ -195,29 +195,32 @@ func (d *postgresDestination) setRouting(routing postgresRoutingProvider) { d.routing = routing } -func (d *postgresDestination) Deliver(ctx context.Context, msg *Message) error { - payload, err := json.Marshal(msg) - if err != nil { - return err - } +func (d *postgresDestination) Deliver(ctx context.Context, batch []*Message) error { + // FIXME: should insert a batch of messages in a single INSERT statement return d.tx.do(ctx, func(tx sqlTx) error { - queues, err := d.routing.queues(tx, msg.GetTopic()) - if err != nil { - return err - } - for _, queue := range queues { - if err := d.insertMessage(&postgresDestinationInsertMessageArgs{ - name: msg.GetName(), - payload: payload, - publishedAt: msg.GetPublishedAt(), - deliverAt: msg.GetDeliverAt(), - queue: queue, - topic: msg.GetTopic(), - tx: tx, - uuid: msg.GetUUID(), - }); err != nil { + for _, msg := range batch { + payload, err := json.Marshal(msg) + if err != nil { + return err + } + queues, err := d.routing.queues(tx, msg.GetTopic()) + if err != nil { return err } + for _, queue := range queues { + if err := d.insertMessage(&postgresDestinationInsertMessageArgs{ + name: msg.GetName(), + payload: payload, + publishedAt: msg.GetPublishedAt(), + deliverAt: msg.GetDeliverAt(), + queue: queue, + topic: msg.GetTopic(), + tx: tx, + uuid: msg.GetUUID(), + }); err != nil { + return err + } + } } return nil }) diff --git a/destination_postgres_test.go b/destination_postgres_test.go index 115671b..0c210c2 100644 --- a/destination_postgres_test.go +++ b/destination_postgres_test.go @@ -19,7 +19,7 @@ func TestPostgresDestination(t *testing.T) { for i := 0; i < 3; i += 1 { msg, err := NewMessage("customers.created", nil) assert.NoError(t, err) - err = destination.Deliver(context.Background(), msg) + err = destination.Deliver(context.Background(), []*Message{msg}) assert.NoError(t, err) } // each message should have been sent in its own transaction @@ -46,7 +46,7 @@ func TestPostgresDestination(t *testing.T) { for i := 0; i < 3; i += 1 { msg, err := NewMessage("customers.created", nil) assert.NoError(t, err) - err = destination.Deliver(ctx, msg) + err = destination.Deliver(ctx, []*Message{msg}) assert.NoError(t, err) } // the internal db should not have been used... @@ -72,7 +72,7 @@ func TestPostgresDestination(t *testing.T) { destination.setRouting(newTestRouting([]string{"topic.1", "topic.2"})) msg, err := NewMessage("customers.created", nil) assert.NoError(t, err) - err = destination.Deliver(context.Background(), msg) + err = destination.Deliver(context.Background(), []*Message{msg}) assert.NoError(t, err) // there should be one transaction from the internal db assert.Equal(t, 1, db.beginCount) diff --git a/envelope.go b/envelope.go index 84f330f..b064880 100644 --- a/envelope.go +++ b/envelope.go @@ -19,8 +19,8 @@ func newDeliveryEvent(name string) *deliveryEvent { } type envelope struct { - ctx context.Context - msg *Message + ctx context.Context + batch []*Message events chan *deliveryEvent @@ -139,10 +139,10 @@ func (e *envelope) addProxy(proxy chan *deliveryEvent) { }) } -func newEnvelope(ctx context.Context, msg *Message) *envelope { +func newEnvelope(ctx context.Context, batch []*Message) *envelope { env := &envelope{ - ctx: ctx, - msg: msg, + ctx: ctx, + batch: batch, events: make(chan *deliveryEvent), proxies: []chan *deliveryEvent{}, diff --git a/publisher.go b/publisher.go index 115ef5b..ce857b8 100644 --- a/publisher.go +++ b/publisher.go @@ -10,7 +10,7 @@ import ( ) type onDeliveryFailureHandler struct { - handler func(msg *Message) + handler func(batch []*Message) } type Publisher struct { @@ -62,10 +62,10 @@ func NewPublisher(opts ...publisherOption) (*Publisher, error) { return publisher, nil } -func (p *Publisher) OnDeliveryFailure(handler func(msg *Message)) func() { - p.onDeliveryFailureHandlers = append(p.onDeliveryFailureHandlers, &onDeliveryFailureHandler{ - handler: handler, - }) +func (p *Publisher) OnDeliveryFailure(handler func(batch []*Message)) func() { + p.onDeliveryFailureHandlers = append(p.onDeliveryFailureHandlers, + &onDeliveryFailureHandler{handler: handler}, + ) return func() { for i := 0; i < len(p.onDeliveryFailureHandlers); i += 1 { if reflect.ValueOf(p.onDeliveryFailureHandlers[i].handler).Pointer() == reflect.ValueOf(handler).Pointer() { @@ -76,24 +76,35 @@ func (p *Publisher) OnDeliveryFailure(handler func(msg *Message)) func() { } } -func (p *Publisher) Publish(ctx context.Context, msg *Message) error { - if msg.publishedAt.IsZero() { - msg.publishedAt = time.Now() - } - if msg.deliverAt.IsZero() { - msg.deliverAt = msg.publishedAt +func (p *Publisher) PublishOne(ctx context.Context, msg *Message) error { + batch := []*Message{msg} + return p.publish(ctx, batch) +} + +func (p *Publisher) PublishMany(ctx context.Context, batch []*Message) error { + return p.publish(ctx, batch) +} + +func (p *Publisher) publish(ctx context.Context, batch []*Message) error { + for _, msg := range batch { + if msg.publishedAt.IsZero() { + msg.publishedAt = time.Now() + } + if msg.deliverAt.IsZero() { + msg.deliverAt = msg.publishedAt + } } p.inFlightWaitingGroup.Add(1) - envelope := p.bridge.take(ctx, msg) + envelope := p.bridge.take(ctx, batch) // FIXME: this entire `isClosed` is fucked up, eg. there is no way to extract the actual error... if envelope.isClosed() { // the envelope was closed synchronously -> handle result synchronously p.inFlightWaitingGroup.Done() if envelope.isClosedWith(deliveryEventFailureName) { for _, handleFailure := range p.onDeliveryFailureHandlers { - handleFailure.handler(msg) + handleFailure.handler(batch) } - return fmt.Errorf("error publishing a message: %#v", msg) + return fmt.Errorf("error publishing a batch of messages: %#v", batch) } return nil } @@ -105,7 +116,7 @@ func (p *Publisher) Publish(ctx context.Context, msg *Message) error { case <-envelope.onFailure(): p.inFlightWaitingGroup.Done() for _, handleFailure := range p.onDeliveryFailureHandlers { - handleFailure.handler(msg) + handleFailure.handler(batch) } } }()