diff --git a/api/streams.go b/api/streams.go index 845a790f..a841f9f5 100644 --- a/api/streams.go +++ b/api/streams.go @@ -587,10 +587,11 @@ type StreamConsumerLimits struct { MaxAckPending int `json:"max_ack_pending,omitempty" yaml:"max_ack_pending"` } -// Placement describes stream placement requirements for a stream +// Placement describes stream placement requirements for a stream or leader type Placement struct { - Cluster string `json:"cluster,omitempty" yaml:"cluster"` - Tags []string `json:"tags,omitempty" yaml:"tags"` + Cluster string `json:"cluster,omitempty" yaml:"cluster"` + Tags []string `json:"tags,omitempty" yaml:"tags"` + Preferred string `json:"preferred,omitempty" yaml:"preferred"` } // StreamSourceInfo shows information about an upstream stream source. diff --git a/go.mod b/go.mod index 2228c192..abce464b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/jedib0t/go-pretty/v6 v6.6.5 github.com/klauspost/compress v1.17.11 github.com/nats-io/jwt/v2 v2.7.3 - github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241216161640-d32daa29debe + github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241219180704-470a7acda496 github.com/nats-io/nats.go v1.37.0 github.com/nats-io/nkeys v0.4.9 github.com/nats-io/nuid v1.0.1 @@ -24,7 +24,6 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/goccy/go-json v0.10.4 // indirect github.com/google/go-tpm v0.9.2 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect @@ -34,7 +33,6 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect - go.uber.org/automaxprocs v1.6.0 // indirect golang.org/x/crypto v0.31.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/time v0.8.0 // indirect diff --git a/go.sum b/go.sum index ca4086a2..b771d58b 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,6 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI= github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= -github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM= -github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-tpm v0.9.2 h1:Gh8CMnMm06b09DmcsuY9fI3oF69188lGXCpiT/a05T4= @@ -31,10 +29,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241127165413-cfaad68e19db h1:XSsKLcdTjNwRhhiPS2G193zgh7yCHjT5IRzRyTuI/Y0= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241127165413-cfaad68e19db/go.mod h1:nI8h87Ryi/zcaQcDLSSVbMfomhyck+0oDqxR7NoZX0Y= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241216161640-d32daa29debe h1:IrqVL1wnRhZ06wJGHMIzBcjr2gzMlRsY1OuDWIXBQwY= -github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241216161640-d32daa29debe/go.mod h1:Ky5GDRMam5300yCf1N+pD0apW7TqfVvTNgYEvbXtnE8= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241219180704-470a7acda496 h1:NT7IPFPqkHccMqQWFoCOATlBLR+POK1sqFx4oOX3bAY= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241219180704-470a7acda496/go.mod h1:Ky5GDRMam5300yCf1N+pD0apW7TqfVvTNgYEvbXtnE8= github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= @@ -58,8 +54,6 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= -go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20241215155358-4a5509556b9e h1:4qufH0hlUYs6AO6XmZC3GqfDPGSXHVXUFR6OND+iJX4= diff --git a/schema_source/jetstream/api/v1/definitions.json b/schema_source/jetstream/api/v1/definitions.json index 7f7beaac..2c2c004f 100644 --- a/schema_source/jetstream/api/v1/definitions.json +++ b/schema_source/jetstream/api/v1/definitions.json @@ -164,18 +164,22 @@ }, "placement": { "type": "object", - "description": "Placement requirements for a stream", + "description": "Placement requirements for a Stream or asset leader", "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/consumer_create_response.json b/schemas/jetstream/api/v1/consumer_create_response.json index 86f8219d..0d149318 100644 --- a/schemas/jetstream/api/v1/consumer_create_response.json +++ b/schemas/jetstream/api/v1/consumer_create_response.json @@ -511,11 +511,32 @@ "type": "string", "format": "date-time" }, - "pinned_ids": { - "description": "The IDs pinned by each consumer priority group", - "type": "object", - "additionalProperties": { - "type": "string" + "priority_groups": { + "description": "The state of Priority Groups", + "type": "array", + "items": { + "required": [ + "group" + ], + "type": "object", + "description": "Status of a specific consumer priority group", + "properties": { + "group": { + "type": "string", + "description": "The group this status is for", + "minLength": 1 + }, + "pinned_client_id": { + "type": "string", + "description": "The generated ID of the pinned client" + }, + "pinned_ts": { + "description": "The timestamp when the client was pinned", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" + } + } } } } diff --git a/schemas/jetstream/api/v1/consumer_info_response.json b/schemas/jetstream/api/v1/consumer_info_response.json index 87139761..58614449 100644 --- a/schemas/jetstream/api/v1/consumer_info_response.json +++ b/schemas/jetstream/api/v1/consumer_info_response.json @@ -511,11 +511,32 @@ "type": "string", "format": "date-time" }, - "pinned_ids": { - "description": "The IDs pinned by each consumer priority group", - "type": "object", - "additionalProperties": { - "type": "string" + "priority_groups": { + "description": "The state of Priority Groups", + "type": "array", + "items": { + "required": [ + "group" + ], + "type": "object", + "description": "Status of a specific consumer priority group", + "properties": { + "group": { + "type": "string", + "description": "The group this status is for", + "minLength": 1 + }, + "pinned_client_id": { + "type": "string", + "description": "The generated ID of the pinned client" + }, + "pinned_ts": { + "description": "The timestamp when the client was pinned", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" + } + } } } } diff --git a/schemas/jetstream/api/v1/consumer_list_response.json b/schemas/jetstream/api/v1/consumer_list_response.json index e0aeef52..4f48788d 100644 --- a/schemas/jetstream/api/v1/consumer_list_response.json +++ b/schemas/jetstream/api/v1/consumer_list_response.json @@ -576,11 +576,32 @@ "type": "string", "format": "date-time" }, - "pinned_ids": { - "description": "The IDs pinned by each consumer priority group", - "type": "object", - "additionalProperties": { - "type": "string" + "priority_groups": { + "description": "The state of Priority Groups", + "type": "array", + "items": { + "required": [ + "group" + ], + "type": "object", + "description": "Status of a specific consumer priority group", + "properties": { + "group": { + "type": "string", + "description": "The group this status is for", + "minLength": 1 + }, + "pinned_client_id": { + "type": "string", + "description": "The generated ID of the pinned client" + }, + "pinned_ts": { + "description": "The timestamp when the client was pinned", + "$comment": "A point in time in RFC3339 format including timezone, though typically in UTC", + "type": "string", + "format": "date-time" + } + } } } } diff --git a/schemas/jetstream/api/v1/meta_leader_stepdown_request.json b/schemas/jetstream/api/v1/meta_leader_stepdown_request.json index ac7c8f6b..09357892 100644 --- a/schemas/jetstream/api/v1/meta_leader_stepdown_request.json +++ b/schemas/jetstream/api/v1/meta_leader_stepdown_request.json @@ -7,18 +7,22 @@ "properties": { "placement": { "type": "object", - "description": "Placement requirements for a stream", + "description": "Placement requirements for a Stream or asset leader", "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } } diff --git a/schemas/jetstream/api/v1/stream_configuration.json b/schemas/jetstream/api/v1/stream_configuration.json index 80221886..19b6c027 100644 --- a/schemas/jetstream/api/v1/stream_configuration.json +++ b/schemas/jetstream/api/v1/stream_configuration.json @@ -175,14 +175,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_create_request.json b/schemas/jetstream/api/v1/stream_create_request.json index 2f1a51d4..9237b0e9 100644 --- a/schemas/jetstream/api/v1/stream_create_request.json +++ b/schemas/jetstream/api/v1/stream_create_request.json @@ -178,14 +178,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_create_response.json b/schemas/jetstream/api/v1/stream_create_response.json index d5c206d2..b5c74bba 100644 --- a/schemas/jetstream/api/v1/stream_create_response.json +++ b/schemas/jetstream/api/v1/stream_create_response.json @@ -193,14 +193,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_info_response.json b/schemas/jetstream/api/v1/stream_info_response.json index e41fae26..66f8ef58 100644 --- a/schemas/jetstream/api/v1/stream_info_response.json +++ b/schemas/jetstream/api/v1/stream_info_response.json @@ -193,14 +193,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_list_response.json b/schemas/jetstream/api/v1/stream_list_response.json index 99d9a184..cda413d6 100644 --- a/schemas/jetstream/api/v1/stream_list_response.json +++ b/schemas/jetstream/api/v1/stream_list_response.json @@ -258,14 +258,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_restore_request.json b/schemas/jetstream/api/v1/stream_restore_request.json index 5f09ac5c..c9afebbe 100644 --- a/schemas/jetstream/api/v1/stream_restore_request.json +++ b/schemas/jetstream/api/v1/stream_restore_request.json @@ -182,14 +182,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_snapshot_response.json b/schemas/jetstream/api/v1/stream_snapshot_response.json index 068d12fb..71236342 100644 --- a/schemas/jetstream/api/v1/stream_snapshot_response.json +++ b/schemas/jetstream/api/v1/stream_snapshot_response.json @@ -217,14 +217,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_template_configuration.json b/schemas/jetstream/api/v1/stream_template_configuration.json index d7aa5aaf..d1a1488d 100644 --- a/schemas/jetstream/api/v1/stream_template_configuration.json +++ b/schemas/jetstream/api/v1/stream_template_configuration.json @@ -198,14 +198,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_template_create_request.json b/schemas/jetstream/api/v1/stream_template_create_request.json index 389785ca..cadc40b7 100644 --- a/schemas/jetstream/api/v1/stream_template_create_request.json +++ b/schemas/jetstream/api/v1/stream_template_create_request.json @@ -192,14 +192,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_template_create_response.json b/schemas/jetstream/api/v1/stream_template_create_response.json index 7c7e6331..85a5bced 100644 --- a/schemas/jetstream/api/v1/stream_template_create_response.json +++ b/schemas/jetstream/api/v1/stream_template_create_response.json @@ -202,14 +202,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_template_info_response.json b/schemas/jetstream/api/v1/stream_template_info_response.json index 176e02f8..f670b820 100644 --- a/schemas/jetstream/api/v1/stream_template_info_response.json +++ b/schemas/jetstream/api/v1/stream_template_info_response.json @@ -202,14 +202,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_update_request.json b/schemas/jetstream/api/v1/stream_update_request.json index 80380095..c3d3af25 100644 --- a/schemas/jetstream/api/v1/stream_update_request.json +++ b/schemas/jetstream/api/v1/stream_update_request.json @@ -178,14 +178,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/schemas/jetstream/api/v1/stream_update_response.json b/schemas/jetstream/api/v1/stream_update_response.json index 457e6cbb..b68e0b95 100644 --- a/schemas/jetstream/api/v1/stream_update_response.json +++ b/schemas/jetstream/api/v1/stream_update_response.json @@ -193,14 +193,18 @@ "properties": { "cluster": { "type": "string", - "description": "The desired cluster name to place the stream" + "description": "The desired cluster name" }, "tags": { - "description": "Tags required on servers hosting this stream", + "description": "Tags required on servers hosting this stream or leader", "type": "array", "items": { "type": "string" } + }, + "preferred": { + "description": "A preferred server name to move the leader to", + "type": "string" } } }, diff --git a/streams.go b/streams.go index 6e713c94..99f3ef6a 100644 --- a/streams.go +++ b/streams.go @@ -417,6 +417,18 @@ func PlacementTags(tags ...string) StreamOption { } } +func PlacementPreferredLeader(leader string) StreamOption { + return func(o *api.StreamConfig) error { + if o.Placement == nil { + o.Placement = &api.Placement{} + } + + o.Placement.Preferred = leader + + return nil + } +} + func Mirror(stream *api.StreamSource) StreamOption { return func(o *api.StreamConfig) error { o.Mirror = stream diff --git a/test/streams_test.go b/test/streams_test.go index 5ab687f1..046c971f 100644 --- a/test/streams_test.go +++ b/test/streams_test.go @@ -834,6 +834,21 @@ func TestPlacementTags(t *testing.T) { } } +func TestPlacementPreferred(t *testing.T) { + cfg := testStreamConfig() + err := jsm.PlacementPreferredLeader("n1")(cfg) + checkErr(t, err, "failed") + if !cmp.Equal(cfg.Placement.Preferred, "n1") { + t.Fatalf("expected 'n1'' got %v", &cfg.Placement.Preferred) + } + + err = jsm.PlacementPreferredLeader("n2")(cfg) + checkErr(t, err, "failed") + if !cmp.Equal(cfg.Placement.Preferred, "n2") { + t.Fatalf("expected 'n2' got %v", cfg.Placement.Preferred) + } +} + func TestSources(t *testing.T) { cfg := testStreamConfig() expected := []*api.StreamSource{{Name: "one"}, {Name: "two"}}