Skip to content

Commit 114ffec

Browse files
authored
Merge pull request #749 from WeBankPartners/dev
Dev
2 parents 3b98ff3 + a7ffe88 commit 114ffec

File tree

8 files changed

+59
-13
lines changed

8 files changed

+59
-13
lines changed

monitor-agent/archive_mysql_tool/funcs/db.go

+25-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,29 @@ func InitDbEngine(databaseName string) (err error) {
5050
return err
5151
}
5252

53+
func ResetDbEngine() {
54+
err := mysqlEngine.Close()
55+
if err != nil {
56+
log.Printf("close mysql engine fail,%s \n", err.Error())
57+
}
58+
time.Sleep(30*time.Second)
59+
databaseName := Config().Mysql.DatabasePrefix + time.Now().Format("2006")
60+
connectStr := fmt.Sprintf("%s:%s@%s(%s:%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
61+
Config().Mysql.User, Config().Mysql.Password, "tcp", Config().Mysql.Server, Config().Mysql.Port, databaseName)
62+
mysqlEngine, err = xorm.NewEngine("mysql", connectStr)
63+
if err != nil {
64+
log.Printf("init mysql fail with connect: %s error: %v \n", connectStr, err)
65+
} else {
66+
mysqlEngine.SetMaxIdleConns(Config().Mysql.MaxIdle)
67+
mysqlEngine.SetMaxOpenConns(Config().Mysql.MaxOpen)
68+
mysqlEngine.SetConnMaxLifetime(time.Duration(Config().Mysql.Timeout) * time.Second)
69+
mysqlEngine.Charset("utf8")
70+
// 使用驼峰式映射
71+
mysqlEngine.SetMapper(core.SnakeMapper{})
72+
}
73+
log.Println("Reset db engine done! ")
74+
}
75+
5376
func InitMonitorDbEngine() (err error) {
5477
connectStr := fmt.Sprintf("%s:%s@%s(%s:%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
5578
Config().Monitor.Mysql.User, Config().Monitor.Mysql.Password, "tcp", Config().Monitor.Mysql.Server, Config().Monitor.Mysql.Port, Config().Monitor.Mysql.DataBase)
@@ -91,7 +114,7 @@ func insertMysql(rows []*ArchiveTable, tableName string) error {
91114
for sqlIndex, v := range sqlList {
92115
var tmpErr error
93116
for i := 0; i < 3; i++ {
94-
log.Printf("start try %d to insert mysql,data num:%d \n", i+1, rowCountList[sqlIndex])
117+
//log.Printf("start try %d to insert mysql,data num:%d \n", i+1, rowCountList[sqlIndex])
95118
_, err := mysqlEngine.Exec(v)
96119
if err != nil {
97120
tmpErr = err
@@ -104,6 +127,7 @@ func insertMysql(rows []*ArchiveTable, tableName string) error {
104127
}
105128
}
106129
if tmpErr != nil {
130+
log.Printf("Exec sql error:%s \n", tmpErr.Error())
107131
tmpErrorString := tmpErr.Error()
108132
if len(tmpErrorString) > 200 {
109133
tmpErrorString = tmpErrorString[:200]

monitor-agent/archive_mysql_tool/funcs/job.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ func consumeJob() {
140140
endTime := time.Now()
141141
useTime := float64(endTime.Sub(startTime).Nanoseconds()) / 1e6
142142
log.Printf("done with consume job,use time: %.3f ms", useTime)
143+
if int(endTime.Sub(startTime).Seconds()) >= jobTimeout {
144+
log.Println("job timeout,try to reset db connection ")
145+
ResetDbEngine()
146+
}
147+
gTransport.CloseIdleConnections()
143148
}
144149
}
145150

@@ -151,7 +156,7 @@ func checkJobStatus() {
151156
log.Printf("archive job done \n")
152157
break
153158
}
154-
time.Sleep(10 * time.Second)
159+
time.Sleep(30 * time.Second)
155160
}
156161
}
157162

@@ -165,8 +170,9 @@ func archiveTimeoutAction(param ArchiveActionList) {
165170
case <-timeoutChan:
166171
log.Printf("done archive action,job length:%d \n", len(param))
167172
case <-time.After(time.Duration(jobTimeout) * time.Second):
168-
log.Printf("timeout archive action in 10min,job length:%d \n", len(param))
173+
log.Printf("timeout archive action in %d s,job length:%d \n", jobTimeout, len(param))
169174
}
175+
close(timeoutChan)
170176
}
171177

172178
func archiveAction(param ArchiveActionList) {

monitor-agent/archive_mysql_tool/funcs/prometheus.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,16 @@ func getPrometheusData(param *PrometheusQueryParam) error {
6767
return fmt.Errorf("http request error, %v \n", err)
6868
}
6969
if resp.StatusCode/100 != 2 {
70+
if resp.Body != nil {
71+
resp.Body.Close()
72+
}
7073
return fmt.Errorf("http response status %d \n", resp.StatusCode)
7174
}
7275
bodyBytes,err := ioutil.ReadAll(resp.Body)
76+
resp.Body.Close()
7377
if err != nil {
7478
return fmt.Errorf("http read body error, %v \n", err)
7579
}
76-
defer resp.Body.Close()
7780
var result PrometheusResponse
7881
err = json.Unmarshal(bodyBytes, &result)
7982
if err != nil {

monitor-server/api/v1/agent/kubernetes.go

+4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"net/http"
1313
"strconv"
1414
"strings"
15+
"time"
1516
)
1617

1718
func UpdateKubernetesCluster(c *gin.Context) {
@@ -72,6 +73,8 @@ func UpdateKubernetesCluster(c *gin.Context) {
7273
if err != nil {
7374
mid.ReturnHandleError(c, err.Error(), err)
7475
}else{
76+
time.Sleep(20*time.Second)
77+
db.SyncPodToEndpoint()
7578
mid.ReturnSuccess(c)
7679
}
7780
}
@@ -184,6 +187,7 @@ func handleAddKubernetesCluster(input k8sClusterRequestInputObj) error {
184187
}else {
185188
err = db.AddKubernetesCluster(m.KubernetesClusterParam{ClusterName: input.ClusterName, Ip: input.Ip, Port: input.Port, Token: input.Token})
186189
}
190+
db.SyncPodToEndpoint()
187191
return err
188192
}
189193

monitor-server/api/v1/alarm/alarm.go

+4
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,8 @@ func QueryEntityAlarm(c *gin.Context) {
545545
value := param.Criteria.Condition
546546
if strings.Contains(value, "monitor-check") {
547547
alarmObj := db.GetCheckProgressContent(value)
548+
alarmObj.Id = value
549+
alarmObj.DisplayName = alarmObj.Subject
548550
result.Data = append(result.Data, &alarmObj)
549551
}else {
550552
if strings.Contains(value, "-") {
@@ -571,6 +573,8 @@ func QueryEntityAlarm(c *gin.Context) {
571573
mid.ReturnData(c, result)
572574
return
573575
}
576+
alarmObj.Id = value
577+
alarmObj.DisplayName = alarmObj.Subject
574578
result.Data = append(result.Data, &alarmObj)
575579
}
576580
result.Status = "OK"

monitor-server/models/entity.go

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type AlarmEntity struct {
7676

7777
type AlarmEntityObj struct {
7878
Id string `json:"id"`
79+
DisplayName string `json:"displayName"`
7980
Status string `json:"status"`
8081
Subject string `json:"subject"`
8182
Content string `json:"content"`

monitor-server/services/db/dashboard.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func GetPromMetric(endpoint []string, metric string) (error, string) {
140140
reg = strings.Replace(reg, "$guid", host.Guid, -1)
141141
}
142142
if strings.Contains(reg, "$pod") {
143-
reg = strings.Replace(reg, "$pod", host.Name[len(host.ExportVersion)+1:], -1)
143+
reg = strings.Replace(reg, "$pod", host.Name, -1)
144144
}
145145
if strings.Contains(reg, "$k8s_namespace") {
146146
reg = strings.Replace(reg, "$k8s_namespace", host.ExportVersion, -1)
@@ -182,7 +182,7 @@ func ReplacePromQlKeyword(promQl, metric string, host m.EndpointTable) string {
182182
promQl = strings.Replace(promQl, "$guid", host.Guid, -1)
183183
}
184184
if strings.Contains(promQl, "$pod") {
185-
promQl = strings.Replace(promQl, "$pod", host.Name[len(host.ExportVersion)+1:], -1)
185+
promQl = strings.Replace(promQl, "$pod", host.Name, -1)
186186
}
187187
if strings.Contains(promQl, "$k8s_namespace") {
188188
promQl = strings.Replace(promQl, "$k8s_namespace", host.ExportVersion, -1)

monitor-server/services/db/kubernetes.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,11 @@ func StartCronSyncKubernetesPod(interval int) {
166166
t := time.NewTicker(time.Duration(interval*10)*time.Second).C
167167
for {
168168
<- t
169-
go syncPodToEndpoint()
169+
go SyncPodToEndpoint()
170170
}
171171
}
172172

173-
func syncPodToEndpoint() bool {
173+
func SyncPodToEndpoint() bool {
174174
log.Logger.Info("start to sync kubernetes pod")
175175
var kubernetesTables []*m.KubernetesClusterTable
176176
result := false
@@ -190,7 +190,11 @@ func syncPodToEndpoint() bool {
190190
x.SQL("select * from kubernetes_endpoint_rel where kubernete_id=?", v.Id).Find(&tmpKubernetesEndpointTables)
191191
tmpApiServerIp := v.ApiServer[:strings.Index(v.ApiServer, ":")]
192192
for _,vv := range series {
193-
tmpEndpointGuid := fmt.Sprintf("%s_%s_pod", vv.Name, tmpApiServerIp)
193+
tmpPodName := vv.Name
194+
if strings.HasPrefix(tmpPodName, "pod=") {
195+
tmpPodName = tmpPodName[4:]
196+
}
197+
tmpEndpointGuid := fmt.Sprintf("%s_%s_pod", tmpPodName, tmpApiServerIp)
194198
existsFlag := false
195199
for _,ke := range tmpKubernetesEndpointTables {
196200
if ke.EndpointGuid == tmpEndpointGuid {
@@ -204,8 +208,8 @@ func syncPodToEndpoint() bool {
204208
}else{
205209
tmpGuidMap[tmpEndpointGuid] = 1
206210
}
207-
endpointTables = append(endpointTables, &m.EndpointTable{Guid:tmpEndpointGuid, Name:vv.Name, Ip:tmpApiServerIp, ExportType:"pod", Step:10, OsType:v.ClusterName})
208-
kubernetesEndpointTables = append(kubernetesEndpointTables, &m.KubernetesEndpointRelTable{KuberneteId:v.Id, EndpointGuid:tmpEndpointGuid})
211+
endpointTables = append(endpointTables, &m.EndpointTable{Guid:tmpEndpointGuid, Name:tmpPodName, Ip:tmpApiServerIp, ExportType:"pod", Step:10, OsType:v.ClusterName})
212+
kubernetesEndpointTables = append(kubernetesEndpointTables, &m.KubernetesEndpointRelTable{KuberneteId:v.Id, EndpointGuid:tmpEndpointGuid, PodGuid: tmpPodName})
209213
}
210214
}
211215
}
@@ -242,9 +246,9 @@ func syncPodToEndpoint() bool {
242246
}
243247
if len(kubernetesEndpointTables) > 0 {
244248
result = true
245-
keRelSql := "insert into kubernetes_endpoint_rel(kubernete_id,endpoint_guid) values "
249+
keRelSql := "insert into kubernetes_endpoint_rel(kubernete_id,endpoint_guid,pod_guid) values "
246250
for i,v := range kubernetesEndpointTables {
247-
keRelSql += fmt.Sprintf("(%d,'%s')", v.KuberneteId, v.EndpointGuid)
251+
keRelSql += fmt.Sprintf("(%d,'%s','%s')", v.KuberneteId, v.EndpointGuid, v.PodGuid)
248252
if i < len(kubernetesEndpointTables)-1 {
249253
keRelSql += ","
250254
}

0 commit comments

Comments
 (0)