Skip to content

Commit a3c376a

Browse files
authored
streaming: sink and tests improvements (#430)
- allow dedicated cluster config for streaming sink - add streaming cdc test to automated CI
1 parent 3a44d7c commit a3c376a

File tree

7 files changed

+131
-54
lines changed

7 files changed

+131
-54
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ jobs:
6262
runs-on: ubuntu-latest
6363
timeout-minutes: 20
6464
strategy:
65-
max-parallel: 2
65+
max-parallel: 1 # need to run one at a time to avoid conflicts
6666
matrix:
6767
# Test with min and max supported Terraform versions
6868
terraform:

docs/resources/streaming_sink.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ resource "astra_streaming_sink" "streaming_sink" {
9292

9393
- `archive` (String) Name of the sink archive type to use. Defaults to the value of sink_name. Must be formatted as a URL, e.g. 'builtin://jdbc-clickhouse
9494
- `deletion_protection` (Boolean) Whether or not to allow Terraform to destroy this streaming sink. Unless this field is set to false in Terraform state, a `terraform destroy` or `terraform apply` command that deletes the instance will fail. Defaults to `true`.
95+
- `pulsar_cluster` (String) Name of the pulsar cluster in which to create the sink. If left blank, the name will be inferred from thecloud provider and region
9596

9697
### Read-Only
9798

internal/provider/resource_cdc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,8 @@ func prepCDC(ctx context.Context, client *astra.ClientWithResponses, databaseId
404404
db, err := getDatabase(ctx, &databaseResourceData, client, databaseId)
405405
if err != nil {
406406
return "", "", err
407+
} else if db == nil || db.Info.CloudProvider == nil {
408+
return "", "", fmt.Errorf("failed to get database metadata for databaseID: %s", databaseId)
407409
}
408410

409411
// In most astra APIs there are dashes in region names depending on the cloud provider, this seems not to be the case for streaming

internal/provider/resource_cdc_test.go

Lines changed: 76 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,40 +7,95 @@ import (
77
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
88
)
99

10-
func TestCDC(t *testing.T) {
11-
// Disable this test by default until test works with non-prod clusters
12-
checkRequiredTestVars(t, "ASTRA_TEST_CDC_TEST_ENABLED")
10+
// https://www.terraform.io/docs/extend/testing/acceptance-tests/index.html
11+
func TestAccAstraCDC(t *testing.T) {
12+
resource.Test(t, resource.TestCase{
13+
PreCheck: func() { testAccPreCheck(t) },
14+
Providers: testAccProviders,
15+
Steps: []resource.TestStep{
16+
{
17+
Config: testAstraCDCConfig(),
18+
},
19+
},
20+
})
21+
}
22+
23+
func testAstraCDCConfig() string {
24+
return `
1325
26+
resource "astra_cdc" "cdc-1" {
27+
database_id = "de76e588-761f-4e74-afed-1d2092aaaa84"
28+
database_name = "terraform-cdc-test"
29+
keyspace = "ks1"
30+
table = "tbl1"
31+
topic_partitions = 3
32+
pulsar_cluster = "pulsar-gcp-useast1-staging"
33+
tenant_name = "terraform-tests1"
34+
}`
35+
}
36+
37+
func TestAstraCDCFull(t *testing.T) {
38+
checkRequiredTestVars(t, "ASTRA_TEST_CDC_FULL_TEST_ENABLED")
39+
streamingTenant := "terraform-cdc-test-" + randomString(6)
1440
resource.Test(t, resource.TestCase{
1541
PreCheck: func() { testAccPreCheck(t) },
1642
Providers: testAccProviders,
1743
Steps: []resource.TestStep{
1844
{
19-
Config: testAccCDCConfiguration(),
45+
Config: testAstraCDCConfigFull("GCP", "us-east1", streamingTenant),
2046
},
2147
},
2248
})
2349
}
2450

25-
// https://www.terraform.io/docs/extend/testing/acceptance-tests/index.html
26-
func testAccCDCConfiguration() string {
51+
func testAstraCDCConfigFull(cloud_provider, region, streamingTenant string) string {
2752
return fmt.Sprintf(`
28-
resource "astra_streaming_tenant" "streaming_tenant-1" {
29-
tenant_name = "terraformtest"
30-
topic = "terraformtest"
31-
region = "useast-4"
32-
cloud_provider = "gcp"
33-
user_email = "seb@datastax.com"
53+
54+
resource "astra_database" "database_1" {
55+
cloud_provider = "%s"
56+
regions = ["%s"]
57+
name = "terraform-cdc-test"
58+
keyspace = "ks1"
59+
deletion_protection = "false"
3460
}
35-
resource "astra_cdc" "cdc-1" {
36-
depends_on = [ astra_streaming_tenant.streaming_tenant-1 ]
37-
database_id = "5b70892f-e01a-4595-98e6-19ecc9985d50"
38-
database_name = "sai_test"
39-
table = "test"
40-
keyspace = "sai_test"
41-
topic_partitions = 3
42-
tenant_name = astra_streaming_tenant.streaming_tenant-1.tenant_name
61+
62+
resource "astra_table" "table_1" {
63+
database_id = astra_database.database_1.id
64+
keyspace = astra_database.database_1.keyspace
65+
region = "%s"
66+
table = "cdctable1"
67+
clustering_columns = "a"
68+
partition_keys = "b"
69+
column_definitions = [
70+
{
71+
Name: "a"- Static: false
72+
TypeDefinition: "text"
73+
},
74+
{
75+
Name: "b"
76+
Static: false
77+
TypeDefinition: "text"
78+
}
79+
]
4380
}
4481
45-
`)
82+
resource "astra_streaming_tenant" "streaming_tenant_1" {
83+
tenant_name = "%s"
84+
cloud_provider = lower(astra_database.database_1.cloud_provider)
85+
region = astra_table.table_1.region
86+
user_email = "test@datastax.com"
87+
deletion_protection = "false"
88+
}
89+
90+
resource "astra_cdc" "cdc-1" {
91+
depends_on = [ astra_database.database_1, astra_streaming_tenant.streaming_tenant_1 ]
92+
database_id = astra_database.database_1.id
93+
database_name = astra_database.database_1.name
94+
keyspace = astra_database.database_1.keyspace
95+
table = astra_table.table_1.table
96+
topic_partitions = 3
97+
tenant_name = astra_streaming_tenant.streaming_tenant_1.tenant_name
98+
pulsar_cluster = astra_streaming_tenant.streaming_tenant_1.cluster_name
99+
}`, cloud_provider, region, region, streamingTenant)
100+
46101
}

internal/provider/resource_streaming_sink.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ func resourceStreamingSink() *schema.Resource {
4444
ForceNew: true,
4545
ValidateFunc: validation.StringMatch(regexp.MustCompile("^.{2,}"), "name must be atleast 2 characters"),
4646
},
47+
"pulsar_cluster": {
48+
Description: "Name of the pulsar cluster in which to create the sink. If left blank, the name will be inferred from the" +
49+
"cloud provider and region",
50+
Type: schema.TypeString,
51+
Optional: true,
52+
ForceNew: true,
53+
ValidateFunc: validation.StringMatch(regexp.MustCompile("^.{2,}"), "name must be atleast 2 characters"),
54+
},
4755
"region": {
4856
Description: "cloud region",
4957
Type: schema.TypeString,
@@ -136,11 +144,12 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
136144
sinkName := resourceData.Get("sink_name").(string)
137145
namespace := resourceData.Get("namespace").(string)
138146

147+
pulsarClusterName := resourceData.Get("pulsar_cluster").(string)
139148
rawRegion := resourceData.Get("region").(string)
140149
region := strings.ReplaceAll(rawRegion, "-", "")
141150
cloudProvider := resourceData.Get("cloud_provider").(string)
142151

143-
pulsarCluster := getPulsarCluster("", cloudProvider, region, "")
152+
pulsarCluster := getPulsarCluster(pulsarClusterName, cloudProvider, region, "")
144153

145154
orgResp, err := astraClient.GetCurrentOrganization(ctx)
146155
if err != nil {
@@ -210,11 +219,12 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
210219
topic := resourceData.Get("topic").(string)
211220
namespace := resourceData.Get("namespace").(string)
212221

222+
pulsarClusterName := resourceData.Get("pulsar_cluster").(string)
213223
rawRegion := resourceData.Get("region").(string)
214224
region := strings.ReplaceAll(rawRegion, "-", "")
215225
cloudProvider := resourceData.Get("cloud_provider").(string)
216226

217-
pulsarCluster := getPulsarCluster("", cloudProvider, region, "")
227+
pulsarCluster := getPulsarCluster(pulsarClusterName, cloudProvider, region, "")
218228

219229
orgBody, err := astraClient.GetCurrentOrganization(ctx)
220230
if err != nil {
@@ -260,6 +270,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
260270
region := strings.ReplaceAll(rawRegion, "-", "")
261271
cloudProvider := resourceData.Get("cloud_provider").(string)
262272
tenantName := resourceData.Get("tenant_name").(string)
273+
pulsarClusterName := resourceData.Get("pulsar_cluster").(string)
263274

264275
sinkName := resourceData.Get("sink_name").(string)
265276
archive := resourceData.Get("archive").(string)
@@ -294,7 +305,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
294305
return diag.FromErr(fmt.Errorf("failed to read pulsar clusters. Status code: %s, msg:\n%s", streamingClustersResponse.Status(), string(streamingClustersResponse.Body)))
295306
}
296307

297-
pulsarCluster := getPulsarCluster("", cloudProvider, region, "")
308+
pulsarCluster := getPulsarCluster(pulsarClusterName, cloudProvider, region, "")
298309

299310
var configs map[string]interface{}
300311
if err := json.Unmarshal([]byte(rawConfigs), &configs); err != nil {

internal/provider/resource_streaming_sink_test.go

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,58 +8,61 @@ import (
88
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
99
)
1010

11-
func TestStreamingSink(t *testing.T) {
11+
func TestAccStreamingSink(t *testing.T) {
1212
// Disable this test by default until test works with non-prod clusters
1313
checkRequiredTestVars(t, "ASTRA_TEST_STREAMING_SINK_TEST_ENABLED")
1414

15-
//tenantName := fmt.Sprintf("terraform-test-%s", uuid.New().String())[0:20]
16-
snowflakeTenantName := fmt.Sprintf("terraform-test-%s", uuid.New().String())[0:20]
15+
tenantName := fmt.Sprintf("terraform-test-%s", uuid.New().String())[0:20]
16+
//snowflakeTenantName := fmt.Sprintf("terraform-test-%s", uuid.New().String())[0:20]
1717

1818
resource.Test(t, resource.TestCase{
19-
PreCheck: func() { testAccPreCheck(t) },
20-
Providers: testAccProviders,
19+
PreCheck: func() { testAccPreCheck(t) },
20+
ProtoV6ProviderFactories: testAccProtoV6ProviderFactories,
2121
Steps: []resource.TestStep{
22-
// {
23-
// Config: testAccStreamingSinkConfiguration(),
24-
// },
2522
{
26-
Config: testAccStreamingSnowflakeSinkConfiguration(snowflakeTenantName),
23+
Config: testAccStreamingSinkConfiguration(tenantName),
2724
},
25+
// {
26+
// Config: testAccStreamingSnowflakeSinkConfiguration(snowflakeTenantName),
27+
// },
2828
},
2929
})
3030
}
3131

3232
// https://www.terraform.io/docs/extend/testing/acceptance-tests/index.html
3333
func testAccStreamingSinkConfiguration(tenantName string) string {
3434
return fmt.Sprintf(`
35-
resource "astra_streaming_tenant" "streaming_tenant-1" {
36-
tenant_name = "%s"
37-
topic = "terraformtest"
38-
region = "useast-4"
39-
cloud_provider = "gcp"
40-
user_email = "seb@datastax.com"
35+
resource "astra_streaming_tenant" "streaming_tenant_1" {
36+
deletion_protection = false
37+
tenant_name = "%s"
38+
topic = "terraformtest"
39+
cloud_provider = "gcp"
40+
region = "us-east4"
41+
user_email = "test@datastax.com"
4142
}
4243
43-
resource "astra_cdc" "cdc-1" {
44-
depends_on = [ astra_streaming_tenant.streaming_tenant-1 ]
45-
database_id = "5b70892f-e01a-4595-98e6-19ecc9985d50"
46-
database_name = "sai_test"
47-
table = "test"
48-
keyspace = "sai_test"
49-
topic_partitions = 3
50-
tenant_name = astra_streaming_tenant.streaming_tenant-1.tenant_name
44+
resource "astra_streaming_topic" "streaming_topic_1" {
45+
depends_on = [astra_streaming_tenant.streaming_tenant_1]
46+
deletion_protection = false
47+
cluster = astra_streaming_tenant.streaming_tenant_1.cluster_name
48+
tenant = astra_streaming_tenant.streaming_tenant_1.tenant_name
49+
namespace = "default"
50+
topic = "terraform-sink-test-1"
5151
}
52-
resource "astra_streaming_sink" "streaming_sink-1" {
53-
depends_on = [ astra_streaming_tenant.streaming_tenant-1, astra_cdc.cdc-1 ]
54-
tenant_name = astra_streaming_tenant.streaming_tenant-1.tenant_name
55-
topic = astra_cdc.cdc-1.data_topic
56-
region = "useast-4"
52+
53+
resource "astra_streaming_sink" "streaming_sink_1" {
54+
depends_on = [ astra_streaming_tenant.streaming_tenant_1, astra_streaming_topic.streaming_topic_1 ]
55+
deletion_protection = false
56+
pulsar_cluster = "pulsar-gcp-useast4-staging"
5757
cloud_provider = "gcp"
58+
region = "us-east4"
59+
tenant_name = astra_streaming_tenant.streaming_tenant_1.tenant_name
60+
namespace = "default"
61+
topic = astra_streaming_topic.streaming_topic_1.topic_fqn
5862
sink_name = "jdbc-clickhouse"
5963
retain_ordering = true
6064
processing_guarantees = "ATLEAST_ONCE"
6165
parallelism = 3
62-
namespace = "default"
6366
sink_configs = jsonencode({
6467
"userName": "clickhouse",
6568
"password": "password",
@@ -70,6 +73,7 @@ resource "astra_streaming_sink" "streaming_sink-1" {
7073
}
7174
`, tenantName)
7275
}
76+
7377
func testAccStreamingSnowflakeSinkConfiguration(tenantName string) string {
7478
return fmt.Sprintf(`
7579
resource "astra_streaming_tenant" "streaming_tenant-1" {

test/run_tests.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ set -o allexport errexit
55
DEFAULT_TEST_ENV_FILE="test.env"
66
DEFAULT_TEST_ASTRA_API_URL="https://api.test.cloud.datastax.com"
77
DEFAULT_TEST_ASTRA_STREAMING_API_URL="https://api.staging.streaming.datastax.com"
8+
DEFAULT_TEST_ASTRA_APPS_DOMAIN="apps.astra-test.datastax.com"
89

910
SCRIPT_PATH=$(dirname -- "$0")
1011

@@ -34,6 +35,9 @@ setup_env() {
3435
if [ -z "$ASTRA_TEST_TIMEOUT" ]; then
3536
ASTRA_TEST_TIMEOUT="15m"
3637
fi
38+
if [ -z "$ASTRA_APPS_DOMAIN" ]; then
39+
ASTRA_APPS_DOMAIN="$DEFAULT_TEST_ASTRA_APPS_DOMAIN"
40+
fi
3741
}
3842

3943
run_tests() {

0 commit comments

Comments
 (0)