diff --git a/go_test.mod b/go_test.mod index f5b731dd0..87cbafe52 100644 --- a/go_test.mod +++ b/go_test.mod @@ -1,23 +1,28 @@ module github.com/nats-io/nats.go -go 1.19 +go 1.22 + +toolchain go1.22.3 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.17.8 + github.com/klauspost/compress v1.17.9 github.com/nats-io/jwt v1.2.2 github.com/nats-io/nats-server/v2 v2.10.16 github.com/nats-io/nkeys v0.4.7 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.3.0 - golang.org/x/text v0.15.0 + golang.org/x/text v0.17.0 google.golang.org/protobuf v1.23.0 ) require ( - github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.5.7 // indirect - golang.org/x/crypto v0.23.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/time v0.5.0 // indirect + github.com/google/go-tpm v0.9.1 // indirect + github.com/minio/highwayhash v1.0.3 // indirect + github.com/nats-io/jwt/v2 v2.5.8 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/time v0.6.0 // indirect ) + +replace github.com/nats-io/nats-server/v2 => /Users/tomaszpietrek/coding/nats-server diff --git a/go_test.sum b/go_test.sum index f89d755ba..fc8ad255a 100644 --- a/go_test.sum +++ b/go_test.sum @@ -1,4 +1,5 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -8,43 +9,45 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-tpm v0.9.1 h1:0pGc4X//bAlmZzMKf8iz6IsDo1nYTbYJ6FZN/rg4zdM= +github.com/google/go-tpm v0.9.1/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c= -github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0= -github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU= +github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= +github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= -golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -54,3 +57,4 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/jetstream/api.go b/jetstream/api.go index 1cea088ed..b9c94e15e 100644 --- a/jetstream/api.go +++ b/jetstream/api.go @@ -101,6 +101,9 @@ const ( // apiMsgDeleteT is the endpoint to remove a message. apiMsgDeleteT = "STREAM.MSG.DELETE.%s" + + // apiConsumerUnpinT is the endpoint to unpin a consumer. + apiConsumerUnpinT = "CONSUMER.UNPIN.%s.%s" ) func (js *jetStream) apiRequestJSON(ctx context.Context, subject string, resp any, data ...[]byte) (*jetStreamMsg, error) { diff --git a/jetstream/consumer.go b/jetstream/consumer.go index aa9003f3d..45539158d 100644 --- a/jetstream/consumer.go +++ b/jetstream/consumer.go @@ -329,3 +329,37 @@ func validateConsumerName(dur string) error { } return nil } + +func unpinConsumer(ctx context.Context, js *jetStream, stream, consumer, group string) error { + ctx, cancel := wrapContextWithoutDeadline(ctx) + if cancel != nil { + defer cancel() + } + if err := validateConsumerName(consumer); err != nil { + return err + } + unpinSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerUnpinT, stream, consumer)) + + var req = consumerUnpinRequest{ + Group: group, + } + + reqJSON, err := json.Marshal(req) + if err != nil { + return err + } + + var resp apiResponse + + if _, err := js.apiRequestJSON(ctx, unpinSubject, &resp, reqJSON); err != nil { + return err + } + if resp.Error != nil { + if resp.Error.ErrorCode == JSErrCodeConsumerNotFound { + return ErrConsumerNotFound + } + return resp.Error + } + + return nil +} diff --git a/jetstream/consumer_config.go b/jetstream/consumer_config.go index 4e2e3d6e0..0285ee741 100644 --- a/jetstream/consumer_config.go +++ b/jetstream/consumer_config.go @@ -217,6 +217,10 @@ type ( // associating metadata on the consumer. This feature requires // nats-server v2.10.0 or later. Metadata map[string]string `json:"metadata,omitempty"` + + PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"` + PinnedTTL time.Duration `json:"priority_timeout,omitempty"` + PriorityGroups []string `json:"priority_groups,omitempty"` } // OrderedConsumerConfig is the configuration of an ordered JetStream @@ -283,8 +287,42 @@ type ( Stream uint64 `json:"stream_seq"` Last *time.Time `json:"last_active,omitempty"` } + + PriorityPolicy int +) + +const ( + PriorityPolicyNone PriorityPolicy = iota + PriorityPolicyPinned + PriorityPolicyOverflow ) +func (p *PriorityPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case jsonString(""): + *p = PriorityPolicyNone + case jsonString("pinned_client"): + *p = PriorityPolicyPinned + case jsonString("overflow"): + *p = PriorityPolicyOverflow + default: + return fmt.Errorf("nats: can not unmarshal %q", data) + } + return nil +} + +func (p PriorityPolicy) MarshalJSON() ([]byte, error) { + switch p { + case PriorityPolicyNone: + return json.Marshal("") + case PriorityPolicyPinned: + return json.Marshal("pinned_client") + case PriorityPolicyOverflow: + return json.Marshal("overflow") + } + return nil, fmt.Errorf("nats: unknown priority policy %v", p) +} + const ( // DeliverAllPolicy starts delivering messages from the very beginning of a // stream. This is the default. diff --git a/jetstream/errors.go b/jetstream/errors.go index fb364341c..c95318864 100644 --- a/jetstream/errors.go +++ b/jetstream/errors.go @@ -195,6 +195,10 @@ var ( // consumer. ErrNoMessages JetStreamError = &jsError{message: "no messages"} + // ErrPinIdMismatch is returned when Pin ID sent in ther request does not match + // the currently pinned consumer subscriber ID on the server. + ErrPinIdMismatch JetStreamError = &jsError{message: "pin ID mismatch"} + // ErrMaxBytesExceeded is returned when a message would exceed MaxBytes set // on a pull request. ErrMaxBytesExceeded JetStreamError = &jsError{message: "message size exceeds max bytes"} diff --git a/jetstream/jetstream_options.go b/jetstream/jetstream_options.go index a08d203fb..fecb83e30 100644 --- a/jetstream/jetstream_options.go +++ b/jetstream/jetstream_options.go @@ -194,6 +194,54 @@ func (t PullThresholdBytes) configureMessages(opts *consumeOpts) error { return nil } +type PullMinPending int + +func (min PullMinPending) configureConsume(opts *consumeOpts) error { + if min < 1 { + return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption) + } + opts.MinPending = int64(min) + return nil +} + +func (min PullMinPending) configureMessages(opts *consumeOpts) error { + if min < 1 { + return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption) + } + opts.MinPending = int64(min) + return nil +} + +type PullMinAckPending int + +func (min PullMinAckPending) configureConsume(opts *consumeOpts) error { + if min < 1 { + return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption) + } + opts.MinPending = int64(min) + return nil +} + +func (min PullMinAckPending) configureMessages(opts *consumeOpts) error { + if min < 1 { + return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption) + } + opts.MinPending = int64(min) + return nil +} + +type PriorityGroup string + +func (group PriorityGroup) configureConsume(opts *consumeOpts) error { + opts.Group = string(group) + return nil +} + +func (group PriorityGroup) configureMessages(opts *consumeOpts) error { + opts.Group = string(group) + return nil +} + // PullHeartbeat sets the idle heartbeat duration for a pull subscription // If a client does not receive a heartbeat message from a stream for more // than the idle heartbeat setting, the subscription will be removed @@ -259,6 +307,33 @@ func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt { }) } +func FetchMinPending(min int64) FetchOpt { + return func(req *pullRequest) error { + if min < 1 { + return fmt.Errorf("%w: min pending should be more than 0", ErrInvalidOption) + } + req.MinPending = min + return nil + } +} + +func FetchMinAckPending(min int64) FetchOpt { + return func(req *pullRequest) error { + if min < 1 { + return fmt.Errorf("%w: min ack pending should be more than 0", ErrInvalidOption) + } + req.MinAckPending = min + return nil + } +} + +func WithPriorityGroup(group string) FetchOpt { + return func(req *pullRequest) error { + req.Group = group + return nil + } +} + // FetchMaxWait sets custom timeout for fetching predefined batch of messages. // // If not provided, a default of 30 seconds will be used. diff --git a/jetstream/message.go b/jetstream/message.go index 81e151268..eb74550e7 100644 --- a/jetstream/message.go +++ b/jetstream/message.go @@ -146,6 +146,7 @@ const ( reqTimeout = "408" maxBytesExceeded = "409" noResponders = "503" + pinIdMismatch = "423" ) // Headers used when publishing messages. @@ -413,6 +414,8 @@ func checkMsg(msg *nats.Msg) (bool, error) { return false, nats.ErrTimeout case controlMsg: return false, nil + case pinIdMismatch: + return false, ErrPinIdMismatch case maxBytesExceeded: if strings.Contains(strings.ToLower(descr), "message size exceeds maxbytes") { return false, ErrMaxBytesExceeded diff --git a/jetstream/pull.go b/jetstream/pull.go index 001a0d183..954aa17ac 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "math" + "slices" "sync" "sync/atomic" "time" @@ -81,20 +82,28 @@ type ( name string info *ConsumerInfo subscriptions map[string]*pullSubscription + PinId string } pullRequest struct { - Expires time.Duration `json:"expires,omitempty"` - Batch int `json:"batch,omitempty"` - MaxBytes int `json:"max_bytes,omitempty"` - NoWait bool `json:"no_wait,omitempty"` - Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` + Expires time.Duration `json:"expires,omitempty"` + Batch int `json:"batch,omitempty"` + MaxBytes int `json:"max_bytes,omitempty"` + NoWait bool `json:"no_wait,omitempty"` + Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` + MinPending int64 `json:"min_pending,omitempty"` + MinAckPending int64 `json:"min_ack_pending,omitempty"` + PinId string `json:"id,omitempty"` + Group string `json:"group,omitempty"` } consumeOpts struct { Expires time.Duration MaxMessages int MaxBytes int + MinPending int64 + MinAckPending int64 + Group string Heartbeat time.Duration ErrHandler ConsumeErrHandlerFunc ReportMissingHeartbeats bool @@ -177,6 +186,19 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( if err != nil { return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) } + + if len(p.info.Config.PriorityGroups) != 0 { + if consumeOpts.Group == "" { + return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is required for priority consumer") + } + + if !slices.Contains(p.info.Config.PriorityGroups, consumeOpts.Group) { + return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "invalid priority group") + } + } else if consumeOpts.Group != "" { + return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is not supported for this consumer") + } + p.Lock() subject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiRequestNextT, p.stream, p.name)) @@ -206,6 +228,13 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( if sub.hbMonitor != nil { sub.hbMonitor.Stop() } + // Fixme(jrm): clean up - kinda redundant with checkMsg, and double + // check of `Status` header. + if status := msg.Header.Get("Status"); status != "" { + if status == pinIdMismatch { + p.PinId = "" + } + } userMsg, msgErr := checkMsg(msg) if !userMsg && msgErr == nil { if sub.hbMonitor != nil { @@ -242,6 +271,10 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( } return } + if pinId := msg.Header.Get("Nats-Pin-Id"); pinId != "" { + // TODO(jrm): do we need a lock here? + p.PinId = pinId + } handler(p.jetStream.toJSMsg(msg)) sub.Lock() sub.decrementPendingMsgs(msg) @@ -274,10 +307,14 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( batchSize = min(batchSize, sub.consumeOpts.StopAfter-sub.delivered) } if err := sub.pull(&pullRequest{ - Expires: consumeOpts.Expires, - Batch: batchSize, - MaxBytes: consumeOpts.MaxBytes, - Heartbeat: consumeOpts.Heartbeat, + Expires: consumeOpts.Expires, + Batch: batchSize, + MaxBytes: consumeOpts.MaxBytes, + Heartbeat: consumeOpts.Heartbeat, + MinPending: consumeOpts.MinPending, + MinAckPending: consumeOpts.MinAckPending, + Group: consumeOpts.Group, + PinId: p.PinId, }, subject); err != nil { sub.errs <- err } @@ -309,10 +346,14 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( } sub.fetchNext <- &pullRequest{ - Expires: sub.consumeOpts.Expires, - Batch: sub.consumeOpts.MaxMessages, - MaxBytes: sub.consumeOpts.MaxBytes, - Heartbeat: sub.consumeOpts.Heartbeat, + Expires: sub.consumeOpts.Expires, + Batch: sub.consumeOpts.MaxMessages, + MaxBytes: sub.consumeOpts.MaxBytes, + Heartbeat: sub.consumeOpts.Heartbeat, + MinPending: sub.consumeOpts.MinPending, + MinAckPending: sub.consumeOpts.MinAckPending, + Group: sub.consumeOpts.Group, + PinId: p.PinId, } if sub.hbMonitor != nil { sub.hbMonitor.Reset(2 * sub.consumeOpts.Heartbeat) @@ -332,10 +373,14 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( batchSize = min(batchSize, sub.consumeOpts.StopAfter-sub.delivered) } sub.fetchNext <- &pullRequest{ - Expires: sub.consumeOpts.Expires, - Batch: batchSize, - MaxBytes: sub.consumeOpts.MaxBytes, - Heartbeat: sub.consumeOpts.Heartbeat, + Expires: sub.consumeOpts.Expires, + Batch: batchSize, + MaxBytes: sub.consumeOpts.MaxBytes, + Heartbeat: sub.consumeOpts.Heartbeat, + MinPending: sub.consumeOpts.MinPending, + MinAckPending: sub.consumeOpts.MinAckPending, + Group: sub.consumeOpts.Group, + PinId: p.PinId, } if sub.hbMonitor != nil { sub.hbMonitor.Reset(2 * sub.consumeOpts.Heartbeat) @@ -403,6 +448,8 @@ func (s *pullSubscription) checkPending() { Batch: batchSize, MaxBytes: maxBytes, Heartbeat: s.consumeOpts.Heartbeat, + PinId: s.consumer.PinId, + Group: s.consumeOpts.Group, } s.pending.msgCount = s.consumeOpts.MaxMessages @@ -725,8 +772,7 @@ func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, // request. It will not wait for more messages to arrive. func (p *pullConsumer) FetchNoWait(batch int) (MessageBatch, error) { req := &pullRequest{ - Batch: batch, - NoWait: true, + Batch: batch, } return p.fetch(req) @@ -751,6 +797,8 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) { if err != nil { return nil, err } + fmt.Printf("setting request pin id: %s\n", p.PinId) + req.PinId = p.PinId if err := sub.pull(req, subject); err != nil { return nil, err } @@ -767,6 +815,13 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) { if hbTimer != nil { hbTimer.Reset(2 * req.Heartbeat) } + // Fixme(jrm): clean up - kinda redundant with checkMsg, and double + // check of `Status` header. + if status := msg.Header.Get("Status"); status != "" { + if status == pinIdMismatch { + res.err = err + } + } userMsg, err := checkMsg(msg) if err != nil { errNotTimeoutOrNoMsgs := !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, ErrNoMessages) @@ -781,6 +836,9 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) { p.Unlock() continue } + if pinId := msg.Header.Get("Nats-Pin-Id"); pinId != "" { + p.PinId = pinId + } res.msgs <- p.jetStream.toJSMsg(msg) meta, err := msg.Metadata() if err != nil { @@ -904,8 +962,9 @@ func (s *pullSubscription) pull(req *pullRequest, subject string) error { if err != nil { return err } - reply := s.subscription.Subject + fmt.Printf("sending json for reply %s %s\n", reply, string(reqJSON)) + if err := s.consumer.jetStream.conn.PublishRequest(subject, reply, reqJSON); err != nil { return err } diff --git a/jetstream/stream.go b/jetstream/stream.go index 4741a51c4..3512d0677 100644 --- a/jetstream/stream.go +++ b/jetstream/stream.go @@ -108,6 +108,10 @@ type ( // ConsumerNames returns a ConsumerNameLister enabling iterating over a // channel of consumer names. ConsumerNames(context.Context) ConsumerNameLister + + // UnpinConsumer unpins the currently pinned client for a consumer for the given group name. + // If consumer does not exist, ErrConsumerNotFound is returned. + UnpinConsumer(ctx context.Context, consumer string, group string) error } RawStreamMsg struct { @@ -234,6 +238,10 @@ type ( apiPaged Consumers []string `json:"consumers"` } + + consumerUnpinRequest struct { + Group string `json:"group"` + } ) // CreateOrUpdateConsumer creates a consumer on a given stream with @@ -716,3 +724,9 @@ func (s *consumerLister) consumerNames(ctx context.Context, stream string) ([]st s.offset += len(resp.Consumers) return resp.Consumers, nil } + +// UnpinConsumer unpins the currently pinned client for a consumer for the given group name. +// If consumer does not exist, ErrConsumerNotFound is returned. +func (s *stream) UnpinConsumer(ctx context.Context, consumer string, group string) error { + return unpinConsumer(ctx, s.jetStream, s.name, consumer, group) +} diff --git a/jetstream/test/consumer_test.go b/jetstream/test/consumer_test.go index bae2ff494..a51f65a8f 100644 --- a/jetstream/test/consumer_test.go +++ b/jetstream/test/consumer_test.go @@ -16,6 +16,7 @@ package test import ( "context" "errors" + "fmt" "testing" "time" @@ -120,6 +121,531 @@ func TestConsumerInfo(t *testing.T) { }) } +func TestConsumerOverflow(t *testing.T) { + + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: "cons", + AckPolicy: jetstream.AckExplicitPolicy, + Description: "test consumer", + PriorityPolicy: jetstream.PriorityPolicyOverflow, + PriorityGroups: []string{"A"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Check that consumer got proper priority policy and TTL + info := c.CachedInfo() + if info.Config.PriorityPolicy != jetstream.PriorityPolicyOverflow { + t.Fatalf("Invalid priority policy; expected: %v; got: %v", jetstream.PriorityPolicyOverflow, info.Config.PriorityPolicy) + } + + for i := 0; i < 100; i++ { + _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) + } + + // We are below overflow, so we should not get any moessages. + msgs, err := c.Fetch(10, jetstream.FetchMinPending(110), jetstream.FetchMaxWait(1*time.Second), jetstream.WithPriorityGroup("A")) + count := 0 + for msg := range msgs.Messages() { + msg.Ack() + count++ + } + if count != 0 { + t.Fatalf("Expected 0 messages, got %d", count) + } + + // Add more messages + for i := 0; i < 100; i++ { + _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) + } + + msgs, err = c.Fetch(10, jetstream.FetchMinPending(110), jetstream.WithPriorityGroup("A")) + count = 0 + for msg := range msgs.Messages() { + msg.Ack() + count++ + } + if count != 10 { + t.Fatalf("Expected 10 messages, got %d", count) + } +} + +func TestConsumerPinned(t *testing.T) { + + t.Run("messages", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: "cons", + AckPolicy: jetstream.AckExplicitPolicy, + Description: "test consumer", + PriorityPolicy: jetstream.PriorityPolicyPinned, + PinnedTTL: 50 * time.Second, + PriorityGroups: []string{"A"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for i := 0; i < 1000; i++ { + _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) + } + + msgs, err := c.Messages(jetstream.PriorityGroup("A")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + msg, err := msgs.Next() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if msg.Headers().Get("Nats-Pin-Id") == "" { + t.Fatalf("Expected pinned message") + } + fmt.Println("got first messages") + + second, err := s.Consumer(ctx, "cons") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + noMsgs, err := second.Messages(jetstream.PriorityGroup("A")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + done := make(chan struct{}) + errC := make(chan error) + go func() { + _, err := noMsgs.Next() + if err != nil { + errC <- err + return + } + done <- struct{}{} + }() + + select { + case <-done: + t.Fatalf("Expected no message") + case <-time.After(2 * time.Second): + noMsgs.Stop() + } + select { + case <-time.After(5 * time.Second): + t.Fatalf("Expected error") + case <-errC: + } + + }) + + t.Run("consume", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: "cons", + AckPolicy: jetstream.AckExplicitPolicy, + Description: "test consumer", + PriorityPolicy: jetstream.PriorityPolicyPinned, + PinnedTTL: 50 * time.Second, + PriorityGroups: []string{"A"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for i := 0; i < 1000; i++ { + _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) + } + + gcount := make(chan struct{}, 100) + + // Initially pinned consumer instance + initialyPinned, err := s.Consumer(ctx, "cons") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // test priority group validation + // invalid priority group + _, err = initialyPinned.Consume(func(m jetstream.Msg) { + }, jetstream.PriorityGroup("BAD")) + if err == nil || err.Error() != "nats: invalid jetstream option: invalid priority group" { + t.Fatalf("Expected invalid priority group error") + } + + // no priority group + _, err = initialyPinned.Consume(func(m jetstream.Msg) { + }) + if err == nil || err.Error() != "nats: invalid jetstream option: priority group is required for priority consumer" { + t.Fatalf("Expected invalid priority group error") + } + + count := 0 + ip, err := initialyPinned.Consume(func(m jetstream.Msg) { + m.Ack() + count++ + gcount <- struct{}{} + }, jetstream.PullThresholdMessages(10), jetstream.PriorityGroup("A")) + defer ip.Stop() + + // Second consume instance that should remain passive. + notPinnedC := 0 + np, err := c.Consume(func(m jetstream.Msg) { + m.Ack() + notPinnedC++ + gcount <- struct{}{} + }, jetstream.PriorityGroup("A")) + defer np.Stop() + + outer: + for { + select { + case <-gcount: + if count == 1000 { + break outer + } + case <-time.After(30 * time.Second): + t.Fatalf("Did not get all messages in time") + } + } + + if count != 1000 { + t.Fatalf("Expected 1000 messages for pinned consumer, got %d", count) + } + if notPinnedC != 0 { + t.Fatalf("Expected 0 messages for not pinned, got %d", notPinnedC) + } + + }) + + t.Run("fetch", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: "cons", + AckPolicy: jetstream.AckExplicitPolicy, + Description: "test consumer", + PriorityPolicy: jetstream.PriorityPolicyPinned, + PinnedTTL: 5 * time.Second, + PriorityGroups: []string{"A"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Check that consumer got proper priority policy and TTL + info := c.CachedInfo() + if info.Config.PriorityPolicy != jetstream.PriorityPolicyPinned { + t.Fatalf("Invalid priority policy; expected: %v; got: %v", jetstream.PriorityPolicyPinned, info.Config.PriorityPolicy) + } + if info.Config.PinnedTTL != 5*time.Second { + t.Fatalf("Invalid pinned TTL; expected: %v; got: %v", 2*time.Second, info.Config.PinnedTTL) + } + + for i := 0; i < 100; i++ { + _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) + } + + // Initial fetch. + // Should get all messages and get a Pin ID. + msgs, err := c.Fetch(10, jetstream.WithPriorityGroup("A")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + count := 0 + id := "" + for msg := range msgs.Messages() { + msg.Ack() + count++ + natsMsgId := msg.Headers().Get("Nats-Pin-Id") + if id == "" { + id = natsMsgId + } else { + if id != natsMsgId { + t.Fatalf("Expected Nats-Msg-Id to be the same for all messages") + } + } + } + if count != 10 { + t.Fatalf("Expected 10 messages, got %d", count) + + } + + // Different + cdiff, err := js.Consumer(ctx, "foo", "cons") + msgs2, err := cdiff.Fetch(10, jetstream.FetchMaxWait(1*time.Second), jetstream.WithPriorityGroup("A")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + count = 0 + for msg := range msgs2.Messages() { + msg.Ack() + count++ + } + if count != 0 { + t.Fatalf("Expected 0 messages, got %d", count) + } + if msgs2.Error() != nil { + t.Fatalf("Unexpected error: %v", msgs2.Error()) + } + + count = 0 + + // the same again, should be fine + msgs3, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.WithPriorityGroup("A")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + for msg := range msgs3.Messages() { + if pinId := msg.Headers().Get("Nats-Pin-Id"); pinId == "" { + t.Fatalf("missing Nats-Pin-Id header") + } + msg.Ack() + count++ + } + if count != 10 { + t.Fatalf("Expected 10 messages, got %d", count) + } + if msgs3.Error() != nil { + t.Fatalf("Unexpected error: %v", msgs3.Error()) + } + + // Wait for the TTL to expire, expect different ID + count = 0 + time.Sleep(10 * time.Second) + // The same instance, should work fine. + msgs4, err := c.Fetch(10, jetstream.FetchMaxWait(3*time.Second), jetstream.WithPriorityGroup("A")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + for msg := range msgs4.Messages() { + if msg == nil { + break + } + newId := msg.Headers().Get("Nats-Pin-Id") + if newId == id { + t.Fatalf("Expected new pull to have different ID. old: %s, new: %s", id, newId) + } + msg.Ack() + count++ + } + if count != 10 { + t.Fatalf("Expected 10 messages, got %d", count) + } + if msgs4.Error() != nil { + t.Fatalf("Unexpected error: %v", msgs4.Error()) + } + }) + + t.Run("unpin", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: "cons", + AckPolicy: jetstream.AckExplicitPolicy, + Description: "test consumer", + PriorityPolicy: jetstream.PriorityPolicyPinned, + PinnedTTL: 50 * time.Second, + PriorityGroups: []string{"A"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for i := 0; i < 1000; i++ { + _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) + } + + msgs, err := c.Messages(jetstream.PriorityGroup("A")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + msg, err := msgs.Next() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + firstPinId := msg.Headers().Get("Nats-Pin-Id") + if firstPinId == "" { + t.Fatalf("Expected pinned message") + } + fmt.Printf("got first pin ID messages (ID: %s)\n", firstPinId) + + second, err := s.Consumer(ctx, "cons") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + noMsgs, err := second.Messages(jetstream.PriorityGroup("A")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + done := make(chan struct{}) + errC := make(chan error) + go func() { + _, err := noMsgs.Next() + if err != nil { + errC <- err + return + } + done <- struct{}{} + }() + + select { + case <-done: + t.Fatalf("Expected no message") + case <-time.After(2 * time.Second): + noMsgs.Stop() + } + select { + case <-time.After(5 * time.Second): + t.Fatalf("Expected error") + case <-errC: + } + + third, err := s.Consumer(ctx, "cons") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + yesMsgs, err := third.Messages(jetstream.PriorityGroup("A")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + go func() { + msg, err := yesMsgs.Next() + newPinId := msg.Headers().Get("Nats-Pin-Id") + fmt.Printf("got new pin ID message after unpin (ID: %s)\n", newPinId) + if newPinId == firstPinId || newPinId == "" { + errC <- fmt.Errorf("Expected new pin ID, got %s", newPinId) + return + } + if err != nil { + errC <- err + return + } + done <- struct{}{} + }() + + err = s.UnpinConsumer(ctx, "cons", "A") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + select { + case <-done: + case err := <-errC: + t.Fatalf("Unexpected error: %v", err) + case <-time.After(4 * time.Second): + t.Fatalf("Should not time out") + } + yesMsgs.Stop() + }) +} + func TestConsumerCachedInfo(t *testing.T) { srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv)