Skip to content

Commit

Permalink
Azure blob store integration and collection creation support
Browse files Browse the repository at this point in the history
  • Loading branch information
nithinvenkatesh-openai authored and nithinv13 committed Jul 27, 2024
1 parent 319defe commit d33b623
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 13 deletions.
26 changes: 13 additions & 13 deletions modules/test-resources/files/cities.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{
"cities": {
"city": [
{
"country": "USA",
"city": [
"Burlington"
],
"population": "46000",
"visited": "true"
},
]
}
}
"cities": {
"city": [
{
"country": "USA",
"city": [
"Burlington"
],
"population": "46000",
"visited": "true"
}
]
}
}
13 changes: 13 additions & 0 deletions rockset/bucket_collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func flattenBucketSourceParams(ctx context.Context, sourceType string, sources *
m["prefix"] = source.S3.Prefix
m["pattern"] = source.S3.Pattern
m["bucket"] = source.S3.Bucket
case "azure_blob_storage":
if source.AzureBlobStorage == nil {
return nil, fmt.Errorf("source type is %s but no Azure Blob Storage parameters found", sourceType)
}
m["prefix"] = source.AzureBlobStorage.Prefix
m["pattern"] = source.AzureBlobStorage.Pattern
m["container"] = source.AzureBlobStorage.Container
default:
return nil, fmt.Errorf("unknown source type %s", sourceType)
}
Expand Down Expand Up @@ -114,6 +121,12 @@ func makeBucketSourceParams(sourceType string, in interface{}) ([]openapi.Source
source.S3.Prefix = toStringPtrNilIfEmpty(val["prefix"].(string))
source.S3.Pattern = toStringPtrNilIfEmpty(val["pattern"].(string))
source.S3.Bucket = val["bucket"].(string)
case "azure_blob_storage":
source.AzureBlobStorage = openapi.NewSourceAzureBlobStorageWithDefaults()
source.AzureBlobStorage.Prefix = toStringPtrNilIfEmpty(val["prefix"].(string))
source.AzureBlobStorage.Pattern = toStringPtrNilIfEmpty(val["pattern"].(string))
container := val["container"].(string)
source.AzureBlobStorage.Container = &container
default:
panic("unknown source type " + sourceType)
}
Expand Down
1 change: 1 addition & 0 deletions rockset/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func Provider() *schema.Provider {
"rockset_query_lambda_tag": resourceQueryLambdaTag(),
"rockset_role": resourceRole(),
"rockset_s3_collection": resourceS3Collection(),
"rockset_azure_blob_storage_collection": resourceAzureBlobStorageCollection(),
"rockset_s3_integration": resourceS3Integration(),
"rockset_azure_blob_storage_integration": resourceAzureBlobStorageIntegration(),
"rockset_user": resourceUser(),
Expand Down
144 changes: 144 additions & 0 deletions rockset/resource_azure_blob_storage_collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package rockset

import (
"context"

"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/rockset/rockset-go-client/option"

"github.com/rockset/rockset-go-client"
)

// azureBlobStorageCollectionSchema returns the schema fields specific to an
// Azure Blob Storage collection. It is merged with baseCollectionSchema by
// resourceAzureBlobStorageCollection.
func azureBlobStorageCollectionSchema() map[string]*schema.Schema {
	return map[string]*schema.Schema{
		"source": {
			Description: "Defines a source for this collection.",
			Type:        schema.TypeSet,
			ForceNew:    true,
			Optional:    true,
			MinItems:    1,
			Elem: &schema.Resource{
				Schema: map[string]*schema.Schema{
					"integration_name": {
						// Azure stores blobs in containers, not buckets.
						Description: "The name of the Rockset Azure Blob Storage integration. If no Azure Blob Storage integration is provided, " +
							"only data in public Azure Blob Storage containers is accessible.",
						Type:         schema.TypeString,
						ForceNew:     true,
						Required:     true,
						ValidateFunc: rocksetNameValidator,
					},
					"prefix": {
						Type:        schema.TypeString,
						ForceNew:    true,
						Optional:    true,
						Deprecated:  "use pattern instead",
						Description: "Simple path prefix to Azure Blob Storage container keys.",
					},
					"pattern": {
						Type:        schema.TypeString,
						ForceNew:    true,
						Optional:    true,
						Description: "Regex path pattern to Azure Blob Storage keys.",
					},
					"container": {
						Type:        schema.TypeString,
						ForceNew:    true,
						Required:    true,
						Description: "Azure Blob Storage container containing the target data.",
					},
					// Shared ingest-format sub-schemas, reused from the base collection.
					"format": formatSchema(),
					"csv":    csvSchema(),
					"xml":    xmlSchema(),
				},
			},
		},
	}
}

// resourceAzureBlobStorageCollection defines the Terraform resource for a
// Rockset collection backed by Azure Blob Storage sources.
func resourceAzureBlobStorageCollection() *schema.Resource {
	return &schema.Resource{
		Description: "Manages a collection with one or more Azure Blob Storage sources attached. " +
			"Uses an Azure Blob Storage integration to access the Azure Blob Storage container. If no integration is provided, " +
			"only data in public storage accounts and containers are accessible.\n\n",

		CreateContext: resourceAzureBlobStorageCollectionCreate,
		ReadContext:   resourceAzureBlobStorageCollectionRead,
		UpdateContext: resourceCollectionUpdate, // No change from base collection update
		DeleteContext: resourceCollectionDelete, // No change from base collection delete

		Importer: &schema.ResourceImporter{
			StateContext: schema.ImportStatePassthroughContext,
		},

		// This schema uses the base collection schema as a foundation
		// and layers on just the fields needed for an Azure Blob Storage collection.
		Schema: mergeSchemas(baseCollectionSchema(), azureBlobStorageCollectionSchema()),
		Timeouts: &schema.ResourceTimeout{
			Create: schema.DefaultTimeout(defaultCollectionTimeout),
		},
	}
}

// resourceAzureBlobStorageCollectionCreate creates the collection with its
// Azure Blob Storage sources and blocks until the collection (and its initial
// documents) are ready, then records the composite workspace:name ID.
func resourceAzureBlobStorageCollectionCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
	rc := meta.(*rockset.RockClient)
	var diags diag.Diagnostics

	name := d.Get("name").(string)
	workspace := d.Get("workspace").(string)

	// Populate the request with the fields shared by all collection types.
	params := createBaseCollectionRequest(d)

	// Add the Azure Blob Storage specific source parameters.
	sources, err := makeBucketSourceParams("azure_blob_storage", d.Get("source"))
	if err != nil {
		return DiagFromErr(err)
	}
	params.Sources = sources

	c, err := rc.CreateCollection(ctx, workspace, name, option.WithCollectionRequest(*params))
	if err != nil {
		return DiagFromErr(err)
	}
	tflog.Trace(ctx, "created Rockset collection", map[string]interface{}{"workspace": workspace, "name": name},
		sourcesToTraceInfo(c.Sources))

	// Collection creation is asynchronous; wait until it is queryable so a
	// subsequent read sees consistent state.
	if err = waitForCollectionAndDocuments(ctx, rc, d, workspace, name); err != nil {
		return DiagFromErr(err)
	}

	d.SetId(toID(workspace, name))

	return diags
}

// resourceAzureBlobStorageCollectionRead refreshes Terraform state from the
// collection stored in Rockset. A missing collection clears the resource ID
// so Terraform plans a re-create.
func resourceAzureBlobStorageCollectionRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
	rc := meta.(*rockset.RockClient)
	var diags diag.Diagnostics
	var err error

	// The resource ID is the composite "workspace:name" written by create.
	workspace, name := workspaceAndNameFromID(d.Id())

	collection, err := rc.GetCollection(ctx, workspace, name)
	if err != nil {
		// Treats a not-found error as "resource deleted out of band".
		return checkForNotFoundError(d, err)
	}
	tflog.Trace(ctx, "read Rockset collection", map[string]interface{}{"workspace": workspace, "name": name},
		sourcesToTraceInfo(collection.Sources))

	// Gets all the fields any generic collection has
	err = parseBaseCollection(&collection, d)
	if err != nil {
		return DiagFromErr(err)
	}

	// Gets all the fields relevant to an Azure Blob Storage collection
	err = parseBucketCollection(ctx, "azure_blob_storage", &collection, d)
	if err != nil {
		return DiagFromErr(err)
	}

	return diags
}
82 changes: 82 additions & 0 deletions rockset/resource_azure_blob_storage_collection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package rockset

import (
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/rockset/rockset-go-client/openapi"
)

// TestAccAzureBlobStorageCollection_Basic verifies that a collection backed by
// Azure Blob Storage sources can be created with the expected attributes and
// destroyed cleanly afterwards.
func TestAccAzureBlobStorageCollection_Basic(t *testing.T) {
	var coll openapi.Collection

	const res = "rockset_azure_blob_storage_collection.test"

	n := randomName("azure_blob_storage")
	v := Values{
		Name:        n,
		Collection:  n,
		Workspace:   n,
		Description: description(),
	}

	resource.Test(t, resource.TestCase{
		PreCheck:          func() { testAccPreCheck(t) },
		ProviderFactories: testAccProviderFactories,
		CheckDestroy:      testAccCheckRocksetCollectionDestroy, // Reused from base collection
		Steps: []resource.TestStep{
			{
				Config: getHCLTemplate("azure_blob_storage_collection.tf", v),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckRocksetCollectionExists(res, &coll), // Reused from base collection
					resource.TestCheckResourceAttr(res, "name", v.Collection),
					resource.TestCheckResourceAttr(res, "workspace", v.Workspace),
					resource.TestCheckResourceAttr(res, "description", v.Description),
					resource.TestCheckResourceAttr(res, "retention_secs", "3600"),
				),
			},
		},
	})
}

// TestAccAzureBlobStorageCollection_Json creates a collection from a JSON
// source, checks its source attributes, then re-applies a two-source config
// after an out-of-band write-API source add to confirm the source count.
func TestAccAzureBlobStorageCollection_Json(t *testing.T) {
	var coll openapi.Collection

	const res = "rockset_azure_blob_storage_collection.test"

	n := randomName("azure_blob_storage-json")
	v := Values{
		Name:        n,
		Collection:  n,
		Workspace:   n,
		Description: description(),
	}

	resource.Test(t, resource.TestCase{
		PreCheck:          func() { testAccPreCheck(t) },
		ProviderFactories: testAccProviderFactories,
		CheckDestroy:      testAccCheckRocksetCollectionDestroy, // Reused from base collection
		Steps: []resource.TestStep{
			{
				Config: getHCLTemplate("azure_blob_storage_collection_json.tf", v),
				Check: resource.ComposeTestCheckFunc(
					testAccCheckRocksetCollectionExists(res, &coll), // Reused from base collection
					resource.TestCheckResourceAttr(res, "name", v.Name),
					resource.TestCheckResourceAttr(res, "workspace", v.Workspace),
					resource.TestCheckResourceAttr(res, "description", v.Description),
					resource.TestCheckResourceAttr(res, "retention_secs", "3600"),
					resource.TestCheckResourceAttr(res, "source.0.integration_name", v.Name),
					resource.TestCheckResourceAttr(res, "source.0.container", "sampledatasets"),
					resource.TestCheckResourceAttr(res, "source.0.pattern", "product.json"),
					resource.TestCheckResourceAttr(res, "source.0.format", "json"),
				),
			},
			{
				PreConfig: func() {
					triggerWriteAPISourceAdd(t, v.Workspace, v.Collection)
				},
				Config: getHCLTemplate("azure_blob_storage_collection.tf", v),
				Check: resource.ComposeTestCheckFunc(
					// check that we still just have two sources
					resource.TestCheckResourceAttr(res, "source.#", "2"),
				),
			},
		},
	})
}
Empty file.
50 changes: 50 additions & 0 deletions testdata/azure_blob_storage_collection.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Template for the acceptance tests: an Azure Blob Storage integration plus a
# collection with two sources (CSV and XML) in the "sampledatasets" container.
# Unquoted resource labels are legacy HCL1 syntax; HCL2 (Terraform 0.12+)
# requires quoted type and name labels.
resource "rockset_azure_blob_storage_integration" "test" {
  name              = "{{ .Name }}"
  connection_string = "BlobEndpoint=https://a.blob.core.windows.net/;SharedAccessSignature=sv=2022-11-02&ss=x=co&sp=x=2024-07-28T02:59:32Z&st=2024-07-26T18:59:32Z&spr=https&sig=x"
}

resource "rockset_workspace" "test" {
  name        = "{{ .Workspace }}"
  description = "{{ .Description }}"
}

resource "rockset_azure_blob_storage_collection" "test" {
  name           = "{{ .Collection }}"
  workspace      = rockset_workspace.test.name
  description    = "{{ .Description }}"
  retention_secs = 3600

  source {
    integration_name = rockset_azure_blob_storage_integration.test.name
    container        = "sampledatasets"
    pattern          = "cities.csv"
    format           = "csv"
    csv {
      first_line_as_column_names = false
      column_names = [
        "country",
        "city",
        "population",
        "visited"
      ]
      column_types = [
        "STRING",
        "STRING",
        "STRING",
        "STRING"
      ]
    }
  }

  source {
    integration_name = rockset_azure_blob_storage_integration.test.name
    container        = "sampledatasets"
    pattern          = "cities.xml"
    format           = "xml"
    xml {
      root_tag = "cities"
      encoding = "UTF-8"
      doc_tag  = "city"
    }
  }
}
23 changes: 23 additions & 0 deletions testdata/azure_blob_storage_collection_json.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Template for the acceptance tests: an Azure Blob Storage integration plus a
# collection with a single JSON source in the "sampledatasets" container.
# Unquoted resource labels are legacy HCL1 syntax; HCL2 (Terraform 0.12+)
# requires quoted type and name labels.
resource "rockset_azure_blob_storage_integration" "test" {
  name              = "{{ .Name }}"
  connection_string = "BlobEndpoint=https://a.blob.core.windows.net/;SharedAccessSignature=sv=2022-11-02&ss=x=co&sp=x=2024-07-28T02:59:32Z&st=2024-07-26T18:59:32Z&spr=https&sig=x"
}

resource "rockset_workspace" "test" {
  name        = "{{ .Workspace }}"
  description = "{{ .Description }}"
}

resource "rockset_azure_blob_storage_collection" "test" {
  name           = "{{ .Collection }}"
  workspace      = rockset_workspace.test.name
  description    = "{{ .Description }}"
  retention_secs = 3600

  source {
    integration_name = rockset_azure_blob_storage_integration.test.name
    container        = "sampledatasets"
    pattern          = "product.json"
    format           = "json"
  }
}

0 comments on commit d33b623

Please sign in to comment.