Skip to content

feat: add support for multiple nacos instances and replace events library with shdict #12263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
e3dac56
feat: add support for multiple nacos instances and replace events lib…
Revolyssup May 28, 2025
41a6a96
add shared dict
Revolyssup May 28, 2025
53b82c3
add timeout
Revolyssup May 28, 2025
c722fb4
add license
Revolyssup May 28, 2025
6be3ccc
fix indent
Revolyssup May 28, 2025
d15a621
fix test
Revolyssup May 28, 2025
19bd92c
docker compose revert
Revolyssup May 28, 2025
ad5cde8
revert nacos3.t
Revolyssup May 28, 2025
a3a5c14
revert
Revolyssup May 28, 2025
33148d8
temp
Revolyssup May 28, 2025
b93618f
test
Revolyssup May 28, 2025
50f2547
fix
Revolyssup May 28, 2025
0d1066a
reset waiting time
Revolyssup May 28, 2025
9542877
add logs
Revolyssup May 28, 2025
6b5c731
fix nacos2 assert
Revolyssup May 28, 2025
450452c
fix stream
Revolyssup May 28, 2025
f77e00e
fix lint
Revolyssup May 28, 2025
bbfaf29
add dedup logic
Revolyssup May 28, 2025
e6d84b5
add log
Revolyssup May 28, 2025
91598fe
fix test2
Revolyssup May 28, 2025
fab71bd
fix lint
Revolyssup May 28, 2025
4ba8fc4
add multi cluster test
Revolyssup May 28, 2025
612de20
fix lint
Revolyssup May 28, 2025
6ad86df
fix nacos2
Revolyssup May 28, 2025
1d896bc
fix lint
Revolyssup May 29, 2025
4dc1e22
remove timeout from tests
Revolyssup May 29, 2025
2240259
update docs
Revolyssup May 29, 2025
85879e7
readd grpc scheme support
Revolyssup May 29, 2025
a8c7a29
fix lint docs
Revolyssup May 29, 2025
9fc6ecc
fix lint
Revolyssup May 29, 2025
89a666b
fix lint
Revolyssup May 29, 2025
f3e8bb2
increase timeout in tests
Revolyssup May 29, 2025
86ccc81
Apply suggestions from code review
Revolyssup May 29, 2025
c73abb7
fix docs
Revolyssup May 29, 2025
bb5d343
fix cli test
Revolyssup May 29, 2025
e8e09bc
add sleep to allow workers to get ready
Revolyssup May 30, 2025
7daad27
fix indent
Revolyssup May 30, 2025
a4e225f
fix indent
Revolyssup May 30, 2025
add27eb
Merge branch 'master' of github.com:apache/apisix into revolyssup/ref…
Revolyssup Jun 3, 2025
fe05f02
apply suggestions
Revolyssup Jun 4, 2025
1e9159a
init nacos shared dict in meta
Revolyssup Jun 4, 2025
2d222ce
fail fast
Revolyssup Jun 5, 2025
6608c7c
remove nacos_client upvalue
Revolyssup Jun 5, 2025
6579ef0
fix removal of wrong keys
Revolyssup Jun 5, 2025
9f15786
add retry
Revolyssup Jun 5, 2025
def3458
lint
Revolyssup Jun 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apisix/cli/ngx_tpl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ lua {
{% if status then %}
lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *};
{% end %}
lua_shared_dict nacos 10m;
}

{% if enabled_stream_plugins["prometheus"] and not enable_http then %}
Expand Down
293 changes: 293 additions & 0 deletions apisix/discovery/nacos/factory.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local require = require
local core = require("apisix.core")
local http = require('resty.http')
local ngx = ngx
local ngx_re = require('ngx.re')
local utils = require("apisix.discovery.nacos.utils")
local string = string
local string_sub = string.sub
local str_find = core.string.find
local ngx_timer_at = ngx.timer.at
local math_random = math.random
local ipairs = ipairs
local shdict_name = "nacos"

local nacos_dict = ngx.shared[shdict_name]
local NACOS_LOGIN_PATH = "/auth/login"
local NACOS_INSTANCE_PATH = "/ns/instance/list"
local _M = {}


local function _request(method, uri, params, headers, body, options)
local url = uri
if params ~= nil and params ~= {} then
url = uri .. "?" .. ngx.encode_args(params)
end
local httpc = http.new()
local timeout = options and options.timeout or {}
local connect_timeout = timeout.connect and timeout.connect * 1000 or 2000
local read_timeout = timeout.read and timeout.read * 1000 or 2000
local write_timeout = timeout.write and timeout.write * 1000 or 5000

httpc:set_timeouts(connect_timeout, read_timeout, write_timeout )
local res, err = httpc:request_uri(url, {
method = method,
headers = headers,
body = body,
ssl_verify = false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems unsafe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its okay to allow for non tls traffic for service discovery. Eureka service discovery code also has the same logic.

})

if not res then
return nil, err
end

if not res.body or res.status ~= 200 then
return nil, 'status = ' .. res.status
end

core.log.info("request to nacos, uri: ", url, "response: ", res.body)

local data, err = core.json.decode(res.body)
if not data then
return nil, err
end

return data
end


local function get_base_uri(hosts)
local host = hosts
-- TODO Add health check to get healthy nodes.
local url = host[math_random(#host)]
local auth_idx = core.string.rfind_char(url, '@')
local username, password
if auth_idx then
local protocol_idx = str_find(url, '://')
local protocol = string_sub(url, 1, protocol_idx + 2)
local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1)
local arr = ngx_re.split(user_and_password, ':')
if #arr == 2 then
username = arr[1]
password = arr[2]
end
local other = string_sub(url, auth_idx + 1)
url = protocol .. other
end
return url, username, password
end

local function request_login(self, host, username, password)
local params = {
username = username,
password = password,
}
-- backward compat: NACOS_LOGIN_PATH starts with "/" so
-- we need to remove the last "/" from prefix
if string_sub(self.config.prefix, -1) == "/" then
self.config.prefix = string_sub(self.config.prefix, 1, -2)
end
local uri = host .. self.config.prefix .. NACOS_LOGIN_PATH

local headers = {
["Content-Type"] ="application/x-www-form-urlencoded"
}

local resp, err = _request("POST", uri, params, headers, nil, {timeout=self.config.timeout})
if not resp then
core.log.error("failed to fetch token from nacos, uri: ", uri, " err: ", err)
return ""
end
return resp.accessToken
end


local function request_instance_list(self, params, host)
-- backward compat: NACOS_INSTANCE_PATH starts with "/" so
-- we need to remove the last "/" from prefix
if string_sub(self.config.prefix, -1) == "/" then
self.config.prefix = string_sub(self.config.prefix, 1, -2)
end
local uri = host .. self.config.prefix .. NACOS_INSTANCE_PATH

local resp, err = _request("GET", uri, params)
if not resp then
core.log.error("failed to fetch instances list from nacos, uri: ", uri, " err: ", err)
return {}
end
return resp.hosts or {}
end

local function is_grpc(scheme)
return scheme == "grpc" or scheme == "grpcs"
end


local function fetch_instances(self, serv)
local config = self.config

local params = {
["namespaceId"] = serv.namespace_id or "",
["groupName"] = serv.group_name or "DEFAULT_GROUP",
["serviceName"] = serv.name,
["healthyOnly"] = "true"
}

local auth = config.auth or {}
-- for backward compat:
-- In older method, we passed username and password inside of host
-- In new method its passed separately
local username, password, host
if config.old_conf then
-- extract username and password from host
host, username, password = get_base_uri(config.hosts)
else
host = config.hosts[math_random(#config.hosts)]
if (auth.username and auth.username ~= "") and (auth.password and auth.password ~= "") then
username = auth.username
password = auth.password
end
end
if username and username ~= "" and password and password ~= "" then
local token = request_login(self, host, username, password)
params["accessToken"] = token
end

if auth.token and auth.token ~= "" then
params["accessToken"] = auth.token
end

if (auth.access_key and auth.access_key ~= "") and
(auth.secret_key and auth.secret_key ~= "") then
local ak, data, signature = utils.generate_signature(serv.group_name,
serv.name, auth.access_key, auth.secret_key)
params["ak"] = ak
params["data"] = data
params["signature"] = signature
end

local instances = request_instance_list(self, params, host)
local nodes = {}
for _, instance in ipairs(instances) do
local node = {
host = instance.ip,
port = instance.port,
weight = instance.weight or self.config.default_weight,
metadata = instance.metadata,
}
-- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
if is_grpc(instance.scheme) and instance.metadata and instance.metadata.gRPC_port then
node.port = host.metadata.gRPC_port
end
core.table.insert(nodes, node)
end
return nodes
end


local function fetch_full_registry(self)
return function (premature)
if premature then
return
end

local config = self.config
local services_in_use = utils.get_nacos_services(config.id)
local service_names = {}
for _, serv in ipairs(services_in_use) do
if self.stop_flag then
core.log.error("nacos client is exited, id: ", config.id)
return
end
local key = utils.generate_key(serv.id, serv.namespace_id,
serv.group_name, serv.name)
service_names[key] = true
local nodes = self:fetch_instances(serv)
if #nodes > 0 then
local content = core.json.encode(nodes)
local key = key
nacos_dict:set(key, content)
end
end
-- delete unused service names
local keys = nacos_dict:get_keys(0)
Copy link
Member

@nic-6443 nic-6443 Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CAUTION Avoid calling this method on dictionaries with a very large number of keys as it may lock the dictionary for significant amount of time and block Nginx worker processes trying to access the dictionary.

According to the OpenResty documentation, I realized that get_keys should be avoided. We can record services_in_use in a module variable so that by comparing the values of services_in_use of last time and this time executing the fetch_full_registry function, we can determine the changes without using get_keys.

for _, key in ipairs(keys) do
if core.string.has_prefix(key, config.id) and not service_names[key] then
nacos_dict:delete(key)
end
end
ngx_timer_at(self.config.fetch_interval, self:fetch_full_registry())
end
end


local function stop(self)
self.stop_flag = true

if self.checker then
self.checker:clear()
end
end


local function start(self)
ngx_timer_at(0, self:fetch_full_registry())
end


function _M.new(config)
local version = ngx.md5(core.json.encode(config, true))

local client = {
id = config.id,
version = version,
config = config,
stop_flag = false,

start = start,
stop = stop,
fetch_instances = fetch_instances,
fetch_full_registry = fetch_full_registry,
Comment on lines +264 to +267
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to APISIX convention, you should use the meta table and _index. I don't remember us having such a precedent.

}

if config.check then
local health_check = require("resty.healthcheck")
local checker = health_check.new({
name = config.id,
shm_name = "nacos",
checks = config.check
})

local ok, err = checker:add_target(config.check.active.host,
config.check.active.port, nil, false)
if not ok then
core.log.error("failed to add health check target",
core.json.encode(config), " err: ", err)
else
core.log.info("success to add health checker, id ", config.id,
" host ", config.check.active.host, " port ", config.check.active.port)
client.checker = checker
end
end

return client
end

return _M
Loading
Loading