Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#1145) Fix consumer interactive edit #616

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 156 additions & 35 deletions api/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"encoding/json"
"fmt"
"time"

"gopkg.in/yaml.v3"
)

// also update wellKnownSubjectSchemas
Expand Down Expand Up @@ -203,6 +205,33 @@ func (p AckPolicy) String() string {
}
}

func (p *AckPolicy) UnmarshalYAML(data *yaml.Node) error {
switch data.Value {
case "none":
*p = AckNone
case "all":
*p = AckAll
case "explicit":
*p = AckExplicit
default:
return fmt.Errorf("can not unmarshal: %v", data.Value)
}
return nil
}

func (p AckPolicy) MarshalYAML() (any, error) {
switch p {
case AckNone:
return "none", nil
case AckAll:
return "all", nil
case AckExplicit:
return "explicit", nil
default:
return nil, fmt.Errorf("unknown acknowlegement policy: %v", p)
}
}

func (p *AckPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("none"):
Expand Down Expand Up @@ -249,6 +278,29 @@ func (p ReplayPolicy) String() string {
}
}

func (p *ReplayPolicy) UnmarshalYAML(data *yaml.Node) error {
switch data.Value {
case "instant":
*p = ReplayInstant
case "original":
*p = ReplayOriginal
default:
return fmt.Errorf("can not unmarshal %v", data.Value)
}
return nil
}

func (p ReplayPolicy) MarshalYAML() (any, error) {
switch p {
case ReplayInstant:
return "original", nil
case ReplayOriginal:
return "instant", nil
default:
return nil, fmt.Errorf("unknown replay policy: %v", p)
}
}

func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("instant"):
Expand Down Expand Up @@ -311,6 +363,46 @@ func (p DeliverPolicy) String() string {
}
}

func (p *DeliverPolicy) UnmarshalYAML(data *yaml.Node) error {
switch data.Value {
case "all", "undefined":
*p = DeliverAll
case "last":
*p = DeliverLast
case "new":
*p = DeliverNew
case "by_start_sequence":
*p = DeliverByStartSequence
case "by_start_time":
*p = DeliverByStartTime
case "last_per_subject":
*p = DeliverLastPerSubject
default:
return fmt.Errorf("can not unmarshal %q", data.Value)
}

return nil
}

func (p DeliverPolicy) MarshalYAML() (any, error) {
switch p {
case DeliverAll:
return "all", nil
case DeliverLast:
return "last", nil
case DeliverNew:
return "new", nil
case DeliverByStartSequence:
return "by_start_sequence", nil
case DeliverByStartTime:
return "by_start_time", nil
case DeliverLastPerSubject:
return "last_per_subject", nil
default:
return nil, fmt.Errorf("unknown deliver policy %v", p)
}
}

func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("all"), jsonString("undefined"):
Expand All @@ -325,6 +417,8 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
*p = DeliverByStartTime
case jsonString("last_per_subject"):
*p = DeliverLastPerSubject
default:
return fmt.Errorf("can not unmarshal %q", data)
}

return nil
Expand Down Expand Up @@ -369,6 +463,33 @@ func (p PriorityPolicy) String() string {
}
}

func (p *PriorityPolicy) UnmarshalYAML(data *yaml.Node) error {
switch data.Value {
case "none":
*p = PriorityNone
case "overflow":
*p = PriorityOverflow
case "pinned_client":
*p = PriorityPinnedClient
default:
return fmt.Errorf("cannot unmarshal %v", data.Value)
}
return nil
}

func (p PriorityPolicy) MarshalYAML() (any, error) {
switch p {
case PriorityNone:
return "none", nil
case PriorityOverflow:
return "overflow", nil
case PriorityPinnedClient:
return "pinned_client", nil
default:
return nil, fmt.Errorf("unknown priority policy: %v", p)
}
}

func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("none"):
Expand All @@ -378,7 +499,7 @@ func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
case jsonString("pinned_client"):
*p = PriorityPinnedClient
default:
return fmt.Errorf("unknown priority policy: %v", string(data))
return fmt.Errorf("cannot unmarshal %v", string(data))
}
return nil
}
Expand All @@ -400,47 +521,47 @@ func (p PriorityPolicy) MarshalJSON() ([]byte, error) {
//
// NATS Schema Type io.nats.jetstream.api.v1.consumer_configuration
type ConsumerConfig struct {
Description string `json:"description,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
Durable string `json:"durable_name,omitempty"` // Durable is deprecated. All consumers will have names. picked by clients.
Name string `json:"name,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
FilterSubjects []string `json:"filter_subjects,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
SampleFrequency string `json:"sample_freq,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty"`
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
Replicas int `json:"num_replicas"`
MemoryStorage bool `json:"mem_storage,omitempty"`
Description string `json:"description,omitempty" yaml:"description"`
AckPolicy AckPolicy `json:"ack_policy" yaml:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty" yaml:"ack_wait"`
DeliverPolicy DeliverPolicy `json:"deliver_policy" yaml:"deliver_policy"`
DeliverSubject string `json:"deliver_subject,omitempty" yaml:"deliver_subject"`
DeliverGroup string `json:"deliver_group,omitempty" yaml:"deliver_group"`
Durable string `json:"durable_name,omitempty" yaml:"durable_name"` // Durable is deprecated. All consumers will have names. picked by clients.
Name string `json:"name,omitempty" yaml:"name"`
FilterSubject string `json:"filter_subject,omitempty" yaml:"filter_subject"`
FilterSubjects []string `json:"filter_subjects,omitempty" yaml:"filter_subjects"`
FlowControl bool `json:"flow_control,omitempty" yaml:"flow_control"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty" yaml:"idle_heartbeat"`
MaxAckPending int `json:"max_ack_pending,omitempty" yaml:"max_ack_pending"`
MaxDeliver int `json:"max_deliver,omitempty" yaml:"max_deliver"`
BackOff []time.Duration `json:"backoff,omitempty" yaml:"backoff"`
MaxWaiting int `json:"max_waiting,omitempty" yaml:"max_waiting"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty" yaml:"opt_start_seq"`
OptStartTime *time.Time `json:"opt_start_time,omitempty" yaml:"opt_start_time"`
RateLimit uint64 `json:"rate_limit_bps,omitempty" yaml:"rate_limit_bps"`
ReplayPolicy ReplayPolicy `json:"replay_policy" yaml:"replay_policy"`
SampleFrequency string `json:"sample_freq,omitempty" yaml:"sample_freq"`
HeadersOnly bool `json:"headers_only,omitempty" yaml:"headers_only"`
MaxRequestBatch int `json:"max_batch,omitempty" yaml:"max_batch"`
MaxRequestExpires time.Duration `json:"max_expires,omitempty" yaml:"max_expires"`
MaxRequestMaxBytes int `json:"max_bytes,omitempty" yaml:"max_bytes"`
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty" yaml:"inactive_threshold"`
Replicas int `json:"num_replicas" yaml:"num_replicas"`
MemoryStorage bool `json:"mem_storage,omitempty" yaml:"mem_storage"`
// Metadata is additional metadata for the Consumer.
Metadata map[string]string `json:"metadata,omitempty"`
Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata"`

// PauseUntil is for suspending the consumer until the deadline.
PauseUntil time.Time `json:"pause_until,omitempty"`
PauseUntil time.Time `json:"pause_until,omitempty" yaml:"pause_until"`

// Priority groups
PriorityGroups []string `json:"priority_groups,omitempty"`
PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty"`
PinnedTTL time.Duration `json:"priority_timeout,omitempty"`
PriorityGroups []string `json:"priority_groups,omitempty" yaml:"priority_groups"`
PriorityPolicy PriorityPolicy `json:"priority_policy,omitempty" yaml:"priority_policy"`
PinnedTTL time.Duration `json:"priority_timeout,omitempty" yaml:"priority_timeout"`

// Don't add to general clients.
Direct bool `json:"direct,omitempty"`
Direct bool `json:"direct,omitempty" yaml:"direct"`
}

// SequenceInfo is the consumer and stream sequence that uniquely identify a message
Expand Down
28 changes: 26 additions & 2 deletions schema_source/jetstream/api/v1/definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -708,11 +708,24 @@
"type": "string",
"maxLength": 4096
},
"deliver_policy": {
"description": "The point in the stream from which to receive messages",
"type": "string",
"enum": ["all", "last", "new", "by_start_sequence", "by_start_time", "last_per_subject"],
"default": "all"
},
"deliver_subject": {
"description": "The subject to delivery messages to",
"type": "string",
"minLength": 1
},
"deliver_group": {
"description": "The queue group name used to distribute messages among subscribers",
"type": "string",
"minLength": 1
},
"ack_policy": {
"description": "The requirement of client acknowledgments",
"type": "string",
"enum": ["none", "all", "explicit"],
"default": "none"
Expand Down Expand Up @@ -741,11 +754,13 @@
}
},
"replay_policy": {
"description": "The rate at which messages will be pushed to a client",
"type": "string",
"enum": ["instant", "original"],
"default": "instant"
},
"sample_freq": {
"description": "Sets the percentage of acknowledgments that should be sampled for observability",
"type": "string"
},
"rate_limit_bps": {
Expand Down Expand Up @@ -841,11 +856,20 @@
}
},
"priority_policy": {
"description": "The policy the consumer is set to",
"description": "The priority policy the consumer is set to",
"$ref": "#/definitions/priority_policy"
},
"pinned_ttl": {
"priority_timeout": {
"description": "For pinned_client priority policy how long before the client times out"
},
"opt_start_seq": {
"description": "Start sequence used with the DeliverByStartSequence deliver policy.",
"$ref": "#/definitions/golang_uint64",
"minimum": 0
},
"opt_start_time": {
"description": "Start time used with the DeliverByStartSequence deliver policy",
"$ref": "#/definitions/golang_time"
}
}
},
Expand Down
37 changes: 36 additions & 1 deletion schemas/jetstream/api/v1/consumer_configuration.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,31 @@
"type": "string",
"maxLength": 4096
},
"deliver_policy": {
"description": "The point in the stream from which to receive messages",
"type": "string",
"enum": [
"all",
"last",
"new",
"by_start_sequence",
"by_start_time",
"last_per_subject"
],
"default": "all"
},
"deliver_subject": {
"description": "The subject to delivery messages to",
"type": "string",
"minLength": 1
},
"deliver_group": {
"description": "The queue group name used to distribute messages among subscribers",
"type": "string",
"minLength": 1
},
"ack_policy": {
"description": "The requirement of client acknowledgments",
"type": "string",
"enum": [
"none",
Expand Down Expand Up @@ -167,6 +187,7 @@
}
},
"replay_policy": {
"description": "The rate at which messages will be pushed to a client",
"type": "string",
"enum": [
"instant",
Expand All @@ -175,6 +196,7 @@
"default": "instant"
},
"sample_freq": {
"description": "Sets the percentage of acknowledgments that should be sampled for observability",
"type": "string"
},
"rate_limit_bps": {
Expand Down Expand Up @@ -302,8 +324,21 @@
"pinned_client"
]
},
"pinned_ttl": {
"priority_timeout": {
"description": "For pinned_client priority policy how long before the client times out"
},
"opt_start_seq": {
"description": "Start sequence used with the DeliverByStartSequence deliver policy.",
"minimum": 0,
"$comment": "unsigned 64 bit integer",
"type": "integer",
"maximum": 18446744073709551615
},
"opt_start_time": {
"description": "Start time used with the DeliverByStartSequence deliver policy",
"$comment": "A point in time in RFC3339 format including timezone, though typically in UTC",
"type": "string",
"format": "date-time"
}
}
}
Loading
Loading