diff --git a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/cpu/cpu_isolation.go b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/cpu/cpu_isolation.go index 233936f38..3c579125d 100644 --- a/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/cpu/cpu_isolation.go +++ b/cmd/katalyst-agent/app/options/sysadvisor/qosaware/resource/cpu/cpu_isolation.go @@ -48,8 +48,15 @@ type CPUIsolationOptions struct { IsolationLockInThreshold int IsolationLockOutPeriodSecs int - IsolationDisabled bool - IsolationDisabledPools []string + // IsolationDisabled is used to disable all isolation. + // IsolationDisabledPools indicates the pools where pods will not be isolated. + // IsolationForceEnablePools indicates the pools where pods must be isolated, even if the pool + // is listed in IsolationDisabledPools. + // IsolationNonExclusivePools indicates the pools where pods will not be exclusively isolated. + IsolationDisabled bool + IsolationDisabledPools []string + IsolationForceEnablePools []string + IsolationNonExclusivePools []string } // NewCPUIsolationOptions creates a new Options with a default config @@ -67,8 +74,10 @@ func NewCPUIsolationOptions() *CPUIsolationOptions { IsolationLockInThreshold: 3, IsolationLockOutPeriodSecs: 120, - IsolationDisabled: true, - IsolationDisabledPools: []string{}, + IsolationDisabled: true, + IsolationDisabledPools: []string{}, + IsolationForceEnablePools: []string{}, + IsolationNonExclusivePools: []string{}, } } @@ -101,7 +110,11 @@ func (o *CPUIsolationOptions) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&o.IsolationDisabled, "isolation-disable", o.IsolationDisabled, "if set as true, disable the isolation logic") fs.StringArrayVar(&o.IsolationDisabledPools, "isolation-disable-pools", o.IsolationDisabledPools, - "if set as true, disable the isolation logic for the given pool") + "disable the isolation logic for the given pool") + fs.StringArrayVar(&o.IsolationForceEnablePools, "isolation-force-enable-pools", o.IsolationForceEnablePools, + "isolation force enable for get given pool") + fs.StringArrayVar(&o.IsolationNonExclusivePools, "isolation-non-exclusive-pools", o.IsolationNonExclusivePools, + "isolation is non-exclusive for get given pool") } // ApplyTo fills up config with options @@ -142,6 +155,8 @@ func (o *CPUIsolationOptions) ApplyTo(c *cpu.CPUIsolationConfiguration) error { c.IsolationDisabled = o.IsolationDisabled c.IsolationDisabledPools = sets.NewString(o.IsolationDisabledPools...) + c.IsolationForceEnablePools = sets.NewString(o.IsolationForceEnablePools...) + c.IsolationNonExclusivePools = sets.NewString(o.IsolationNonExclusivePools...) return nil } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go b/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go index 05043ba3f..ffb31a281 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/qos_aware.go @@ -66,7 +66,7 @@ func NewQoSAwarePlugin(pluginName string, conf *config.Configuration, extraConf return nil, err } - qrmServer, err := server.NewQRMServer(resourceAdvisor, conf, metaCache, emitter) + qrmServer, err := server.NewQRMServer(resourceAdvisor, conf, metaCache, metaServer, emitter) if err != nil { return nil, err } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go index bbc66c45a..09b93143c 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go @@ -374,7 +374,7 @@ func (cra *cpuResourceAdvisor) assignContainersToRegions() error { if ci.OwnerPoolName == state.PoolNameDedicated { // dedicated pool should not exist in metaCache.poolEntries return true - } else if ci.Isolated { + } else if ci.Isolated || cra.conf.IsolationForceEnablePools.Has(ci.OriginOwnerPoolName) { // isolated pool should not exist in metaCache.poolEntries return true } else { @@ -412,16 +412,35 @@ func (cra *cpuResourceAdvisor) assignToRegions(ci *types.ContainerInfo) ([]regio } // assign isolated container - if ci.Isolated { - // if there already exists an isolation region for this pod, just reuse it - regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeIsolation) - if err != nil { - return nil, err - } else if len(regions) > 0 { - return regions, nil + if ci.Isolated || cra.conf.IsolationForceEnablePools.Has(ci.OriginOwnerPoolName) { + regionName := "" + if cra.conf.IsolationNonExclusivePools.Has(ci.OriginOwnerPoolName) { + // use origin owner pool name as region name, because all the container in this pool + // share only one region which is non-exclusive + regionName = ci.OriginOwnerPoolName + + // if there already exists a non-exclusive isolation region for this pod, just reuse it + regions := cra.getPoolRegions(regionName) + if len(regions) > 0 { + return regions, nil + } + + // if there already exists a region with same name as this region, just reuse it + regions = cra.getRegionsByRegionNames(sets.NewString(regionName)) + if len(regions) > 0 { + return regions, nil + } + } else { + // if there already exists an isolation region for this pod, just reuse it + regions, err := cra.getContainerRegions(ci, types.QoSRegionTypeIsolation) + if err != nil { + return nil, err + } else if len(regions) > 0 { + return regions, nil + } } - r := region.NewQoSRegionIsolation(ci, cra.conf, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter) + r := region.NewQoSRegionIsolation(ci, regionName, cra.conf, cra.extraConf, cra.metaCache, cra.metaServer, cra.emitter) return []region.QoSRegion{r}, nil } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/isolation/isolator_load.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/isolation/isolator_load.go index 041d26cd6..8b1d7cb85 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/isolation/isolator_load.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/isolation/isolator_load.go @@ -101,6 +101,7 @@ func (l *LoadIsolator) GetIsolatedPods() []string { existed.Insert(containerMeta(ci)) if l.checkContainerIsolated(ci, isolationResources) { + general.Infof("add container %s from pod %s/%s to isolation", ci.ContainerName, ci.PodNamespace, ci.PodName) uidSets.Insert(ci.PodUID) } } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/region_base.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/region_base.go index bd5259dd3..2564b1b5c 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/region_base.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/region_base.go @@ -274,8 +274,8 @@ func (r *QoSRegionBase) GetProvision() (types.ControlKnob, error) { r.provisionPolicyNameInUse = internal.name if r.provisionPolicyNameInUse != oldProvisionPolicyNameInUse { - klog.Infof("[qosaware-cpu] region: %s provision policy switch from %s to %s", - r.Name, oldProvisionPolicyNameInUse, r.provisionPolicyNameInUse) + klog.Infof("[qosaware-cpu] region: %v provision policy switch from %v to %v", + r.Name(), oldProvisionPolicyNameInUse, r.provisionPolicyNameInUse) if r.enableBorweinModel { r.borweinController.ResetIndicatorOffsets() } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/region_isolation.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/region_isolation.go index bd2906bdc..a70905a4b 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/region_isolation.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region/region_isolation.go @@ -27,19 +27,22 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metrics" ) -var isolationRegionDefaultOwnerPoolName = "isolation-default" +const isolationRegionDefaultOwnerPoolName = "isolation-default" type QoSRegionIsolation struct { *QoSRegionBase } // NewQoSRegionIsolation returns a region instance for isolated pods -func NewQoSRegionIsolation(ci *types.ContainerInfo, conf *config.Configuration, extraConf interface{}, +func NewQoSRegionIsolation(ci *types.ContainerInfo, customRegionName string, conf *config.Configuration, extraConf interface{}, metaReader metacache.MetaReader, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) QoSRegion { - regionName := getRegionNameFromMetaCache(ci, cpuadvisor.FakedNUMAID, metaReader) + regionName := customRegionName if regionName == "" { - regionName = string(types.QoSRegionTypeIsolation) + types.RegionNameSeparator + ci.PodName + types.RegionNameSeparator + string(uuid.NewUUID()) + regionName = getRegionNameFromMetaCache(ci, cpuadvisor.FakedNUMAID, metaReader) + if regionName == "" { + regionName = string(types.QoSRegionTypeIsolation) + types.RegionNameSeparator + ci.PodName + types.RegionNameSeparator + string(uuid.NewUUID()) + } } r := &QoSRegionIsolation{ diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go index a3625ece0..ff9b7d598 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go @@ -33,6 +33,8 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/general" ) @@ -63,23 +65,28 @@ type baseServer struct { resourceRequestName string resourceLimitName string - metaCache metacache.MetaCache - emitter metrics.MetricEmitter + qosConf *generic.QoSConfiguration + + metaCache metacache.MetaCache + metaServer *metaserver.MetaServer + emitter metrics.MetricEmitter grpcServer *grpc.Server resourceServer subQRMServer } func newBaseServer(name string, conf *config.Configuration, recvCh interface{}, sendCh chan types.TriggerInfo, - metaCache metacache.MetaCache, emitter metrics.MetricEmitter, resourceServer subQRMServer) *baseServer { + metaCache metacache.MetaCache, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter, resourceServer subQRMServer) *baseServer { return &baseServer{ name: name, period: conf.QoSAwarePluginConfiguration.SyncPeriod, + qosConf: conf.QoSConfiguration, recvCh: recvCh, sendCh: sendCh, lwCalledChan: make(chan struct{}), stopCh: make(chan struct{}), metaCache: metaCache, + metaServer: metaServer, emitter: emitter, resourceServer: resourceServer, } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go index 065e632fa..ddd41c6cd 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go @@ -22,6 +22,7 @@ import ( "time" "google.golang.org/grpc" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -33,6 +34,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/general" "github.com/kubewharf/katalyst-core/pkg/util/machine" @@ -49,9 +51,9 @@ type cpuServer struct { } func NewCPUServer(recvCh chan types.InternalCPUCalculationResult, sendCh chan types.TriggerInfo, conf *config.Configuration, - metaCache metacache.MetaCache, emitter metrics.MetricEmitter) (*cpuServer, error) { + metaCache metacache.MetaCache, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) (*cpuServer, error) { cs := &cpuServer{} - cs.baseServer = newBaseServer(cpuServerName, conf, recvCh, sendCh, metaCache, emitter, cs) + cs.baseServer = newBaseServer(cpuServerName, conf, recvCh, sendCh, metaCache, metaServer, emitter, cs) cs.advisorSocketPath = conf.CPUAdvisorSocketAbsPath cs.pluginSocketPath = conf.CPUPluginSocketAbsPath cs.resourceRequestName = "CPURequest" @@ -106,7 +108,7 @@ func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvi // Assemble pod entries f := func(podUID string, containerName string, ci *types.ContainerInfo) bool { if err := cs.assemblePodEntries(calculationEntriesMap, blockID2Blocks, podUID, ci); err != nil { - klog.Errorf("[qosaware-server-cpu] assemblePodEntries err: %v", err) + klog.Errorf("[qosaware-server-cpu] assemblePodEntries for pod %s/%s uid %s err: %v", ci.PodNamespace, ci.PodName, ci.PodUID, err) } return true } @@ -125,8 +127,9 @@ func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvi } func (cs *cpuServer) getCheckpoint() { + ctx := context.Background() // get checkpoint - resp, err := cs.cpuPluginClient.GetCheckpoint(context.Background(), &cpuadvisor.GetCheckpointRequest{}) + resp, err := cs.cpuPluginClient.GetCheckpoint(ctx, &cpuadvisor.GetCheckpointRequest{}) if err != nil { klog.Errorf("[qosaware-server-cpu] get checkpoint failed: %v", err) _ = cs.emitter.StoreInt64(cs.genMetricsName(metricServerLWGetCheckpointFailed), int64(cs.period.Seconds()), metrics.MetricTypeNameCount) @@ -156,8 +159,14 @@ func (cs *cpuServer) getCheckpoint() { for entryName, entry := range resp.Entries { if _, ok := entry.Entries[cpuadvisor.FakedContainerName]; !ok { podUID := entryName + pod, err := cs.metaServer.GetPod(ctx, podUID) + if err != nil { + klog.Errorf("[qosaware-server-cpu] get pod info with error: %v", err) + continue + } + for containerName, info := range entry.Entries { - if err := cs.updateContainerInfo(podUID, containerName, info); err != nil { + if err := cs.updateContainerInfo(podUID, containerName, pod, info); err != nil { klog.Errorf("[qosaware-server-cpu] update container info with error: %v", err) } } @@ -227,7 +236,7 @@ func (cs *cpuServer) updatePoolInfo(poolName string, info *cpuadvisor.Allocation return cs.metaCache.SetPoolInfo(poolName, pi) } -func (cs *cpuServer) updateContainerInfo(podUID string, containerName string, info *cpuadvisor.AllocationInfo) error { +func (cs *cpuServer) updateContainerInfo(podUID string, containerName string, pod *v1.Pod, info *cpuadvisor.AllocationInfo) error { ci, ok := cs.metaCache.GetContainerInfo(podUID, containerName) if !ok { return fmt.Errorf("container %v/%v not exist", podUID, containerName) @@ -236,12 +245,27 @@ func (cs *cpuServer) updateContainerInfo(podUID string, containerName string, in ci.RampUp = info.RampUp ci.TopologyAwareAssignments = machine.TransformCPUAssignmentFormat(info.TopologyAwareAssignments) ci.OriginalTopologyAwareAssignments = machine.TransformCPUAssignmentFormat(info.OriginalTopologyAwareAssignments) + ci.OwnerPoolName = info.OwnerPoolName - // only reset pool-name according to QRM during starting process - if len(ci.OriginOwnerPoolName) == 0 { - ci.OriginOwnerPoolName = info.OwnerPoolName + // get qos level name according to the qos conf + qosLevel, err := cs.qosConf.GetQoSLevelForPod(pod) + if err != nil { + return fmt.Errorf("container %v/%v get qos level failed", podUID, containerName) + } + if ci.QoSLevel != qosLevel { + general.Infof("qos level has change from %s to %s", ci.QoSLevel, qosLevel) + ci.QoSLevel = qosLevel + } + + // get origin owner pool name according to the qos conf + originOwnerPoolName, err := cs.qosConf.GetSpecifiedPoolNameForPod(pod) + if err != nil { + return fmt.Errorf("container %v/%v get origin owner pool name failed", podUID, containerName) + } + if ci.OriginOwnerPoolName != originOwnerPoolName { + general.Infof("OriginOwnerPoolName has change from %s to %s", ci.OriginOwnerPoolName, originOwnerPoolName) + ci.OriginOwnerPoolName = originOwnerPoolName } - ci.OwnerPoolName = info.OwnerPoolName // fill in topology aware assignment for containers with owner pool if ci.QoSLevel != consts.PodAnnotationQoSLevelDedicatedCores { @@ -291,7 +315,7 @@ func (cs *cpuServer) assemblePodEntries(calculationEntriesMap map[string]*cpuadv } } // if isolation is locking out, pass original owner pool instead of owner pool - if !ci.Isolated && qrmstate.IsIsolationPool(ci.OwnerPoolName) { + if !ci.Isolated && ci.OwnerPoolName != ci.OriginOwnerPoolName { calculationInfo.OwnerPoolName = ci.OriginOwnerPoolName } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go index 2580a4c4b..ceb145120 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server_test.go @@ -28,6 +28,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "github.com/kubewharf/katalyst-api/pkg/consts" @@ -38,7 +40,10 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" "github.com/kubewharf/katalyst-core/pkg/metrics" metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool" ) @@ -62,7 +67,7 @@ func generateTestConfiguration(t *testing.T) *config.Configuration { return conf } -func newTestCPUServer(t *testing.T) *cpuServer { +func newTestCPUServer(t *testing.T, podList []*v1.Pod) *cpuServer { recvCh := make(chan types.InternalCPUCalculationResult) sendCh := make(chan types.TriggerInfo) conf := generateTestConfiguration(t) @@ -72,7 +77,15 @@ func newTestCPUServer(t *testing.T) *cpuServer { require.NoError(t, err) require.NotNil(t, metaCache) - cpuServer, err := NewCPUServer(recvCh, sendCh, conf, metaCache, metrics.DummyMetrics{}) + metaServer := &metaserver.MetaServer{ + MetaAgent: &agent.MetaAgent{ + PodFetcher: &pod.PodFetcherStub{ + PodList: podList, + }, + }, + } + + cpuServer, err := NewCPUServer(recvCh, sendCh, conf, metaCache, metaServer, metrics.DummyMetrics{}) require.NoError(t, err) require.NotNil(t, cpuServer) @@ -84,7 +97,7 @@ func newTestCPUServer(t *testing.T) *cpuServer { func TestCPUServerStartAndStop(t *testing.T) { t.Parallel() - cs := newTestCPUServer(t) + cs := newTestCPUServer(t, []*v1.Pod{}) err := cs.Start() assert.NoError(t, err) @@ -136,7 +149,7 @@ func TestCPUServerAddContainer(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cs := newTestCPUServer(t) + cs := newTestCPUServer(t, []*v1.Pod{}) got, err := cs.AddContainer(context.Background(), tt.request) if (err != nil) != tt.wantErr { t.Errorf("AddContainer() error = %v, wantErr %v", err, tt.wantErr) @@ -175,7 +188,7 @@ func TestCPUServerRemovePod(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cs := newTestCPUServer(t) + cs := newTestCPUServer(t, []*v1.Pod{}) got, err := cs.RemovePod(context.Background(), tt.request) if (err != nil) != tt.wantErr { t.Errorf("RemovePod() error = %v, wantErr %v", err, tt.wantErr) @@ -232,6 +245,7 @@ func TestCPUServerListAndWatch(t *testing.T) { type ContainerInfo struct { request *advisorsvc.ContainerMetadata + podInfo *v1.Pod allocationInfo *cpuadvisor.AllocationInfo isolated bool regions sets.String @@ -262,6 +276,23 @@ func TestCPUServerListAndWatch(t *testing.T) { ContainerName: "c1", QosLevel: consts.PodAnnotationQoSLevelSharedCores, }, + podInfo: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + }, + }, + }, + }, allocationInfo: &cpuadvisor.AllocationInfo{ OwnerPoolName: state.PoolNameShare, }, @@ -352,6 +383,24 @@ func TestCPUServerListAndWatch(t *testing.T) { }, QosLevel: consts.PodAnnotationQoSLevelDedicatedCores, }, + podInfo: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: "{\"numa_exclusive\":true}", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + }, + }, + }, + }, allocationInfo: &cpuadvisor.AllocationInfo{ OwnerPoolName: state.PoolNameDedicated, TopologyAwareAssignments: map[uint64]string{ @@ -464,6 +513,24 @@ func TestCPUServerListAndWatch(t *testing.T) { }, QosLevel: consts.PodAnnotationQoSLevelDedicatedCores, }, + podInfo: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: "{\"numa_exclusive\":true}", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c2", + }, + }, + }, + }, allocationInfo: &cpuadvisor.AllocationInfo{ OwnerPoolName: state.PoolNameDedicated, TopologyAwareAssignments: map[uint64]string{ @@ -481,6 +548,24 @@ func TestCPUServerListAndWatch(t *testing.T) { }, QosLevel: consts.PodAnnotationQoSLevelDedicatedCores, }, + podInfo: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: "{\"numa_exclusive\":true}", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c2", + }, + }, + }, + }, allocationInfo: &cpuadvisor.AllocationInfo{ OwnerPoolName: state.PoolNameDedicated, TopologyAwareAssignments: map[uint64]string{ @@ -670,6 +755,24 @@ func TestCPUServerListAndWatch(t *testing.T) { }, QosLevel: consts.PodAnnotationQoSLevelDedicatedCores, }, + podInfo: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: "{\"numa_exclusive\":true}", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + }, + }, + }, + }, allocationInfo: &cpuadvisor.AllocationInfo{ OwnerPoolName: state.PoolNameDedicated, TopologyAwareAssignments: map[uint64]string{ @@ -687,6 +790,24 @@ func TestCPUServerListAndWatch(t *testing.T) { }, QosLevel: consts.PodAnnotationQoSLevelDedicatedCores, }, + podInfo: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: "{\"numa_exclusive\":true}", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c2", + }, + }, + }, + }, allocationInfo: &cpuadvisor.AllocationInfo{ OwnerPoolName: state.PoolNameDedicated, TopologyAwareAssignments: map[uint64]string{ @@ -704,6 +825,24 @@ func TestCPUServerListAndWatch(t *testing.T) { }, QosLevel: consts.PodAnnotationQoSLevelDedicatedCores, }, + podInfo: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + UID: "pod1", + Annotations: map[string]string{ + consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelDedicatedCores, + consts.PodAnnotationMemoryEnhancementKey: "{\"numa_exclusive\":true}", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c3", + }, + }, + }, + }, allocationInfo: &cpuadvisor.AllocationInfo{ OwnerPoolName: state.PoolNameDedicated, TopologyAwareAssignments: map[uint64]string{ @@ -981,11 +1120,11 @@ func TestCPUServerListAndWatch(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cs := newTestCPUServer(t) + cs := newTestCPUServer(t, []*v1.Pod{}) s := &mockCPUServerService_ListAndWatchServer{ResultsChan: make(chan *cpuadvisor.ListAndWatchResponse)} for _, info := range tt.infos { assert.NoError(t, cs.addContainer(info.request)) - assert.NoError(t, cs.updateContainerInfo(info.request.PodUid, info.request.ContainerName, info.allocationInfo)) + assert.NoError(t, cs.updateContainerInfo(info.request.PodUid, info.request.ContainerName, info.podInfo, info.allocationInfo)) nodeInfo, _ := cs.metaCache.GetContainerInfo(info.request.PodUid, info.request.ContainerName) nodeInfo.Isolated = info.isolated diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go index cdb4f2557..fdc2c121f 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go @@ -30,6 +30,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util/general" ) @@ -47,9 +48,9 @@ type memoryServer struct { } func NewMemoryServer(recvCh chan types.InternalMemoryCalculationResult, sendCh chan types.TriggerInfo, conf *config.Configuration, - metaCache metacache.MetaCache, emitter metrics.MetricEmitter) (*memoryServer, error) { + metaCache metacache.MetaCache, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) (*memoryServer, error) { ms := &memoryServer{} - ms.baseServer = newBaseServer(memoryServerName, conf, recvCh, sendCh, metaCache, emitter, ms) + ms.baseServer = newBaseServer(memoryServerName, conf, recvCh, sendCh, metaCache, metaServer, emitter, ms) ms.advisorSocketPath = conf.MemoryAdvisorSocketAbsPath ms.pluginSocketPath = conf.MemoryPluginSocketAbsPath ms.resourceRequestName = "MemoryRequest" diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server_test.go b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server_test.go index aa95ba46a..fe8ea9adb 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server_test.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/options" @@ -36,7 +37,10 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric" + "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" "github.com/kubewharf/katalyst-core/pkg/metrics" metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool" ) @@ -68,7 +72,7 @@ func generateTestMemoryAdvisorConfiguration(t *testing.T) *config.Configuration return conf } -func newTestMemoryServer(t *testing.T) *memoryServer { +func newTestMemoryServer(t *testing.T, podList []*v1.Pod) *memoryServer { recvCh := make(chan types.InternalMemoryCalculationResult) sendCh := make(chan types.TriggerInfo) conf := generateTestMemoryAdvisorConfiguration(t) @@ -78,7 +82,15 @@ func newTestMemoryServer(t *testing.T) *memoryServer { require.NoError(t, err) require.NotNil(t, metaCache) - memoryServer, err := NewMemoryServer(recvCh, sendCh, conf, metaCache, metrics.DummyMetrics{}) + metaServer := &metaserver.MetaServer{ + MetaAgent: &agent.MetaAgent{ + PodFetcher: &pod.PodFetcherStub{ + PodList: podList, + }, + }, + } + + memoryServer, err := NewMemoryServer(recvCh, sendCh, conf, metaCache, metaServer, metrics.DummyMetrics{}) require.NoError(t, err) require.NotNil(t, memoryServer) @@ -139,7 +151,7 @@ func TestListContainers(t *testing.T) { mockQRMServiceServer := &MockQRMServiceServer{containers: tt.containers, listErr: tt.listErr} advisorsvc.RegisterQRMServiceServer(serv, mockQRMServiceServer) - ms := newTestMemoryServer(t) + ms := newTestMemoryServer(t, []*v1.Pod{}) lis, err := net.Listen("unix", ms.pluginSocketPath) assert.NoError(t, err) @@ -163,7 +175,7 @@ func TestListContainers(t *testing.T) { func TestMemoryServerStartAndStop(t *testing.T) { t.Parallel() - cs := newTestMemoryServer(t) + cs := newTestMemoryServer(t, []*v1.Pod{}) err := cs.Start() assert.NoError(t, err) @@ -275,7 +287,7 @@ func TestMemoryServerListAndWatch(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cs := newTestMemoryServer(t) + cs := newTestMemoryServer(t, []*v1.Pod{}) s := &mockMemoryServerService_ListAndWatchServer{ResultsChan: make(chan *advisorsvc.ListAndWatchResponse)} for _, info := range tt.infos { assert.NoError(t, cs.addContainer(info.request)) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/server.go index 071fefb54..6aae001f2 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/server.go @@ -27,6 +27,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource" "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types" "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" ) @@ -53,14 +54,14 @@ type qrmServerWrapper struct { // NewQRMServer returns a qrm server wrapper, which instantiates // all required qrm plugin servers according to config func NewQRMServer(advisorWrapper resource.ResourceAdvisor, conf *config.Configuration, - metaCache metacache.MetaCache, emitter metrics.MetricEmitter) (QRMServer, error) { + metaCache metacache.MetaCache, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) (QRMServer, error) { qrmServer := qrmServerWrapper{ serversToRun: make(map[v1.ResourceName]subQRMServer), } for _, resourceNameStr := range conf.QRMServers { resourceName := v1.ResourceName(resourceNameStr) - server, err := newSubQRMServer(resourceName, advisorWrapper, conf, metaCache, emitter) + server, err := newSubQRMServer(resourceName, advisorWrapper, conf, metaCache, metaServer, emitter) if err != nil { return nil, fmt.Errorf("new qrm plugin server for %v failed: %v", resourceName, err) } else { @@ -88,7 +89,7 @@ func (qs *qrmServerWrapper) Run(ctx context.Context) { } func newSubQRMServer(resourceName v1.ResourceName, advisorWrapper resource.ResourceAdvisor, - conf *config.Configuration, metaCache metacache.MetaCache, emitter metrics.MetricEmitter) (subQRMServer, error) { + conf *config.Configuration, metaCache metacache.MetaCache, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) (subQRMServer, error) { switch resourceName { case v1.ResourceCPU: subAdvisor, err := advisorWrapper.GetSubAdvisor(types.QoSResourceCPU) @@ -98,7 +99,7 @@ func newSubQRMServer(resourceName v1.ResourceName, advisorWrapper resource.Resou advisorRecvChInterface, advisorSendChInterface := subAdvisor.GetChannels() advisorRecvCh := advisorRecvChInterface.(chan types.TriggerInfo) advisorSendCh := advisorSendChInterface.(chan types.InternalCPUCalculationResult) - return NewCPUServer(advisorSendCh, advisorRecvCh, conf, metaCache, emitter) + return NewCPUServer(advisorSendCh, advisorRecvCh, conf, metaCache, metaServer, emitter) case v1.ResourceMemory: subAdvisor, err := advisorWrapper.GetSubAdvisor(types.QoSResourceMemory) if err != nil { @@ -107,7 +108,7 @@ func newSubQRMServer(resourceName v1.ResourceName, advisorWrapper resource.Resou advisorRecvChInterface, advisorSendChInterface := subAdvisor.GetChannels() advisorRecvCh := advisorRecvChInterface.(chan types.TriggerInfo) advisorSendCh := advisorSendChInterface.(chan types.InternalMemoryCalculationResult) - return NewMemoryServer(advisorSendCh, advisorRecvCh, conf, metaCache, emitter) + return NewMemoryServer(advisorSendCh, advisorRecvCh, conf, metaCache, metaServer, emitter) default: return nil, fmt.Errorf("illegal resource %v", resourceName) } diff --git a/pkg/config/agent/sysadvisor/qosaware/resource/cpu/cpu_isolation.go b/pkg/config/agent/sysadvisor/qosaware/resource/cpu/cpu_isolation.go index 789f9b251..9527d19c9 100644 --- a/pkg/config/agent/sysadvisor/qosaware/resource/cpu/cpu_isolation.go +++ b/pkg/config/agent/sysadvisor/qosaware/resource/cpu/cpu_isolation.go @@ -43,8 +43,10 @@ type CPUIsolationConfiguration struct { IsolationLockInThreshold int IsolationLockOutPeriodSecs int - IsolationDisabled bool - IsolationDisabledPools sets.String + IsolationDisabled bool + IsolationDisabledPools sets.String + IsolationForceEnablePools sets.String + IsolationNonExclusivePools sets.String } // NewCPUIsolationConfiguration creates new resource advisor configurations