17
17
18
18
local core = require (" apisix.core" )
19
19
local binaryHeap = require (" binaryheap" )
20
+ local dkjson = require (" dkjson" )
20
21
local ipairs = ipairs
21
22
local pairs = pairs
22
-
23
+ local ngx = ngx
24
+ local ngx_shared = ngx .shared
25
+ local tostring = tostring
23
26
24
27
local _M = {}
25
28
29
+ -- Shared dictionary to store connection counts across balancer recreations
30
+ local CONN_COUNT_DICT_NAME = " balancer-least-conn"
31
+ local conn_count_dict
26
32
27
33
-- Ordering predicate for the server heap.
-- @param a  heap entry carrying a `score` field
-- @param b  heap entry carrying a `score` field
-- @return true when `a` scores strictly lower than `b`
local function least_score(a, b)
    return b.score > a.score
end
30
36
37
-- Build the shared-dict key that holds the connection count of `server`
-- inside `upstream`.
-- The key embeds `upstream.id`; when the upstream has no id, a CRC32 of its
-- dkjson encoding is used in its place.
-- NOTE(review): dkjson.encode is called without an explicit key order, so
-- confirm the encoding is really stable before relying on this fallback.
-- @param upstream  upstream table; `id` is read, the whole table is encoded
--                  only when `id` is missing
-- @param server    server identifier appended to the key
-- @return the key string
local function get_conn_count_key(upstream, server)
    local id = upstream.id
    if not id then
        id = ngx.crc32_short(dkjson.encode(upstream))
        core.log.debug(" generated upstream_id from hash: ", id)
    end

    local key_prefix = " conn_count:" .. tostring(id) .. " :"
    local conn_key = key_prefix .. server
    core.log.debug(" generated connection count key: ", conn_key)
    return conn_key
end
49
+
50
+
51
-- Read the connection count stored for `server` of `upstream` in the shared
-- dict.
-- @param upstream  upstream table, used to derive the dict key
-- @param server    server identifier
-- @return the stored count, or 0 when the key is absent or the lookup failed
local function get_server_conn_count(upstream, server)
    local key = get_conn_count_key(upstream, server)
    local count, err = conn_count_dict:get(key)
    -- On success the second return value of get() is the entry's user flags,
    -- not an error; it only describes a failure when no value came back.
    if count == nil and err then
        core.log.error(" failed to get connection count for ", server, " : ", err)
        return 0
    end

    -- A missing key simply means no connection has been counted yet.
    local result = count or 0
    core.log.debug(" retrieved connection count for server ", server, " : ", result)
    return result
end
63
+
64
+
65
-- Overwrite the connection count stored for `server` of `upstream` in the
-- shared dict. Failures are logged, nothing is returned.
-- @param upstream  upstream table, used to derive the dict key
-- @param server    server identifier
-- @param count     value to store
local function set_server_conn_count(upstream, server, count)
    local dict_key = get_conn_count_key(upstream, server)
    local stored, err = conn_count_dict:set(dict_key, count)
    if stored then
        core.log.debug(" set connection count for server ", server, " to ", count)
        return
    end

    core.log.error(" failed to set connection count for ", server, " : ", err)
end
75
+
76
+
77
-- Add `delta` (1 when omitted) to the connection count stored for `server`
-- of `upstream`; a missing key is initialised to 0 before the addition.
-- @param upstream  upstream table, used to derive the dict key
-- @param server    server identifier
-- @param delta     amount to add, may be negative; defaults to 1
-- @return the new count, or 0 when the shared dict reported a failure
local function incr_server_conn_count(upstream, server, delta)
    local dict_key = get_conn_count_key(upstream, server)
    local step = delta or 1
    local updated, err = conn_count_dict:incr(dict_key, step, 0)
    if updated then
        core.log.debug(" incremented connection count for server ", server, " by ", step,
                       " , new count: ", updated)
        return updated
    end

    core.log.error(" failed to increment connection count for ", server, " : ", err)
    return 0
end
89
+
90
+
91
-- Delete the stored connection counts of servers that no longer belong to
-- `upstream`.
-- @param upstream         upstream table, used to derive the key prefix
-- @param current_servers  table keyed by server identifier; any stored count
--                         whose server is not a truthy key here is removed
local function cleanup_stale_conn_counts(upstream, current_servers)
    -- An empty server name yields exactly the per-upstream key prefix, so the
    -- prefix cannot drift from the keys written through get_conn_count_key.
    local prefix = get_conn_count_key(upstream, "")
    core.log.debug(" cleaning up stale connection counts with prefix: ", prefix)

    -- get_keys(0) asks for every key in the dict, including other upstreams'.
    -- NOTE(review): the lua-nginx-module docs caution that this can hold the
    -- dict lock for a while on large dictionaries - confirm the dict stays
    -- small enough for this to run on every balancer creation.
    local keys, err = conn_count_dict:get_keys(0)
    if err then
        core.log.error(" failed to get keys from shared dict: ", err)
        return
    end

    for _, key in ipairs(keys or {}) do
        if core.string.has_prefix(key, prefix) then
            -- Whatever follows the prefix is the server identifier.
            local server = key:sub(#prefix + 1)
            if not current_servers[server] then
                -- This server is no longer in the upstream, clean it up
                local ok, delete_err = conn_count_dict:delete(key)
                if not ok and delete_err then
                    core.log.error(" failed to delete stale connection count for server ",
                                   server, " : ", delete_err)
                else
                    core.log.info(" cleaned up stale connection count for server: ", server)
                end
            end
        end
    end
end
31
121
32
122
function _M .new (up_nodes , upstream )
123
+ if not conn_count_dict then
124
+ conn_count_dict = ngx_shared [CONN_COUNT_DICT_NAME ]
125
+ end
126
+
127
+ if not conn_count_dict then
128
+ core .log .error (" shared dict '" , CONN_COUNT_DICT_NAME , " ' not found" )
129
+ return nil , " shared dict not found"
130
+ end
131
+
33
132
local servers_heap = binaryHeap .minUnique (least_score )
133
+
134
+ -- Clean up stale connection counts for removed servers
135
+ cleanup_stale_conn_counts (upstream , up_nodes )
136
+
34
137
for server , weight in pairs (up_nodes ) do
35
- local score = 1 / weight
138
+ -- Get the persisted connection count for this server
139
+ local conn_count = get_server_conn_count (upstream , server )
140
+ -- Score directly reflects weighted connection count
141
+ local score = (conn_count + 1 ) / weight
142
+
143
+ core .log .debug (" initializing server " , server ,
144
+ " | weight: " , weight ,
145
+ " | conn_count: " , conn_count ,
146
+ " | score: " , score ,
147
+ " | upstream_id: " , upstream .id or " no-id" )
148
+
36
149
-- Note: the argument order of insert is different from others
37
150
servers_heap :insert ({
38
151
server = server ,
39
- effect_weight = 1 / weight ,
152
+ weight = weight ,
40
153
score = score ,
41
154
}, server )
42
155
end
43
156
44
157
return {
45
158
upstream = upstream ,
46
- get = function (ctx )
159
+ get = function (ctx )
47
160
local server , info , err
48
161
if ctx .balancer_tried_servers then
49
162
local tried_server_list = {}
@@ -75,15 +188,29 @@ function _M.new(up_nodes, upstream)
75
188
return nil , err
76
189
end
77
190
78
- info .score = info .score + info .effect_weight
191
+ -- Get current connection count for detailed logging
192
+ local current_conn_count = get_server_conn_count (upstream , server )
193
+ info .score = (current_conn_count + 1 ) / info .weight
79
194
servers_heap :update (server , info )
195
+ incr_server_conn_count (upstream , server , 1 )
80
196
return server
81
197
end ,
82
- after_balance = function (ctx , before_retry )
198
+ after_balance = function (ctx , before_retry )
83
199
local server = ctx .balancer_server
84
200
local info = servers_heap :valueByPayload (server )
85
- info .score = info .score - info .effect_weight
201
+ if not info then
202
+ core .log .error (" server info not found for: " , server )
203
+ return
204
+ end
205
+
206
+ local current_conn_count = get_server_conn_count (upstream , server )
207
+ info .score = (current_conn_count - 1 ) / info .weight
208
+ if info .score < 0 then
209
+ info .score = 0 -- Prevent negative scores
210
+ end
86
211
servers_heap :update (server , info )
212
+ -- Decrement connection count in shared dict
213
+ incr_server_conn_count (upstream , server , - 1 )
87
214
88
215
if not before_retry then
89
216
if ctx .balancer_tried_servers then
@@ -100,7 +227,7 @@ function _M.new(up_nodes, upstream)
100
227
101
228
ctx .balancer_tried_servers [server ] = true
102
229
end ,
103
- before_retry_next_priority = function (ctx )
230
+ before_retry_next_priority = function (ctx )
104
231
if ctx .balancer_tried_servers then
105
232
core .tablepool .release (" balancer_tried_servers" , ctx .balancer_tried_servers )
106
233
ctx .balancer_tried_servers = nil
@@ -109,5 +236,5 @@ function _M.new(up_nodes, upstream)
109
236
}
110
237
end
111
238
112
-
113
239
return _M
240
+
0 commit comments