Skip to content

🌱 Fix indexes #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ linters:
- errchkjson
- errorlint
- exhaustive
- gci
- ginkgolinter
- goconst
- gocritic
Expand Down Expand Up @@ -53,6 +54,18 @@ linters:
- whitespace

linters-settings:
gci:
custom-order: true
sections:
- standard
- default
- prefix(k8s.io,sigs.k8s.io)
- prefix(github.com/kcp-dev)
- prefix(github.com/multicluster-runtime)
- prefix(github.com/kcp-dev/multicluster-provider)
- blank
- dot
skip-generated: true
govet:
enable-all: true
disable:
Expand Down
21 changes: 9 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ $(GOLANGCI_LINT):
golangci-lint \
${GOLANGCI_LINT_VERSION}

GIMPS = _tools/gimps
GIMPS_VERSION = 0.6.0

.PHONY: $(GIMPS)
$(GIMPS):
@hack/download-tool.sh \
https://github.com/xrstf/gimps/releases/download/v${GIMPS_VERSION}/gimps_${GIMPS_VERSION}_${GOOS}_${GOARCH}.tar.gz \
gimps \
${GIMPS_VERSION}

WWHRD = _tools/wwhrd
WWHRD_VERSION = 0.4.0

Expand Down Expand Up @@ -100,8 +90,15 @@ lint: $(GOLANGCI_LINT)
./...

.PHONY: imports
imports: $(GIMPS)
$(GIMPS) .
imports: WHAT ?=
imports: $(GOLANGCI_LINT)
@if [ -n "$(WHAT)" ]; then \
$(GOLANGCI_LINT) run --enable-only=gci --fix --fast $(WHAT); \
else \
for MOD in . $$(git ls-files '**/go.mod' | sed 's,/go.mod,,'); do \
(cd $$MOD; $(GOLANGCI_LINT) run --enable-only=gci --fix --fast); \
done; \
fi

.PHONY: verify
verify:
Expand Down
67 changes: 67 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package client

import (
"sync"

"github.com/hashicorp/golang-lru/v2"

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kcp-dev/logicalcluster/v3"
)

// ClusterClient is a cluster-aware client.
type ClusterClient interface {
// Cluster returns the client for the given cluster.
Cluster(cluster logicalcluster.Path) (client.Client, error)
}

// clusterClient is a multi-cluster-aware client.
type clusterClient struct {
baseConfig *rest.Config
opts client.Options

lock sync.RWMutex
cache *lru.Cache[logicalcluster.Path, client.Client]
}

// New creates a new cluster-aware client.
func New(cfg *rest.Config, options client.Options) (ClusterClient, error) {
ca, err := lru.New[logicalcluster.Path, client.Client](100)
if err != nil {
return nil, err
}
return &clusterClient{
opts: options,
baseConfig: cfg,
cache: ca,
}, nil
}

func (c *clusterClient) Cluster(cluster logicalcluster.Path) (client.Client, error) {
// quick path
c.lock.RLock()
cli, ok := c.cache.Get(cluster)
c.lock.RUnlock()
if ok {
return cli, nil
}

// slow path
c.lock.Lock()
defer c.lock.Unlock()
if cli, ok := c.cache.Get(cluster); ok {
return cli, nil
}

// cache miss
cfg := rest.CopyConfig(c.baseConfig)
cfg.Host = cfg.Host + cluster.RequestPath()
cli, err := client.New(cfg, c.opts)
if err != nil {
return nil, err
}
c.cache.Add(cluster, cli)
return cli, nil
}
3 changes: 3 additions & 0 deletions envtest/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package envtest provides a test environment for testing code against a
// kcp control plane.
package envtest
30 changes: 30 additions & 0 deletions envtest/internal/addr/addr_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package addr_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestAddr(t *testing.T) {
t.Parallel()
RegisterFailHandler(Fail)
RunSpecs(t, "Addr Suite")
}
142 changes: 142 additions & 0 deletions envtest/internal/addr/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package addr

import (
"errors"
"fmt"
"io/fs"
"net"
"os"
"path/filepath"
"strings"
"time"

"github.com/kcp-dev/multicluster-provider/envtest/internal/flock"
)

// TODO(directxman12): interface / release functionality for external port managers

const (
portReserveTime = 2 * time.Minute
portConflictRetry = 100
portFilePrefix = "port-"
)

var (
cacheDir string
)

func init() {
baseDir, err := os.UserCacheDir()
if err == nil {
cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
err = os.MkdirAll(cacheDir, 0o750)
}
if err != nil {
// Either we didn't get a cache directory, or we can't use it
baseDir = os.TempDir()
cacheDir = filepath.Join(baseDir, "kubebuilder-envtest")
err = os.MkdirAll(cacheDir, 0o750)
}
if err != nil {
panic(err)
}
}

type portCache struct{}

func (c *portCache) add(port int) (bool, error) {
// Remove outdated ports.
if err := fs.WalkDir(os.DirFS(cacheDir), ".", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() || !d.Type().IsRegular() || !strings.HasPrefix(path, portFilePrefix) {
return nil
}
info, err := d.Info()
if err != nil {
// No-op if file no longer exists; may have been deleted by another
// process/thread trying to allocate ports.
if errors.Is(err, fs.ErrNotExist) {
return nil
}
return err
}
if time.Since(info.ModTime()) > portReserveTime {
if err := os.Remove(filepath.Join(cacheDir, path)); err != nil {
// No-op if file no longer exists; may have been deleted by another
// process/thread trying to allocate ports.
if os.IsNotExist(err) {
return nil
}
return err
}
}
return nil
}); err != nil {
return false, err
}
// Try allocating new port, by acquiring a file.
path := fmt.Sprintf("%s/%s%d", cacheDir, portFilePrefix, port)
if err := flock.Acquire(path); errors.Is(err, flock.ErrAlreadyLocked) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}

var cache = &portCache{}

func suggest(listenHost string) (*net.TCPListener, int, string, error) {
if listenHost == "" {
listenHost = "localhost"
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(listenHost, "0"))
if err != nil {
return nil, -1, "", err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, -1, "", err
}
return l, l.Addr().(*net.TCPAddr).Port,
addr.IP.String(),
nil
}

// Suggest suggests an address a process can listen on. It returns
// a tuple consisting of a free port and the hostname resolved to its IP.
// It makes sure that new port allocated does not conflict with old ports
// allocated within 1 minute.
func Suggest(listenHost string) (int, string, error) {
for i := 0; i < portConflictRetry; i++ {
listener, port, resolvedHost, err := suggest(listenHost)
if err != nil {
return -1, "", err
}
defer listener.Close()
if ok, err := cache.add(port); ok {
return port, resolvedHost, nil
} else if err != nil {
return -1, "", err
}
}
return -1, "", fmt.Errorf("no free ports found after %d retries", portConflictRetry)
}
77 changes: 77 additions & 0 deletions envtest/internal/addr/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package addr_test

import (
"net"
"strconv"

"github.com/kcp-dev/multicluster-provider/envtest/internal/addr"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("SuggestAddress", func() {
It("returns a free port and an address to bind to", func() {
port, host, err := addr.Suggest("")

Expect(err).NotTo(HaveOccurred())
Expect(host).To(Or(Equal("127.0.0.1"), Equal("::1")))
Expect(port).NotTo(Equal(0))

addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
Expect(err).NotTo(HaveOccurred())
l, err := net.ListenTCP("tcp", addr)
defer func() {
Expect(l.Close()).To(Succeed())
}()
Expect(err).NotTo(HaveOccurred())
})

It("supports an explicit listenHost", func() {
port, host, err := addr.Suggest("localhost")

Expect(err).NotTo(HaveOccurred())
Expect(host).To(Or(Equal("127.0.0.1"), Equal("::1")))
Expect(port).NotTo(Equal(0))

addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
Expect(err).NotTo(HaveOccurred())
l, err := net.ListenTCP("tcp", addr)
defer func() {
Expect(l.Close()).To(Succeed())
}()
Expect(err).NotTo(HaveOccurred())
})

It("supports a 0.0.0.0 listenHost", func() {
port, host, err := addr.Suggest("0.0.0.0")

Expect(err).NotTo(HaveOccurred())
Expect(host).To(Equal("0.0.0.0"))
Expect(port).NotTo(Equal(0))

addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
Expect(err).NotTo(HaveOccurred())
l, err := net.ListenTCP("tcp", addr)
defer func() {
Expect(l.Close()).To(Succeed())
}()
Expect(err).NotTo(HaveOccurred())
})
})
Loading