Skip to content

🌱 test/e2e: test scoped indexes #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
golang.org/x/sync v0.11.0
golang.org/x/sys v0.30.0
k8s.io/api v0.32.3
k8s.io/apiextensions-apiserver v0.32.1
k8s.io/apimachinery v0.32.3
k8s.io/client-go v0.32.3
k8s.io/klog/v2 v2.130.1
Expand Down Expand Up @@ -73,7 +74,6 @@ require (
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.32.1 // indirect
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
Expand Down
151 changes: 141 additions & 10 deletions test/e2e/apiexport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ import (

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
runtimeschema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -57,11 +60,11 @@ var _ = Describe("VirtualWorkspace Provider", Ordered, func() {
ctx context.Context
cancel context.CancelFunc

cli clusterclient.ClusterClient
provider, consumer logicalcluster.Path
consumerWS *tenancyv1alpha1.Workspace
mgr mcmanager.Manager
vwEndpoint string
cli clusterclient.ClusterClient
provider, consumer, other logicalcluster.Path
consumerWS *tenancyv1alpha1.Workspace
mgr mcmanager.Manager
vwEndpoint string
)

BeforeAll(func() {
Expand All @@ -73,6 +76,7 @@ var _ = Describe("VirtualWorkspace Provider", Ordered, func() {

_, provider = envtest.NewWorkspaceFixture(GinkgoT(), cli, core.RootCluster.Path(), envtest.WithNamePrefix("provider"))
consumerWS, consumer = envtest.NewWorkspaceFixture(GinkgoT(), cli, core.RootCluster.Path(), envtest.WithNamePrefix("consumer"))
_, other = envtest.NewWorkspaceFixture(GinkgoT(), cli, core.RootCluster.Path(), envtest.WithNamePrefix("other"))

By(fmt.Sprintf("creating a schema in the provider workspace %q", provider))
schema := &apisv1alpha1.APIResourceSchema{
Expand All @@ -94,6 +98,7 @@ var _ = Describe("VirtualWorkspace Provider", Ordered, func() {
Raw: []byte(`{"type":"object","properties":{"spec":{"type":"object","properties":{"message":{"type":"string"}}}}}`),
},
Storage: true,
Served: true,
}},
},
}
Expand Down Expand Up @@ -127,7 +132,7 @@ var _ = Describe("VirtualWorkspace Provider", Ordered, func() {
err = cli.Cluster(provider).Create(ctx, endpoitns)
Expect(err).NotTo(HaveOccurred())

By(fmt.Sprintf("creating an APIBinding in the consumer workspace %q", consumer))
By(fmt.Sprintf("creating an APIBinding in the other workspace %q", other))
binding := &apisv1alpha1.APIBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "example.com",
Expand All @@ -141,6 +146,23 @@ var _ = Describe("VirtualWorkspace Provider", Ordered, func() {
},
},
}
err = cli.Cluster(other).Create(ctx, binding)
Expect(err).NotTo(HaveOccurred())

By(fmt.Sprintf("creating an APIBinding in the consumer workspace %q", consumer))
binding = &apisv1alpha1.APIBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "example.com",
},
Spec: apisv1alpha1.APIBindingSpec{
Reference: apisv1alpha1.BindingReference{
Export: &apisv1alpha1.ExportBindingReference{
Path: provider.String(),
Name: export.Name,
},
},
},
}
err = cli.Cluster(consumer).Create(ctx, binding)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -149,35 +171,104 @@ var _ = Describe("VirtualWorkspace Provider", Ordered, func() {
envtest.Eventually(GinkgoT(), func() (bool, string) {
err := cli.Cluster(provider).Get(ctx, client.ObjectKey{Name: "example.com"}, endpoints)
if err != nil {
return false, fmt.Sprintf("failed to get APIExportEndpointSlice in %s: %v", provider, err)
return false, fmt.Sprintf("failed to get APIExportEndpointSlice in %q: %v", provider, err)
}
return len(endpoints.Status.APIExportEndpoints) > 0, toYAML(GinkgoT(), endpoints)
}, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see endpoints in APIExportEndpointSlice in %s", provider)
}, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see endpoints in APIExportEndpointSlice in %q", provider)
vwEndpoint = endpoints.Status.APIExportEndpoints[0].URL

By(fmt.Sprintf("waiting until the APIBinding in the consumer workspace %q to be ready", consumer))
envtest.Eventually(GinkgoT(), func() (bool, string) {
current := &apisv1alpha1.APIBinding{}
err := cli.Cluster(consumer).Get(ctx, client.ObjectKey{Name: "example.com"}, current)
if err != nil {
return false, fmt.Sprintf("failed to get APIBinding in %q: %v", consumer, err)
}
if current.Status.Phase != apisv1alpha1.APIBindingPhaseBound {
return false, fmt.Sprintf("binding not bound:\n\n%s", toYAML(GinkgoT(), current))
}
return true, ""
}, wait.ForeverTestTimeout, time.Millisecond*100, "failed to wait for APIBinding in consumer workspace to be ready %q", consumer)

By("waiting until things can be listed in the consumer workspace")
envtest.Eventually(GinkgoT(), func() (bool, string) {
u := &unstructured.UnstructuredList{}
u.SetGroupVersionKind(runtimeschema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "ThingList"})
err = cli.Cluster(consumer).List(ctx, u)
if err != nil {
return false, fmt.Sprintf("failed to list things in %s: %v", consumer, err)
}
return true, ""
}, wait.ForeverTestTimeout, time.Millisecond*100, "failed to wait for things to be listable in consumer workspace %q", consumer)
})

Describe("with a multicluster provider and manager", func() {
var (
lock sync.RWMutex
engaged = sets.NewString()
p *virtualworkspace.Provider
g *errgroup.Group
cancelGroup context.CancelFunc
)

BeforeAll(func() {
By("creating a stone in the consumer workspace", func() {
thing := &unstructured.Unstructured{}
thing.SetGroupVersionKind(runtimeschema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Thing"})
thing.SetName("stone")
thing.SetLabels(map[string]string{"color": "gray"})
err := cli.Cluster(consumer).Create(ctx, thing)
Expect(err).NotTo(HaveOccurred())
})

By("creating a box in the other workspace", func() {
thing := &unstructured.Unstructured{}
thing.SetGroupVersionKind(runtimeschema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Thing"})
thing.SetName("box")
thing.SetLabels(map[string]string{"color": "white"})
err := cli.Cluster(other).Create(ctx, thing)
Expect(err).NotTo(HaveOccurred())
})

By("creating a multicluster provider for APIBindings against the apiexport virtual workspace")
vwConfig := rest.CopyConfig(kcpConfig)
vwConfig.Host = vwEndpoint
p, err := virtualworkspace.New(vwConfig, &apisv1alpha1.APIBinding{}, virtualworkspace.Options{})
var err error
p, err = virtualworkspace.New(vwConfig, &apisv1alpha1.APIBinding{}, virtualworkspace.Options{})
Expect(err).NotTo(HaveOccurred())

By("waiting for discovery of the virtual workspace to show 'example.com'")
wildcardConfig := rest.CopyConfig(vwConfig)
wildcardConfig.Host += logicalcluster.Wildcard.RequestPath()
disc, err := discovery.NewDiscoveryClientForConfig(wildcardConfig)
Expect(err).NotTo(HaveOccurred())
envtest.Eventually(GinkgoT(), func() (bool, string) {
ret, err := disc.ServerGroups()
Expect(err).NotTo(HaveOccurred())
for _, g := range ret.Groups {
if g.Name == "example.com" {
return true, ""
}
}
return false, fmt.Sprintf("failed to find group example.com in:\n%s", toYAML(GinkgoT(), ret))
}, wait.ForeverTestTimeout, time.Millisecond*100, "failed to find group example.com in the virtual workspace")

By("creating a manager against the provider workspace")
rootConfig := rest.CopyConfig(kcpConfig)
rootConfig.Host += provider.RequestPath()
mgr, err = mcmanager.New(rootConfig, p, mcmanager.Options{})
Expect(err).NotTo(HaveOccurred())

By("creating a reconciler for the APIBinding")
By("adding an index on label 'color'")
thing := &unstructured.Unstructured{}
thing.SetGroupVersionKind(runtimeschema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "Thing"})
err = mgr.GetFieldIndexer().IndexField(ctx, thing, "color", func(obj client.Object) []string {
u := obj.(*unstructured.Unstructured)
return []string{u.GetLabels()["color"]}
})
Expect(err).NotTo(HaveOccurred())

By("creating a reconciler for APIBindings")
err = mcbuilder.ControllerManagedBy(mgr).
Named("things").
For(&apisv1alpha1.APIBinding{}).
Expand Down Expand Up @@ -211,6 +302,46 @@ var _ = Describe("VirtualWorkspace Provider", Ordered, func() {
}, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see the consumer workspace %q as a cluster", consumer)
})

It("sees only the stone in the consumer clusters", func() {
consumerCl, err := mgr.GetCluster(ctx, consumerWS.Spec.Cluster)
Expect(err).NotTo(HaveOccurred())

envtest.Eventually(GinkgoT(), func() (success bool, reason string) {
l := &unstructured.UnstructuredList{}
l.SetGroupVersionKind(runtimeschema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "ThingList"})
err = consumerCl.GetCache().List(ctx, l)
if err != nil {
return false, fmt.Sprintf("failed to list things in the consumer cluster cache: %v", err)
}
if len(l.Items) != 1 {
return false, fmt.Sprintf("expected 1 item, got %d\n\n%s", len(l.Items), toYAML(GinkgoT(), l.Object))
} else if name := l.Items[0].GetName(); name != "stone" {
return false, fmt.Sprintf("expected item name to be stone, got %q\n\n%s", name, toYAML(GinkgoT(), l.Items[0]))
}
return true, ""
}, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see the stone in the consumer cluster")
})

It("sees only the stone as grey thing in the consumer clusters", func() {
consumerCl, err := mgr.GetCluster(ctx, consumerWS.Spec.Cluster)
Expect(err).NotTo(HaveOccurred())

envtest.Eventually(GinkgoT(), func() (success bool, reason string) {
l := &unstructured.UnstructuredList{}
l.SetGroupVersionKind(runtimeschema.GroupVersionKind{Group: "example.com", Version: "v1", Kind: "ThingList"})
err = consumerCl.GetCache().List(ctx, l, client.MatchingFields{"color": "gray"})
if err != nil {
return false, fmt.Sprintf("failed to list things in the consumer cluster cache: %v", err)
}
if len(l.Items) != 1 {
return false, fmt.Sprintf("expected 1 item, got %d\n\n%s", len(l.Items), toYAML(GinkgoT(), l.Object))
} else if name := l.Items[0].GetName(); name != "stone" {
return false, fmt.Sprintf("expected item name to be stone, got %q\n\n%s", name, toYAML(GinkgoT(), l.Items[0]))
}
return true, ""
}, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see the stone as only thing of color 'grey' in the consumer cluster")
})

AfterAll(func() {
cancelGroup()
err := g.Wait()
Expand Down
12 changes: 9 additions & 3 deletions virtualworkspace/forked_cache_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func byIndexes(indexer cache.Indexer, requires fields.Requirements, clusterName
indexers := indexer.GetIndexers()
_, isClusterAware := indexers[kcpcache.ClusterAndNamespaceIndexName]
for idx, req := range requires {
indexName := fieldIndexName(req.Field)
indexName := fieldIndexName(isClusterAware, req.Field)
var indexedValue string
if isClusterAware {
indexedValue = keyToClusteredKey(clusterName.String(), namespace, req.Value)
Expand Down Expand Up @@ -270,7 +270,10 @@ func objectKeyToStoreKey(k client.ObjectKey) string {

// fieldIndexName constructs the name of the index over the given field,
// for use with an indexer.
func fieldIndexName(field string) string {
func fieldIndexName(clusterAware bool, field string) string {
if clusterAware {
return "field:cluster/" + field
}
return "field:" + field
}

Expand All @@ -289,7 +292,10 @@ func keyToNamespacedKey(ns string, baseKey string) string {
// keyToClusteredKey prefixes the given index key with a cluster name
// for use in field selector indexes.
func keyToClusteredKey(clusterName string, ns string, baseKey string) string {
return clusterName + "|" + keyToNamespacedKey(ns, baseKey)
if ns != "" {
return ns + "/" + clusterName + "/" + baseKey
}
return allNamespacesNamespace + "/" + clusterName + "/" + baseKey
}

// requiresExactMatch checks if the given field selector is of the form `k=v` or `k==v`.
Expand Down