From d5b397f402b0eee38f56e5fa79d5e07ad5692fcc Mon Sep 17 00:00:00 2001 From: Pieter Loubser Date: Wed, 22 Jan 2025 13:25:19 +0000 Subject: [PATCH] (#1145) Fix consumer interactive edit Here we add the yaml markup to ConsumerConfig, as well as update some missing values for conumer config in the API definition. We also add missing UmarshallYAML and MarshalYAML functions to the enum fields used by ConsumerConfig so that they can render correctly when editing. --- api/consumers.go | 191 ++++++++++++++---- .../jetstream/api/v1/definitions.json | 28 ++- .../api/v1/consumer_configuration.json | 37 +++- .../api/v1/consumer_create_request.json | 37 +++- .../api/v1/consumer_create_response.json | 37 +++- .../api/v1/consumer_info_response.json | 37 +++- .../api/v1/consumer_list_response.json | 37 +++- 7 files changed, 362 insertions(+), 42 deletions(-) diff --git a/api/consumers.go b/api/consumers.go index 0ea9fd0f..6c0fe6fa 100644 --- a/api/consumers.go +++ b/api/consumers.go @@ -17,6 +17,8 @@ import ( "encoding/json" "fmt" "time" + + "gopkg.in/yaml.v3" ) // also update wellKnownSubjectSchemas @@ -203,6 +205,33 @@ func (p AckPolicy) String() string { } } +func (p *AckPolicy) UnmarshalYAML(data *yaml.Node) error { + switch data.Value { + case "none": + *p = AckNone + case "all": + *p = AckAll + case "explicit": + *p = AckExplicit + default: + return fmt.Errorf("can not unmarshal: %v", data.Value) + } + return nil +} + +func (p AckPolicy) MarshalYAML() (any, error) { + switch p { + case AckNone: + return "none", nil + case AckAll: + return "all", nil + case AckExplicit: + return "explicit", nil + default: + return nil, fmt.Errorf("unknown acknowlegement policy: %v", p) + } +} + func (p *AckPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("none"): @@ -249,6 +278,29 @@ func (p ReplayPolicy) String() string { } } +func (p *ReplayPolicy) UnmarshalYAML(data *yaml.Node) error { + switch data.Value { + case "instant": + *p = ReplayInstant + case "original": + *p = ReplayOriginal + default: + return fmt.Errorf("can not unmarshal %v", data.Value) + } + return nil +} + +func (p ReplayPolicy) MarshalYAML() (any, error) { + switch p { + case ReplayInstant: + return "original", nil + case ReplayOriginal: + return "instant", nil + default: + return nil, fmt.Errorf("unknown replay policy: %v", p) + } +} + func (p *ReplayPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("instant"): @@ -311,6 +363,46 @@ func (p DeliverPolicy) String() string { } } +func (p *DeliverPolicy) UnmarshalYAML(data *yaml.Node) error { + switch data.Value { + case "all", "undefined": + *p = DeliverAll + case "last": + *p = DeliverLast + case "new": + *p = DeliverNew + case "by_start_sequence": + *p = DeliverByStartSequence + case "by_start_time": + *p = DeliverByStartTime + case "last_per_subject": + *p = DeliverLastPerSubject + default: + return fmt.Errorf("can not unmarshal %q", data.Value) + } + + return nil +} + +func (p DeliverPolicy) MarshalYAML() (any, error) { + switch p { + case DeliverAll: + return "all", nil + case DeliverLast: + return "last", nil + case DeliverNew: + return "new", nil + case DeliverByStartSequence: + return "by_start_sequence", nil + case DeliverByStartTime: + return "by_start_time", nil + case DeliverLastPerSubject: + return "last_per_subject", nil + default: + return nil, fmt.Errorf("unknown deliver policy %v", p) + } +} + func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("all"), jsonString("undefined"): @@ -325,6 +417,8 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { *p = DeliverByStartTime case jsonString("last_per_subject"): *p = DeliverLastPerSubject + default: + return fmt.Errorf("can not unmarshal %q", data) } return nil @@ -369,6 +463,33 @@ func (p PriorityPolicy) String() string { } } +func (p *PriorityPolicy) UnmarshalYAML(data *yaml.Node) error { + switch data.Value { + case "none": + *p = PriorityNone + case "overflow": + *p = PriorityOverflow + case "pinned_client": + *p = PriorityPinnedClient + default: + return fmt.Errorf("cannot unmarshal %v", data.Value) + } + return nil +} + +func (p PriorityPolicy) MarshalYAML() (any, error) { + switch p { + case PriorityNone: + return "none", nil + case PriorityOverflow: + return "overflow", nil + case PriorityPinnedClient: + return "pinned_client", nil + default: + return nil, fmt.Errorf("unknown priority policy: %v", p) + } +} + func (p *PriorityPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("none"): @@ -378,7 +499,7 @@ func (p *PriorityPolicy) UnmarshalJSON(data []byte) error { case jsonString("pinned_client"): *p = PriorityPinnedClient default: - return fmt.Errorf("unknown priority policy: %v", string(data)) + return fmt.Errorf("cannot unmarshal %v", string(data)) } return nil } @@ -400,47 +521,47 @@ func (p PriorityPolicy) MarshalJSON() ([]byte, error) { // // NATS Schema Type io.nats.jetstream.api.v1.consumer_configuration type ConsumerConfig struct { - Description string `json:"description,omitempty"` - AckPolicy AckPolicy `json:"ack_policy"` - AckWait time.Duration `json:"ack_wait,omitempty"` - DeliverPolicy DeliverPolicy `json:"deliver_policy"` - DeliverSubject string `json:"deliver_subject,omitempty"` - DeliverGroup string `json:"deliver_group,omitempty"` - Durable string `json:"durable_name,omitempty"` // Durable is deprecated. All consumers will have names. picked by clients. - Name string `json:"name,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - FilterSubjects []string `json:"filter_subjects,omitempty"` - FlowControl bool `json:"flow_control,omitempty"` - Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` - MaxAckPending int `json:"max_ack_pending,omitempty"` - MaxDeliver int `json:"max_deliver,omitempty"` - BackOff []time.Duration `json:"backoff,omitempty"` - MaxWaiting int `json:"max_waiting,omitempty"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - RateLimit uint64 `json:"rate_limit_bps,omitempty"` - ReplayPolicy ReplayPolicy `json:"replay_policy"` - SampleFrequency string `json:"sample_freq,omitempty"` - HeadersOnly bool `json:"headers_only,omitempty"` - MaxRequestBatch int `json:"max_batch,omitempty"` - MaxRequestExpires time.Duration `json:"max_expires,omitempty"` - MaxRequestMaxBytes int `json:"max_bytes,omitempty"` - InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` - Replicas int `json:"num_replicas"` - MemoryStorage bool `json:"mem_storage,omitempty"` + Description string `json:"description,omitempty" yaml:"description"` + AckPolicy AckPolicy `json:"ack_policy" yaml:"ack_policy"` + AckWait time.Duration `json:"ack_wait,omitempty" yaml:"ack_wait"` + DeliverPolicy DeliverPolicy `json:"deliver_policy" yaml:"deliver_policy"` + DeliverSubject string `json:"deliver_subject,omitempty" yaml:"deliver_subject"` + DeliverGroup string `json:"deliver_group,omitempty" yaml:"deliver_group"` + Durable string `json:"durable_name,omitempty" yaml:"durable_name"` // Durable is deprecated. All consumers will have names. picked by clients. + Name string `json:"name,omitempty" yaml:"name"` + FilterSubject string `json:"filter_subject,omitempty" yaml:"filter_subject"` + FilterSubjects []string `json:"filter_subjects,omitempty" yaml:"filter_subjects"` + FlowControl bool `json:"flow_control,omitempty" yaml:"flow_control"` + Heartbeat time.Duration `json:"idle_heartbeat,omitempty" yaml:"idle_heartbeat"` + MaxAckPending int `json:"max_ack_pending,omitempty" yaml:"max_ack_pending"` + MaxDeliver int `json:"max_deliver,omitempty" yaml:"max_deliver"` + BackOff []time.Duration `json:"backoff,omitempty" yaml:"backoff"` + MaxWaiting int `json:"max_waiting,omitempty" yaml:"max_waiting"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty" yaml:"opt_start_seq"` + OptStartTime *time.Time `json:"opt_start_time,omitempty" yaml:"opt_start_time"` + RateLimit uint64 `json:"rate_limit_bps,omitempty" yaml:"rate_limit_bps"` + ReplayPolicy ReplayPolicy `json:"replay_policy" yaml:"replay_policy"` + SampleFrequency string `json:"sample_freq,omitempty" yaml:"sample_freq"` + HeadersOnly bool `json:"headers_only,omitempty" yaml:"headers_only"` + MaxRequestBatch int `json:"max_batch,omitempty" yaml:"max_batch"` + MaxRequestExpires time.Duration `json:"max_expires,omitempty" yaml:"max_expires"` + MaxRequestMaxBytes int `json:"max_bytes,omitempty" yaml:"max_bytes"` + InactiveThreshold time.Duration `json:"inactive_threshold,omitempty" yaml:"inactive_threshold"` + Replicas int `json:"num_replicas" yaml:"num_replicas"` + MemoryStorage bool `json:"mem_storage,omitempty" yaml:"mem_storage"` // Metadata is additional metadata for the Consumer. - Metadata map[string]string `json:"metadata,omitempty"` + Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata"` // PauseUntil is for suspending the consumer until the deadline. - PauseUntil time.Time `json:"pause_until,omitempty"` + PauseUntil time.Time `json:"pause_until,omitempty" yaml:"pause_until"` // Priority groups - PriorityGroups []string `json:"priority_groups,omitempty"` - PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"` - PinnedTTL time.Duration `json:"priority_timeout,omitempty"` + PriorityGroups []string `json:"priority_groups,omitempty" yaml:"priority_groups"` + PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty" yaml:"priority_policy"` + PinnedTTL time.Duration `json:"priority_timeout,omitempty" yaml:"priority_timeout"` // Don't add to general clients. - Direct bool `json:"direct,omitempty"` + Direct bool `json:"direct,omitempty" yaml:"direct"` } // SequenceInfo is the consumer and stream sequence that uniquely identify a message diff --git a/schema_source/jetstream/api/v1/definitions.json b/schema_source/jetstream/api/v1/definitions.json index b7f83aca..92b64548 100644 --- a/schema_source/jetstream/api/v1/definitions.json +++ b/schema_source/jetstream/api/v1/definitions.json @@ -708,11 +708,24 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": ["all", "last", "new", "by_start_sequence", "by_start_time", "last_per_subject"], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": ["none", "all", "explicit"], "default": "none" @@ -741,11 +754,13 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": ["instant", "original"], "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -841,11 +856,20 @@ } }, "priority_policy": { - "description": "The policy the consumer is set to", + "description": "The priority policy the consumer is set to", "$ref": "#/definitions/priority_policy" }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "$ref": "#/definitions/golang_uint64", + "minimum": 0 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$ref": "#/definitions/golang_time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_configuration.json b/schemas/jetstream/api/v1/consumer_configuration.json index c5f1540f..990e6e48 100644 --- a/schemas/jetstream/api/v1/consumer_configuration.json +++ b/schemas/jetstream/api/v1/consumer_configuration.json @@ -125,11 +125,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -167,6 +187,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -175,6 +196,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -302,8 +324,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } } diff --git a/schemas/jetstream/api/v1/consumer_create_request.json b/schemas/jetstream/api/v1/consumer_create_request.json index 97b77c3a..e3d73b00 100644 --- a/schemas/jetstream/api/v1/consumer_create_request.json +++ b/schemas/jetstream/api/v1/consumer_create_request.json @@ -137,11 +137,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -179,6 +199,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -187,6 +208,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -314,8 +336,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_create_response.json b/schemas/jetstream/api/v1/consumer_create_response.json index 86aaa243..272dd74c 100644 --- a/schemas/jetstream/api/v1/consumer_create_response.json +++ b/schemas/jetstream/api/v1/consumer_create_response.json @@ -159,11 +159,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -201,6 +221,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -209,6 +230,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -336,8 +358,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_info_response.json b/schemas/jetstream/api/v1/consumer_info_response.json index 6fb453dd..9d9efdd6 100644 --- a/schemas/jetstream/api/v1/consumer_info_response.json +++ b/schemas/jetstream/api/v1/consumer_info_response.json @@ -159,11 +159,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -201,6 +221,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -209,6 +230,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -336,8 +358,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } }, diff --git a/schemas/jetstream/api/v1/consumer_list_response.json b/schemas/jetstream/api/v1/consumer_list_response.json index 20e9d356..41b3c8f1 100644 --- a/schemas/jetstream/api/v1/consumer_list_response.json +++ b/schemas/jetstream/api/v1/consumer_list_response.json @@ -224,11 +224,31 @@ "type": "string", "maxLength": 4096 }, + "deliver_policy": { + "description": "The point in the stream from which to receive messages", + "type": "string", + "enum": [ + "all", + "last", + "new", + "by_start_sequence", + "by_start_time", + "last_per_subject" + ], + "default": "all" + }, "deliver_subject": { + "description": "The subject to delivery messages to", + "type": "string", + "minLength": 1 + }, + "deliver_group": { + "description": "The queue group name used to distribute messages among subscribers", "type": "string", "minLength": 1 }, "ack_policy": { + "description": "The requirement of client acknowledgments", "type": "string", "enum": [ "none", @@ -266,6 +286,7 @@ } }, "replay_policy": { + "description": "The rate at which messages will be pushed to a client", "type": "string", "enum": [ "instant", @@ -274,6 +295,7 @@ "default": "instant" }, "sample_freq": { + "description": "Sets the percentage of acknowledgments that should be sampled for observability", "type": "string" }, "rate_limit_bps": { @@ -401,8 +423,21 @@ "pinned_client" ] }, - "pinned_ttl": { + "priority_timeout": { "description": "For pinned_client priority policy how long before the client times out" + }, + "opt_start_seq": { + "description": "Start sequence used with the DeliverByStartSequence deliver policy.", + "minimum": 0, + "$comment": "unsigned 64 bit integer", + "type": "integer", + "maximum": 18446744073709551615 + }, + "opt_start_time": { + "description": "Start time used with the DeliverByStartSequence deliver policy", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" } } },