diff --git a/api/consumers.go b/api/consumers.go index 0ea9fd0f..6c0fe6fa 100644 --- a/api/consumers.go +++ b/api/consumers.go @@ -17,6 +17,8 @@ import ( "encoding/json" "fmt" "time" + + "gopkg.in/yaml.v3" ) // also update wellKnownSubjectSchemas @@ -203,6 +205,33 @@ func (p AckPolicy) String() string { } } +func (p *AckPolicy) UnmarshalYAML(data *yaml.Node) error { + switch data.Value { + case "none": + *p = AckNone + case "all": + *p = AckAll + case "explicit": + *p = AckExplicit + default: + return fmt.Errorf("can not unmarshal: %v", data.Value) + } + return nil +} + +func (p AckPolicy) MarshalYAML() (any, error) { + switch p { + case AckNone: + return "none", nil + case AckAll: + return "all", nil + case AckExplicit: + return "explicit", nil + default: + return nil, fmt.Errorf("unknown acknowlegement policy: %v", p) + } +} + func (p *AckPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("none"): @@ -249,6 +278,29 @@ func (p ReplayPolicy) String() string { } } +func (p *ReplayPolicy) UnmarshalYAML(data *yaml.Node) error { + switch data.Value { + case "instant": + *p = ReplayInstant + case "original": + *p = ReplayOriginal + default: + return fmt.Errorf("can not unmarshal %v", data.Value) + } + return nil +} + +func (p ReplayPolicy) MarshalYAML() (any, error) { + switch p { + case ReplayInstant: + return "original", nil + case ReplayOriginal: + return "instant", nil + default: + return nil, fmt.Errorf("unknown replay policy: %v", p) + } +} + func (p *ReplayPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("instant"): @@ -311,6 +363,46 @@ func (p DeliverPolicy) String() string { } } +func (p *DeliverPolicy) UnmarshalYAML(data *yaml.Node) error { + switch data.Value { + case "all", "undefined": + *p = DeliverAll + case "last": + *p = DeliverLast + case "new": + *p = DeliverNew + case "by_start_sequence": + *p = DeliverByStartSequence + case "by_start_time": + *p = DeliverByStartTime + case "last_per_subject": + *p = DeliverLastPerSubject + default: + return fmt.Errorf("can not unmarshal %q", data.Value) + } + + return nil +} + +func (p DeliverPolicy) MarshalYAML() (any, error) { + switch p { + case DeliverAll: + return "all", nil + case DeliverLast: + return "last", nil + case DeliverNew: + return "new", nil + case DeliverByStartSequence: + return "by_start_sequence", nil + case DeliverByStartTime: + return "by_start_time", nil + case DeliverLastPerSubject: + return "last_per_subject", nil + default: + return nil, fmt.Errorf("unknown deliver policy %v", p) + } +} + func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("all"), jsonString("undefined"): @@ -325,6 +417,8 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { *p = DeliverByStartTime case jsonString("last_per_subject"): *p = DeliverLastPerSubject + default: + return fmt.Errorf("can not unmarshal %q", data) } return nil @@ -369,6 +463,33 @@ func (p PriorityPolicy) String() string { } } +func (p *PriorityPolicy) UnmarshalYAML(data *yaml.Node) error { + switch data.Value { + case "none": + *p = PriorityNone + case "overflow": + *p = PriorityOverflow + case "pinned_client": + *p = PriorityPinnedClient + default: + return fmt.Errorf("cannot unmarshal %v", data.Value) + } + return nil +} + +func (p PriorityPolicy) MarshalYAML() (any, error) { + switch p { + case PriorityNone: + return "none", nil + case PriorityOverflow: + return "overflow", nil + case PriorityPinnedClient: + return "pinned_client", nil + default: + return nil, fmt.Errorf("unknown priority policy: %v", p) + } +} + func (p *PriorityPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("none"): @@ -378,7 +499,7 @@ func (p *PriorityPolicy) UnmarshalJSON(data []byte) error { case jsonString("pinned_client"): *p = PriorityPinnedClient default: - return fmt.Errorf("unknown priority policy: %v", string(data)) + return fmt.Errorf("cannot unmarshal %v", string(data)) } return nil } @@ -400,47 +521,47 @@ func (p PriorityPolicy) MarshalJSON() ([]byte, error) { // // NATS Schema Type io.nats.jetstream.api.v1.consumer_configuration type ConsumerConfig struct { - Description string `json:"description,omitempty"` - AckPolicy AckPolicy `json:"ack_policy"` - AckWait time.Duration `json:"ack_wait,omitempty"` - DeliverPolicy DeliverPolicy `json:"deliver_policy"` - DeliverSubject string `json:"deliver_subject,omitempty"` - DeliverGroup string `json:"deliver_group,omitempty"` - Durable string `json:"durable_name,omitempty"` // Durable is deprecated. All consumers will have names. picked by clients. - Name string `json:"name,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - FilterSubjects []string `json:"filter_subjects,omitempty"` - FlowControl bool `json:"flow_control,omitempty"` - Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` - MaxAckPending int `json:"max_ack_pending,omitempty"` - MaxDeliver int `json:"max_deliver,omitempty"` - BackOff []time.Duration `json:"backoff,omitempty"` - MaxWaiting int `json:"max_waiting,omitempty"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - RateLimit uint64 `json:"rate_limit_bps,omitempty"` - ReplayPolicy ReplayPolicy `json:"replay_policy"` - SampleFrequency string `json:"sample_freq,omitempty"` - HeadersOnly bool `json:"headers_only,omitempty"` - MaxRequestBatch int `json:"max_batch,omitempty"` - MaxRequestExpires time.Duration `json:"max_expires,omitempty"` - MaxRequestMaxBytes int `json:"max_bytes,omitempty"` - InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` - Replicas int `json:"num_replicas"` - MemoryStorage bool `json:"mem_storage,omitempty"` + Description string `json:"description,omitempty" yaml:"description"` + AckPolicy AckPolicy `json:"ack_policy" yaml:"ack_policy"` + AckWait time.Duration `json:"ack_wait,omitempty" yaml:"ack_wait"` + DeliverPolicy DeliverPolicy `json:"deliver_policy" yaml:"deliver_policy"` + DeliverSubject string `json:"deliver_subject,omitempty" yaml:"deliver_subject"` + DeliverGroup string `json:"deliver_group,omitempty" yaml:"deliver_group"` + Durable string `json:"durable_name,omitempty" yaml:"durable_name"` // Durable is deprecated. All consumers will have names. picked by clients. + Name string `json:"name,omitempty" yaml:"name"` + FilterSubject string `json:"filter_subject,omitempty" yaml:"filter_subject"` + FilterSubjects []string `json:"filter_subjects,omitempty" yaml:"filter_subjects"` + FlowControl bool `json:"flow_control,omitempty" yaml:"flow_control"` + Heartbeat time.Duration `json:"idle_heartbeat,omitempty" yaml:"idle_heartbeat"` + MaxAckPending int `json:"max_ack_pending,omitempty" yaml:"max_ack_pending"` + MaxDeliver int `json:"max_deliver,omitempty" yaml:"max_deliver"` + BackOff []time.Duration `json:"backoff,omitempty" yaml:"backoff"` + MaxWaiting int `json:"max_waiting,omitempty" yaml:"max_waiting"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty" yaml:"opt_start_seq"` + OptStartTime *time.Time `json:"opt_start_time,omitempty" yaml:"opt_start_time"` + RateLimit uint64 `json:"rate_limit_bps,omitempty" yaml:"rate_limit_bps"` + ReplayPolicy ReplayPolicy `json:"replay_policy" yaml:"replay_policy"` + SampleFrequency string `json:"sample_freq,omitempty" yaml:"sample_freq"` + HeadersOnly bool `json:"headers_only,omitempty" yaml:"headers_only"` + MaxRequestBatch int `json:"max_batch,omitempty" yaml:"max_batch"` + MaxRequestExpires time.Duration `json:"max_expires,omitempty" yaml:"max_expires"` + MaxRequestMaxBytes int `json:"max_bytes,omitempty" yaml:"max_bytes"` + InactiveThreshold time.Duration `json:"inactive_threshold,omitempty" yaml:"inactive_threshold"` + Replicas int `json:"num_replicas" yaml:"num_replicas"` + MemoryStorage bool `json:"mem_storage,omitempty" yaml:"mem_storage"` // Metadata is additional metadata for the Consumer. - Metadata map[string]string `json:"metadata,omitempty"` + Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata"` // PauseUntil is for suspending the consumer until the deadline. - PauseUntil time.Time `json:"pause_until,omitempty"` + PauseUntil time.Time `json:"pause_until,omitempty" yaml:"pause_until"` // Priority groups - PriorityGroups []string `json:"priority_groups,omitempty"` - PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"` - PinnedTTL time.Duration `json:"priority_timeout,omitempty"` + PriorityGroups []string `json:"priority_groups,omitempty" yaml:"priority_groups"` + PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty" yaml:"priority_policy"` + PinnedTTL time.Duration `json:"priority_timeout,omitempty" yaml:"priority_timeout"` // Don't add to general clients. - Direct bool `json:"direct,omitempty"` + Direct bool `json:"direct,omitempty" yaml:"direct"` } // SequenceInfo is the consumer and stream sequence that uniquely identify a message diff --git a/schema_source/jetstream/api/v1/definitions.json b/schema_source/jetstream/api/v1/definitions.json index b7f83aca..92b64548 100644 --- a/schema_source/jetstream/api/v1/definitions.json +++ b/schema_source/jetstream/api/v1/definitions.json @@ -708,11 +708,24 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": ["all", "last", "new", "by_start_sequence", "by_start_time", "last_per_subject"], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": ["none", "all", "explicit"], "default": "none" @@ -741,11 +754,13 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": ["instant", "original"], "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -841,11 +856,20 @@ } }, "priority_policy": { - "description": "The policy the consumer is set to", + "description": "The priority policy the consumer is set to", "$ref": "#/definitions/priority_policy" }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "$ref": "#/definitions/golang_uint64", + "minimum": 0 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$ref": "#/definitions/golang_time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_configuration.json b/schemas/jetstream/api/v1/consumer_configuration.json index c5f1540f..990e6e48 100644 --- a/schemas/jetstream/api/v1/consumer_configuration.json +++ b/schemas/jetstream/api/v1/consumer_configuration.json @@ -125,11 +125,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -167,6 +187,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -175,6 +196,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -302,8 +324,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } } diff --git a/schemas/jetstream/api/v1/consumer_create_request.json b/schemas/jetstream/api/v1/consumer_create_request.json index 97b77c3a..e3d73b00 100644 --- a/schemas/jetstream/api/v1/consumer_create_request.json +++ b/schemas/jetstream/api/v1/consumer_create_request.json @@ -137,11 +137,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -179,6 +199,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -187,6 +208,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -314,8 +336,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_create_response.json b/schemas/jetstream/api/v1/consumer_create_response.json index 86aaa243..272dd74c 100644 --- a/schemas/jetstream/api/v1/consumer_create_response.json +++ b/schemas/jetstream/api/v1/consumer_create_response.json @@ -159,11 +159,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -201,6 +221,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -209,6 +230,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -336,8 +358,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_info_response.json b/schemas/jetstream/api/v1/consumer_info_response.json index 6fb453dd..9d9efdd6 100644 --- a/schemas/jetstream/api/v1/consumer_info_response.json +++ b/schemas/jetstream/api/v1/consumer_info_response.json @@ -159,11 +159,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -201,6 +221,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -209,6 +230,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -336,8 +358,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_list_response.json b/schemas/jetstream/api/v1/consumer_list_response.json index 20e9d356..41b3c8f1 100644 --- a/schemas/jetstream/api/v1/consumer_list_response.json +++ b/schemas/jetstream/api/v1/consumer_list_response.json @@ -224,11 +224,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -266,6 +286,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -274,6 +295,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -401,8 +423,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } },