Skip to content

Commit 0539162

Browse files
committed
feat(mysql): 优化 mysql 慢查询视图 #10235
1 parent 70e889a commit 0539162

File tree

12 files changed

+6600
-2758
lines changed

12 files changed

+6600
-2758
lines changed

dbm-services/common/bkdata-kafka-consumer/pkg/consumer/mysql_table_size.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ func (c *MysqlTableSize) ConsumeClaim(session sarama.ConsumerGroupSession, claim
9292
if err := c.HandleMessageTryBatch(items, c.Sinker, c.Db); err != nil {
9393
slog.Error("handle message batch", err)
9494
} else {
95+
slog.Info("sink message", slog.String("key", string(msgs[len(msgs)-1].Key)))
9596
session.MarkMessage(msgs[len(msgs)-1], "")
9697
items = items[:0]
9798
msgs = msgs[:0]
98-
slog.Info("sink message", slog.String("key", string(msgs[len(msgs)-1].Key)))
9999
}
100100
}
101101
case message := <-claim.Messages():
@@ -127,10 +127,10 @@ func (c *MysqlTableSize) ConsumeClaim(session sarama.ConsumerGroupSession, claim
127127
slog.Error("handle message batch", err)
128128
time.Sleep(200 * time.Millisecond)
129129
} else {
130+
slog.Info("sink message", slog.String("key", string(message.Key)))
130131
session.MarkMessage(message, "")
131132
items = items[:0]
132133
msgs = msgs[:0]
133-
slog.Info("sink message", slog.String("key", string(message.Key)))
134134
}
135135
}
136136
case <-session.Context().Done():

dbm-services/common/bkdata-kafka-consumer/pkg/consumer/mysql_table_size_for_doris.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ func (c *MysqlTableSizeDoris) ConsumeClaim(session sarama.ConsumerGroupSession,
6969
if err := c.HandleMessageTryBatch(items, c.Sinker, c.Db); err != nil {
7070
slog.Error("handle message batch", err)
7171
} else {
72+
slog.Info("sink message", slog.String("key", string(msgs[len(msgs)-1].Key)))
7273
session.MarkMessage(msgs[len(msgs)-1], "")
7374
items = items[:0]
7475
msgs = msgs[:0]
75-
slog.Info("sink message", slog.String("key", string(msgs[len(msgs)-1].Key)))
7676
}
7777
}
7878
case message := <-claim.Messages():
@@ -104,10 +104,10 @@ func (c *MysqlTableSizeDoris) ConsumeClaim(session sarama.ConsumerGroupSession,
104104
slog.Error("handle message batch", err)
105105
time.Sleep(200 * time.Millisecond)
106106
} else {
107+
slog.Info("sink message", slog.String("key", string(message.Key)))
107108
session.MarkMessage(message, "")
108109
items = items[:0]
109110
msgs = msgs[:0]
110-
slog.Info("sink message", slog.String("key", string(message.Key)))
111111
}
112112
}
113113
case <-session.Context().Done():

0 commit comments

Comments
 (0)