Skip to content

Commit 4bfe6ad

Browse files
authored
add support for dedicated clusters to CDC resource (#428)
Add option for dedicated streaming clusters to use CDC
1 parent 54d4237 commit 4bfe6ad

File tree

3 files changed

+24
-6
lines changed

3 files changed

+24
-6
lines changed

docs/resources/cdc.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ resource "astra_cdc" "db_cdc" {
7474
database_name = astra_database.db_database.name
7575
table = astra_table.db_table.table
7676
keyspace = astra_database.db_database.keyspace
77+
pulsar_cluster = astra_streaming_tenant.cluster_name
7778
tenant_name = astra_streaming_tenant.streaming_tenant.tenant_name
7879
topic_partitions = 3
7980
}
@@ -96,6 +97,10 @@ resource "astra_cdc" "db_cdc" {
9697
- `tenant_name` (String) Streaming tenant name
9798
- `topic_partitions` (Number) Number of partitions in cdc topic.
9899

100+
### Optional
101+
102+
- `pulsar_cluster` (String) Pulsar cluster name
103+
99104
### Read-Only
100105

101106
- `connector_status` (String) Connector Status

examples/resources/astra_cdc/resource.tf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ resource "astra_cdc" "db_cdc" {
5959
database_name = astra_database.db_database.name
6060
table = astra_table.db_table.table
6161
keyspace = astra_database.db_database.keyspace
62+
pulsar_cluster = astra_streaming_tenant.cluster_name
6263
tenant_name = astra_streaming_tenant.streaming_tenant.tenant_name
6364
topic_partitions = 3
6465
}

internal/provider/resource_cdc.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ func resourceCDC() *schema.Resource {
6565
Required: true,
6666
ForceNew: true,
6767
},
68+
"pulsar_cluster": {
69+
Description: "Pulsar cluster name",
70+
Type: schema.TypeString,
71+
Optional: true,
72+
ForceNew: true,
73+
},
6874
"tenant_name": {
6975
Description: "Streaming tenant name",
7076
Type: schema.TypeString,
@@ -96,6 +102,7 @@ func resourceCDCDelete(ctx context.Context, resourceData *schema.ResourceData, m
96102
token := meta.(astraClients).token
97103

98104
id := resourceData.Id()
105+
pulsarClusterFromConfig := resourceData.Get("pulsar_cluster_name").(string)
99106

100107
databaseId, keyspace, table, tenantName, err := parseCDCID(id)
101108
if err != nil {
@@ -114,7 +121,7 @@ func resourceCDCDelete(ctx context.Context, resourceData *schema.ResourceData, m
114121
return diag.FromErr(fmt.Errorf("failed to read current organization: %w", err))
115122
}
116123

117-
pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, org, streamingClient, tenantName)
124+
pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, org, streamingClient, tenantName, pulsarClusterFromConfig)
118125
if err != nil {
119126
diag.FromErr(err)
120127
}
@@ -190,6 +197,7 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met
190197
token := meta.(astraClients).token
191198

192199
id := resourceData.Id()
200+
pulsarClusterFromConfig := resourceData.Get("pulsar_cluster_name").(string)
193201

194202
databaseId, keyspace, table, tenantName, err := parseCDCID(id)
195203
if err != nil {
@@ -206,7 +214,7 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met
206214
return diag.FromErr(fmt.Errorf("failed to read organization: %w", err))
207215
}
208216

209-
pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, orgId, streamingClient, tenantName)
217+
pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, orgId, streamingClient, tenantName, pulsarClusterFromConfig)
210218
if err != nil {
211219
diag.FromErr(err)
212220
}
@@ -291,6 +299,7 @@ func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, m
291299
databaseId := resourceData.Get("database_id").(string)
292300
databaseName := resourceData.Get("database_name").(string)
293301
topicPartitions := resourceData.Get("topic_partitions").(int)
302+
pulsarClusterFromConfig := resourceData.Get("pulsar_cluster_name").(string)
294303
tenantName := resourceData.Get("tenant_name").(string)
295304

296305
orgBody, _ := client.GetCurrentOrganization(ctx)
@@ -309,7 +318,7 @@ func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, m
309318
TopicPartitions: topicPartitions,
310319
}
311320

312-
pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, org, streamingClient, tenantName)
321+
pulsarCluster, pulsarToken, err := prepCDC(ctx, client, databaseId, token, org, streamingClient, tenantName, pulsarClusterFromConfig)
313322
if err != nil {
314323
return diag.FromErr(err)
315324
}
@@ -386,7 +395,10 @@ func getTableCDCStatus(databaseID, keyspace, table string, cdcStatuses CDCStatus
386395
return nil
387396
}
388397

389-
func prepCDC(ctx context.Context, client *astra.ClientWithResponses, databaseId string, token string, org OrgId, streamingClient *astrastreaming.ClientWithResponses, tenantName string) (string, string, error) {
398+
// prepCDC get the pulsar cluster name (if it's not set) and the pulsar token
399+
func prepCDC(ctx context.Context, client *astra.ClientWithResponses, databaseId string, token string, org OrgId,
400+
streamingClient *astrastreaming.ClientWithResponses, tenantName string, pulsarCluster string) (string, string, error) {
401+
390402
databaseResourceData := schema.ResourceData{}
391403
db, err := getDatabase(ctx, &databaseResourceData, client, databaseId)
392404
if err != nil {
@@ -395,9 +407,8 @@ func prepCDC(ctx context.Context, client *astra.ClientWithResponses, databaseId
395407

396408
// In most astra APIs there are dashes in region names depending on the cloud provider, this seems not to be the case for streaming
397409
cloudProvider := string(*db.Info.CloudProvider)
398-
fmt.Printf("%s", cloudProvider)
410+
pulsarCluster = getPulsarCluster(pulsarCluster, cloudProvider, *db.Info.Region, "")
399411

400-
pulsarCluster := getPulsarCluster("", cloudProvider, *db.Info.Region, "")
401412
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, streamingClient, tenantName)
402413
return pulsarCluster, pulsarToken, err
403414
}
@@ -455,6 +466,7 @@ func setCDCData(resourceData *schema.ResourceData, cdc CDCStatus) diag.Diagnosti
455466
return nil
456467
}
457468

469+
// parseCDCID expects an ID in the format "databaseId/keyspace/table/tenantName"
458470
func parseCDCID(id string) (string, string, string, string, error) {
459471
idParts := strings.Split(strings.ToLower(id), "/")
460472
if len(idParts) != 4 {

0 commit comments

Comments
 (0)