Skip to content

Commit

Permalink
Add MessageTTL option to namespace (#65)
Browse files Browse the repository at this point in the history
* Add MessageTTL option to namespace

* Readme updates

* Style fix

Error: ST1003: struct field MessageTtlInSeconds should be MessageTTLInSeconds (stylecheck)

* Add field description
  • Loading branch information
lovrogalesic-toast authored Oct 2, 2022
1 parent c2171c1 commit ed259bc
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 7 deletions.
29 changes: 22 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ A [Terraform](https://www.terraform.io) provider for managing [Apache Pulsar Ent
- Run `make build`, it will generate a binary file named `terraform-provider-pulsar`
- Copy this `terraform-provider-pulsar` binary file to your terraform plugin directory based on your OS:

| Operating System | User plugins directory |
|----------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------|
| Windows(amd64) | %APPDATA%\terraform.d\plugins\registry.terraform.io\streamnative\pulsar\0.1.0\windows_amd64\ |
| Linux(amd64) | ~/.terraform.d/plugins/registry.terraform.io/streamnative/pulsar/0.1.0/linux_amd64/ |
| MacOS(amd64) | ~/.terraform.d/plugins/registry.terraform.io/streamnative/pulsar/0.1.0/darwin_amd64/ |
| Operating System | User plugins directory |
|------------------|----------------------------------------------------------------------------------------------|
| Windows(amd64) | %APPDATA%\terraform.d\plugins\registry.terraform.io\streamnative\pulsar\0.1.0\windows_amd64\ |
| Linux(amd64) | ~/.terraform.d/plugins/registry.terraform.io/streamnative/pulsar/0.1.0/linux_amd64/ |
| MacOS(amd64) | ~/.terraform.d/plugins/registry.terraform.io/streamnative/pulsar/0.1.0/darwin_amd64/ |

# Using the Provider

Expand Down Expand Up @@ -186,6 +186,7 @@ resource "pulsar_namespace" "test" {
max_consumers_per_subscription = "50"
max_consumers_per_topic = "50"
max_producers_per_topic = "50"
message_ttl_seconds = "86400"
replication_clusters = ["standalone"]
}
Expand Down Expand Up @@ -231,12 +232,26 @@ resource "pulsar_namespace" "test" {
| `namespace_config` | Configuration for your namespaces like max allowed producers to produce messages | No |
| `dispatch_rate` | Apache Pulsar throttling config | No |
| `retention_policies` | Data retention policies | No |
| `schema_validation_enforce` | Enable or disable schema validation | No |
| `schema_compatibility_strategy` | Set schema compatibility strategy | No |
| `backlog_quota` | [Backlog Quota](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-backlog-quota-policies) for all topics | No |
| `persistence_policies` | [Persistence policies](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-persistence-policies) for all topics under a given namespace | No |
| `permission_grant` | [Permission grants](https://pulsar.apache.org/docs/en/admin-api-permissions/) on a namespace. This block can be repeated for each grant you'd like to add | No |


namespace_config nested schema

| Property | Description | Required |
|----------------------------------|-------------------------------------------------------------------|----------|
| `anti_affinity` | Anti-affinity group name | No |
| `is_allow_auto_update_schema` | Is schema auto-update allowed | No |
| `max_consumers_per_subscription` | Sets the max consumers per subscription | No |
| `max_consumers_per_topic` | Sets the max consumers per topic | No |
| `max_producers_per_topic` | Sets the max producers per topic | No |
| `message_ttl_seconds` | Sets the message TTL in seconds | No |
| `replication_clusters` | List of replication clusters for the namespace | No |
| `schema_compatibility_strategy` | Set schema compatibility strategy | No |
| `schema_validation_enforce` | Enable or disable schema validation | No |


The `schema_compatibility_strategy` can take the following values:

- AutoUpdateDisabled
Expand Down
1 change: 1 addition & 0 deletions docs/resources/namespace.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Optional:
- `max_consumers_per_subscription` (Number)
- `max_consumers_per_topic` (Number)
- `max_producers_per_topic` (Number)
- `message_ttl_seconds` (Number)
- `replication_clusters` (List of String)
- `schema_compatibility_strategy` (String)
- `schema_validation_enforce` (Boolean)
Expand Down
1 change: 1 addition & 0 deletions examples/namespaces/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ resource "pulsar_namespace" "test" {
max_consumers_per_subscription = "50"
max_consumers_per_topic = "50"
max_producers_per_topic = "50"
message_ttl_seconds = "86400"
replication_clusters = [
"standalone"]
}
Expand Down
1 change: 1 addition & 0 deletions pulsar/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ only once, even if the message is produced more than once`,
"max_producers_per_topic": "Max number of producers per topic",
"max_consumers_per_subscription": "Max number of consumers per subscription",
"max_consumers_per_topic": "Max number of consumers per topic",
"message_ttl_seconds": "Sets the message time to live",
"dispatch_rate": "Data transfer rate, in and out of the Pulsar Broker",
"persistence_policy": "Policy for the namespace for data persistence",
"backlog_quota": "",
Expand Down
21 changes: 21 additions & 0 deletions pulsar/resource_pulsar_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ func resourcePulsarNamespace() *schema.Resource {
Default: -1,
ValidateFunc: validateGtEq0,
},
"message_ttl_seconds": {
Type: schema.TypeInt,
Optional: true,
Default: -1,
ValidateFunc: validateGtEq0,
},
"replication_clusters": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -294,6 +300,11 @@ func resourcePulsarNamespaceRead(d *schema.ResourceData, meta interface{}) error
return fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err)
}

messageTTL, err := client.GetNamespaceMessageTTL(ns.String())
if err != nil {
return fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err)
}

schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns)
if err != nil {
return fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err)
Expand Down Expand Up @@ -325,6 +336,7 @@ func resourcePulsarNamespaceRead(d *schema.ResourceData, meta interface{}) error
"max_consumers_per_subscription": maxConsPerSub,
"max_consumers_per_topic": maxConsPerTopic,
"max_producers_per_topic": maxProdPerTopic,
"message_ttl_seconds": messageTTL,
"replication_clusters": replClusters,
"schema_validation_enforce": schemaValidationEnforce,
"schema_compatibility_strategy": schemaCompatibilityStrategy.String(),
Expand Down Expand Up @@ -461,6 +473,13 @@ func resourcePulsarNamespaceUpdate(d *schema.ResourceData, meta interface{}) err
errs = multierror.Append(errs, fmt.Errorf("SetMaxProducersPerTopic: %w", err))
}
}

if nsCfg.MessageTTLInSeconds >= 0 {
if err = client.SetNamespaceMessageTTL(nsName.String(), nsCfg.MessageTTLInSeconds); err != nil {
errs = multierror.Append(errs, fmt.Errorf("SetNamespaceMessageTTL: %w", err))
}
}

if err = client.SetSchemaValidationEnforced(*nsName, nsCfg.SchemaValidationEnforce); err != nil {
errs = multierror.Append(errs, fmt.Errorf("SetSchemaValidationEnforced: %w", err))
}
Expand Down Expand Up @@ -634,6 +653,7 @@ func namespaceConfigToHash(v interface{}) int {
buf.WriteString(fmt.Sprintf("%d-", m["max_consumers_per_subscription"].(int)))
buf.WriteString(fmt.Sprintf("%d-", m["max_consumers_per_topic"].(int)))
buf.WriteString(fmt.Sprintf("%d-", m["max_producers_per_topic"].(int)))
buf.WriteString(fmt.Sprintf("%d-", m["message_ttl_seconds"].(int)))
buf.WriteString(fmt.Sprintf("%s-", m["replication_clusters"].([]interface{})))
buf.WriteString(fmt.Sprintf("%t-", m["schema_validation_enforce"].(bool)))
buf.WriteString(fmt.Sprintf("%s-", m["schema_compatibility_strategy"].(string)))
Expand Down Expand Up @@ -695,6 +715,7 @@ func unmarshalNamespaceConfig(v *schema.Set) *types.NamespaceConfig {
nsConfig.MaxProducersPerTopic = data["max_producers_per_topic"].(int)
nsConfig.MaxConsumersPerTopic = data["max_consumers_per_topic"].(int)
nsConfig.MaxConsumersPerSubscription = data["max_consumers_per_subscription"].(int)
nsConfig.MessageTTLInSeconds = data["message_ttl_seconds"].(int)
nsConfig.AntiAffinity = data["anti_affinity"].(string)
nsConfig.SchemaValidationEnforce = data["schema_validation_enforce"].(bool)
nsConfig.SchemaCompatibilityStrategy = data["schema_compatibility_strategy"].(string)
Expand Down
1 change: 1 addition & 0 deletions pulsar/resource_pulsar_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ resource "pulsar_namespace" "test" {
max_consumers_per_subscription = "50"
max_consumers_per_topic = "50"
max_producers_per_topic = "50"
message_ttl_seconds = "86400"
replication_clusters = ["standalone"]
is_allow_auto_update_schema = false
}
Expand Down
1 change: 1 addition & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type (
MaxConsumersPerTopic int
MaxProducersPerTopic int
MaxConsumersPerSubscription int
MessageTTLInSeconds int
SchemaValidationEnforce bool
SchemaCompatibilityStrategy string
IsAllowAutoUpdateSchema bool
Expand Down

0 comments on commit ed259bc

Please sign in to comment.