-
Notifications
You must be signed in to change notification settings - Fork 2.6k
feat: add support for multiple nacos instances and replace events library with shdict #12263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e3dac56
41a6a96
53b82c3
c722fb4
6be3ccc
d15a621
19bd92c
ad5cde8
a3a5c14
33148d8
b93618f
50f2547
0d1066a
9542877
6b5c731
450452c
f77e00e
bbfaf29
e6d84b5
91598fe
fab71bd
4ba8fc4
612de20
6ad86df
1d896bc
4dc1e22
2240259
85879e7
a8c7a29
9fc6ecc
89a666b
f3e8bb2
86ccc81
c73abb7
bb5d343
e8e09bc
7daad27
a4e225f
add27eb
fe05f02
1e9159a
2d222ce
6608c7c
6579ef0
9f15786
def3458
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,293 @@ | ||
-- | ||
-- Licensed to the Apache Software Foundation (ASF) under one or more | ||
-- contributor license agreements. See the NOTICE file distributed with | ||
-- this work for additional information regarding copyright ownership. | ||
-- The ASF licenses this file to You under the Apache License, Version 2.0 | ||
-- (the "License"); you may not use this file except in compliance with | ||
-- the License. You may obtain a copy of the License at | ||
-- | ||
-- http://www.apache.org/licenses/LICENSE-2.0 | ||
-- | ||
-- Unless required by applicable law or agreed to in writing, software | ||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
-- See the License for the specific language governing permissions and | ||
-- limitations under the License. | ||
-- | ||
|
||
local require = require | ||
local core = require("apisix.core") | ||
local http = require('resty.http') | ||
local ngx = ngx | ||
local ngx_re = require('ngx.re') | ||
local utils = require("apisix.discovery.nacos.utils") | ||
local string = string | ||
local string_sub = string.sub | ||
local str_find = core.string.find | ||
local ngx_timer_at = ngx.timer.at | ||
local math_random = math.random | ||
local ipairs = ipairs | ||
local shdict_name = "nacos" | ||
|
||
local nacos_dict = ngx.shared[shdict_name] | ||
local NACOS_LOGIN_PATH = "/auth/login" | ||
local NACOS_INSTANCE_PATH = "/ns/instance/list" | ||
local _M = {} | ||
|
||
|
||
local function _request(method, uri, params, headers, body, options) | ||
local url = uri | ||
if params ~= nil and params ~= {} then | ||
url = uri .. "?" .. ngx.encode_args(params) | ||
end | ||
local httpc = http.new() | ||
local timeout = options and options.timeout or {} | ||
local connect_timeout = timeout.connect and timeout.connect * 1000 or 2000 | ||
local read_timeout = timeout.read and timeout.read * 1000 or 2000 | ||
local write_timeout = timeout.write and timeout.write * 1000 or 5000 | ||
|
||
httpc:set_timeouts(connect_timeout, read_timeout, write_timeout ) | ||
local res, err = httpc:request_uri(url, { | ||
method = method, | ||
headers = headers, | ||
body = body, | ||
ssl_verify = false, | ||
}) | ||
|
||
if not res then | ||
return nil, err | ||
end | ||
|
||
if not res.body or res.status ~= 200 then | ||
return nil, 'status = ' .. res.status | ||
end | ||
|
||
core.log.info("request to nacos, uri: ", url, "response: ", res.body) | ||
|
||
local data, err = core.json.decode(res.body) | ||
if not data then | ||
return nil, err | ||
end | ||
|
||
return data | ||
end | ||
|
||
|
||
local function get_base_uri(hosts) | ||
local host = hosts | ||
-- TODO Add health check to get healthy nodes. | ||
local url = host[math_random(#host)] | ||
local auth_idx = core.string.rfind_char(url, '@') | ||
local username, password | ||
if auth_idx then | ||
local protocol_idx = str_find(url, '://') | ||
local protocol = string_sub(url, 1, protocol_idx + 2) | ||
local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1) | ||
local arr = ngx_re.split(user_and_password, ':') | ||
if #arr == 2 then | ||
username = arr[1] | ||
password = arr[2] | ||
end | ||
local other = string_sub(url, auth_idx + 1) | ||
url = protocol .. other | ||
end | ||
return url, username, password | ||
end | ||
|
||
local function request_login(self, host, username, password) | ||
local params = { | ||
username = username, | ||
password = password, | ||
} | ||
-- backward compat: NACOS_LOGIN_PATH starts with "/" so | ||
-- we need to remove the last "/" from prefix | ||
if string_sub(self.config.prefix, -1) == "/" then | ||
self.config.prefix = string_sub(self.config.prefix, 1, -2) | ||
end | ||
local uri = host .. self.config.prefix .. NACOS_LOGIN_PATH | ||
|
||
local headers = { | ||
["Content-Type"] ="application/x-www-form-urlencoded" | ||
} | ||
|
||
local resp, err = _request("POST", uri, params, headers, nil, {timeout=self.config.timeout}) | ||
if not resp then | ||
core.log.error("failed to fetch token from nacos, uri: ", uri, " err: ", err) | ||
return "" | ||
end | ||
return resp.accessToken | ||
end | ||
|
||
|
||
local function request_instance_list(self, params, host) | ||
-- backward compat: NACOS_INSTANCE_PATH starts with "/" so | ||
-- we need to remove the last "/" from prefix | ||
if string_sub(self.config.prefix, -1) == "/" then | ||
self.config.prefix = string_sub(self.config.prefix, 1, -2) | ||
end | ||
local uri = host .. self.config.prefix .. NACOS_INSTANCE_PATH | ||
|
||
local resp, err = _request("GET", uri, params) | ||
if not resp then | ||
core.log.error("failed to fetch instances list from nacos, uri: ", uri, " err: ", err) | ||
return {} | ||
end | ||
return resp.hosts or {} | ||
end | ||
|
||
local function is_grpc(scheme) | ||
return scheme == "grpc" or scheme == "grpcs" | ||
end | ||
|
||
|
||
local function fetch_instances(self, serv) | ||
local config = self.config | ||
|
||
local params = { | ||
["namespaceId"] = serv.namespace_id or "", | ||
["groupName"] = serv.group_name or "DEFAULT_GROUP", | ||
["serviceName"] = serv.name, | ||
["healthyOnly"] = "true" | ||
} | ||
|
||
local auth = config.auth or {} | ||
-- for backward compat: | ||
-- In older method, we passed username and password inside of host | ||
-- In new method its passed separately | ||
local username, password, host | ||
if config.old_conf then | ||
-- extract username and password from host | ||
host, username, password = get_base_uri(config.hosts) | ||
else | ||
host = config.hosts[math_random(#config.hosts)] | ||
if (auth.username and auth.username ~= "") and (auth.password and auth.password ~= "") then | ||
username = auth.username | ||
password = auth.password | ||
end | ||
end | ||
if username and username ~= "" and password and password ~= "" then | ||
local token = request_login(self, host, username, password) | ||
params["accessToken"] = token | ||
end | ||
|
||
if auth.token and auth.token ~= "" then | ||
params["accessToken"] = auth.token | ||
end | ||
|
||
if (auth.access_key and auth.access_key ~= "") and | ||
(auth.secret_key and auth.secret_key ~= "") then | ||
local ak, data, signature = utils.generate_signature(serv.group_name, | ||
serv.name, auth.access_key, auth.secret_key) | ||
params["ak"] = ak | ||
params["data"] = data | ||
params["signature"] = signature | ||
end | ||
|
||
local instances = request_instance_list(self, params, host) | ||
local nodes = {} | ||
for _, instance in ipairs(instances) do | ||
local node = { | ||
host = instance.ip, | ||
port = instance.port, | ||
weight = instance.weight or self.config.default_weight, | ||
metadata = instance.metadata, | ||
} | ||
-- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496 | ||
if is_grpc(instance.scheme) and instance.metadata and instance.metadata.gRPC_port then | ||
node.port = host.metadata.gRPC_port | ||
end | ||
core.table.insert(nodes, node) | ||
end | ||
return nodes | ||
end | ||
|
||
|
||
local function fetch_full_registry(self) | ||
return function (premature) | ||
if premature then | ||
return | ||
end | ||
|
||
local config = self.config | ||
local services_in_use = utils.get_nacos_services(config.id) | ||
local service_names = {} | ||
for _, serv in ipairs(services_in_use) do | ||
if self.stop_flag then | ||
core.log.error("nacos client is exited, id: ", config.id) | ||
return | ||
end | ||
local key = utils.generate_key(serv.id, serv.namespace_id, | ||
serv.group_name, serv.name) | ||
service_names[key] = true | ||
local nodes = self:fetch_instances(serv) | ||
if #nodes > 0 then | ||
local content = core.json.encode(nodes) | ||
local key = key | ||
nacos_dict:set(key, content) | ||
end | ||
end | ||
-- delete unused service names | ||
local keys = nacos_dict:get_keys(0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
According to the OpenResty documentation, I realized that |
||
for _, key in ipairs(keys) do | ||
if core.string.has_prefix(key, config.id) and not service_names[key] then | ||
nacos_dict:delete(key) | ||
end | ||
end | ||
ngx_timer_at(self.config.fetch_interval, self:fetch_full_registry()) | ||
end | ||
end | ||
|
||
|
||
local function stop(self) | ||
self.stop_flag = true | ||
|
||
if self.checker then | ||
self.checker:clear() | ||
end | ||
end | ||
|
||
|
||
local function start(self) | ||
ngx_timer_at(0, self:fetch_full_registry()) | ||
end | ||
|
||
|
||
function _M.new(config) | ||
local version = ngx.md5(core.json.encode(config, true)) | ||
|
||
local client = { | ||
id = config.id, | ||
version = version, | ||
config = config, | ||
stop_flag = false, | ||
|
||
start = start, | ||
stop = stop, | ||
fetch_instances = fetch_instances, | ||
fetch_full_registry = fetch_full_registry, | ||
Comment on lines
+264
to
+267
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to APISIX convention, you should use the meta table and |
||
} | ||
|
||
if config.check then | ||
local health_check = require("resty.healthcheck") | ||
local checker = health_check.new({ | ||
name = config.id, | ||
shm_name = "nacos", | ||
checks = config.check | ||
}) | ||
|
||
local ok, err = checker:add_target(config.check.active.host, | ||
config.check.active.port, nil, false) | ||
if not ok then | ||
core.log.error("failed to add health check target", | ||
core.json.encode(config), " err: ", err) | ||
else | ||
core.log.info("success to add health checker, id ", config.id, | ||
" host ", config.check.active.host, " port ", config.check.active.port) | ||
client.checker = checker | ||
end | ||
end | ||
|
||
return client | ||
end | ||
|
||
return _M |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems unsafe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its okay to allow for non tls traffic for service discovery. Eureka service discovery code also has the same logic.