Skip to content

Commit

Permalink
Tey get stable dns when service pod changed
Browse files Browse the repository at this point in the history
  • Loading branch information
willzhen committed Apr 7, 2022
1 parent 69c33c4 commit 0822599
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 30 deletions.
52 changes: 34 additions & 18 deletions service/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/memory-overflow/highly-balanced-scheduling-agent/common"
"github.com/memory-overflow/highly-balanced-scheduling-agent/common/config"
)

func (pxy *proxyService) init(ctx context.Context) {
Expand Down Expand Up @@ -49,25 +50,40 @@ func syncIps(ctx context.Context, service *k8sserviceInfo) {
if err != nil {
return
}
// 三次拉取到的都是同一个 ip 列表再继续,防止 pod 重启过程中dns ip 列表不稳定
for i := 0; i < 2; i++ {
time.Sleep(100 * time.Millisecond)
dupips, err := common.GetDns(ctx, service.k8sHost)
if err != nil {
return
}
if !common.SliceSame(ips, dupips) {
return
lastIps := []string{}
service.lastConnections.Range(
func(key, value interface{}) bool {
lastIps = append(lastIps, key.(string))
return true
})
if !common.SliceSame(lastIps, ips) {
// dns 发生变化,服务有重启
// 先锁住服务调度
service.locked = true
time.Sleep(10 * time.Second) // 等待服务重启完成
// 5次拉取到的都是同一个 ip 列表再继续,防止 pod 重启过程中dns ip 列表不稳定
for i := 0; i < 4; i++ {
time.Sleep(500 * time.Millisecond)
dupips, err := common.GetDns(ctx, service.k8sHost)
if err != nil {
return
}
if !common.SliceSame(ips, dupips) {
return
}
}
}
var newMap sync.Map
for _, ip := range ips {
if value, ok := service.lastConnections.Load(ip); ok {
newMap.Store(ip, value)
} else {
x := int32(service.limitConnections)
newMap.Store(ip, &x)
config.GetLogger().Sugar().Infof("new ips: %v for uri: %s", ips, service.uri)
var newMap sync.Map
for _, ip := range ips {
if value, ok := service.lastConnections.Load(ip); ok {
newMap.Store(ip, value)
} else {
x := int32(service.limitConnections)
newMap.Store(ip, &x)
}
}
service.lastConnections = newMap
service.locked = false
}
service.lastConnections = newMap

}
28 changes: 16 additions & 12 deletions service/proxy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type job struct {
}

type k8sserviceInfo struct {
locked bool
uri string
k8sPort int
k8sHost string
Expand All @@ -37,18 +38,20 @@ func (service *k8sserviceInfo) getIp(ctx context.Context) (ip string, last *int3
// 超时,一直没有空资源使用
return "", nil, errors.New("no resources available")
default:
var maxLast *int32
service.lastConnections.Range(
func(key, value interface{}) bool {
if maxLast == nil || *value.(*int32) > *maxLast {
maxLast = value.(*int32)
ip = key.(string)
}
return true
})
if maxLast != nil && *maxLast > 0 {
atomic.AddInt32(maxLast, -1)
return ip, maxLast, nil
if !service.locked {
var maxLast *int32
service.lastConnections.Range(
func(key, value interface{}) bool {
if maxLast == nil || *value.(*int32) > *maxLast {
maxLast = value.(*int32)
ip = key.(string)
}
return true
})
if maxLast != nil && *maxLast > 0 {
atomic.AddInt32(maxLast, -1)
return ip, maxLast, nil
}
}
time.Sleep(2 * time.Second)
}
Expand All @@ -68,6 +71,7 @@ func BuildProxy(ctx context.Context, routes []config.Route) *proxyService {
}
for _, route := range routes {
serviceInfo := k8sserviceInfo{
locked: false,
uri: route.URI,
k8sHost: route.K8sHost,
k8sPort: route.K8sPort,
Expand Down

0 comments on commit 0822599

Please sign in to comment.