Skip to content

feat: support lua proxy upstream #12086

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 19 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,12 @@ function _M.handle_upstream(api_ctx, route, enable_websocket)

set_upstream_headers(api_ctx, server)

-- lua proxy the request to upstream
if api_ctx.lua_proxy_upstream then
Copy link
Member

@nic-6443 nic-6443 Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about the naming of the existing variables bypass_nginx_upstream and lua_proxy_upstream, and felt that their meanings are unclear. I think we can design them as two:

  • bypass_nginx_upstream to indicate bypassing nginx's upstream module
  • bypass_balancer to indicate bypassing the existing balancer logic
    cc @membphis What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this

common_phase("before_proxy")
return
end

-- run the before_proxy method in access phase first to avoid always reinit request
common_phase("before_proxy")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

common_phase("before_proxy")
if api_ctx.lua_proxy_upstream then
     return
end


Expand Down
165 changes: 165 additions & 0 deletions apisix/lua_proxy.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local _M = {}
local core = require("apisix.core")
local http = require("resty.http")

local HTTP_GATEWAY_TIMEOUT = ngx.HTTP_GATEWAY_TIMEOUT
local HTTP_INTERNAL_SERVER_ERROR = ngx.HTTP_INTERNAL_SERVER_ERROR


local function handle_error(err)
if core.string.find(err, "timeout") then
return HTTP_GATEWAY_TIMEOUT
end
return HTTP_INTERNAL_SERVER_ERROR
end


local function build_request_opts(conf, ctx)
-- Get upstream server
local server = ctx.picked_server
if not server then
return nil, "no picked server"
end

local headers = core.request.headers(ctx)
-- When content-length is cleared, the HTTP server will automatically calculate and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- When content-length is cleared, the HTTP server will automatically calculate and
-- When content-length is cleared, the HTTP client will automatically calculate and

-- set the correct content length when sending the response. This ensures that the
-- content length of the response matches the actual data sent, thereby avoiding mismatches.
headers["content-length"] = nil

-- Build request options
local opts = {
scheme = server.scheme or ctx.upstream_scheme or "http",
host = server.domain or server.host,
port = server.port,
path = ctx.var.uri,
query = ctx.var.args,
method = core.request.get_method(),
headers = headers,
ssl_verify = conf.ssl_verify,
keepalive = conf.keepalive,
keepalive_timeout = conf.timeout,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout is unclear.
The meanings of connection timeout and idle connection timeout are different.
I suggest using keepalive_timeout

keepalive_pool = conf.keepalive_pool
}

-- Set upstream URI
if ctx.var.upstream_uri ~= "" then
opts.path = ctx.var.upstream_uri
end

-- Get request body
local body, err = core.request.get_body()
if err then
core.log.error("failed to get request body: ", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't there a return for this error?

end
if body then
opts.body = body
end

return opts
end


local function read_response(ctx, res)
local body_reader = res.body_reader
if not body_reader then
core.log.error("failed to get response body reader")
return HTTP_INTERNAL_SERVER_ERROR
end

local content_type = res.headers["Content-Type"]
core.response.set_header("Content-Type", content_type)

-- TODO: support event stream
if content_type and core.string.find(content_type, "text/event-stream") then
core.log.error("event stream is not supported")
return HTTP_INTERNAL_SERVER_ERROR
end
Comment on lines +89 to +93
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This already has example code in ai-proxy.


local raw_res_body, err = res:read_body()
if err then
core.log.error("failed to read response body: ", err)
return handle_error(err)
end

return res.status, raw_res_body
end


function _M.request(conf, ctx)
-- Build request options
local opts, err = build_request_opts(conf, ctx)
if err then
core.log.error("failed to build request options: ", err)
return HTTP_INTERNAL_SERVER_ERROR
end

-- Create HTTP client
local httpc, err = http.new()
if err then
return nil, "failed to create http client: " .. err
end
httpc:set_timeout(opts.timeout)

-- Connect to upstream
local ok, err = httpc:connect({
scheme = opts.scheme,
host = opts.host,
port = opts.port,
ssl_verify = opts.ssl_verify,
ssl_server_name = opts.host,
pool_size = opts.keepalive,
})

if not ok then
return nil, "failed to connect to upstream: " .. err
end

-- Prepare request parameters
local params = {
method = opts.method,
headers = opts.headers,
keepalive = opts.keepalive,
ssl_verify = opts.ssl_verify,
path = opts.path,
query = opts.query,
body = opts.body
}

-- Send request
local res, err = httpc:request(params)
if err then
return nil, err
end

-- Handle response
local code, body = read_response(ctx, res)

-- Set keepalive for connection reuse
if opts.keepalive then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I not need to close the connection when the keepalive is false?

local _, err = httpc:set_keepalive(opts.keepalive_timeout, opts.keepalive_pool)
if err then
core.log.error("failed to keepalive connection: ", err)
end
end

return code, body
end

return _M
35 changes: 35 additions & 0 deletions apisix/plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1285,4 +1285,39 @@ function _M.run_global_rules(api_ctx, global_rules, phase_name)
end


function _M.lua_body_filter(data, api_ctx)
local plugins = api_ctx.plugins
if not plugins or #plugins == 0 then
return
end
for i = 1, #plugins, 2 do
local phase_func = plugins[i]["lua_body_filter"]
if phase_func then
local conf = plugins[i + 1]
if not meta_filter(api_ctx, plugins[i]["name"], conf)then
goto CONTINUE
end

run_meta_pre_function(conf, api_ctx, plugins[i]["name"])
local code, body = phase_func(conf, api_ctx, data)
if code or body then
if code >= 400 then
core.log.warn(plugins[i].name, " exits with http status code ", code)

if conf._meta and conf._meta.error_response then
-- Whether or not the original error message is output,
-- always return the configured message
-- so the caller can't guess the real error
body = conf._meta.error_response
end
end

return core.response.exit(code, body)
end
end

::CONTINUE::
end
end

return _M
54 changes: 53 additions & 1 deletion apisix/plugins/example-plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ local ngx = ngx
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local upstream = require("apisix.upstream")
local http = require("resty.http")
local lua_proxy_request = require("apisix.lua_proxy").request
local plugin_lua_body_filter = require("apisix.lua_proxy").lua_body_filter

local HTTP_INTERNAL_SERVER_ERROR = ngx.HTTP_INTERNAL_SERVER_ERROR

local schema = {
type = "object",
Expand Down Expand Up @@ -85,6 +90,10 @@ function _M.access(conf, ctx)
core.log.warn("plugin access phase, conf: ", core.json.encode(conf))
-- return 200, {message = "hit example plugin"}

if conf.lua_proxy_upstream then
Copy link
Contributor

@AlinsRan AlinsRan Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a feature PR that has added plugin configuration and requires updating schema.

ctx.lua_proxy_upstream = true
end

if not conf.ip then
return
end
Expand All @@ -98,7 +107,7 @@ function _M.access(conf, ctx)

local ok, err = upstream.check_schema(up_conf)
if not ok then
return 500, err
return HTTP_INTERNAL_SERVER_ERROR, err
end

local matched_route = ctx.matched_route
Expand All @@ -107,6 +116,49 @@ function _M.access(conf, ctx)
return
end


function _M.before_proxy(conf, ctx)
core.log.warn("plugin before_proxy phase, conf: ", core.json.encode(conf))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a warm level log, please use core.json.delay_encode


if ctx.lua_proxy_upstream then
local status, body = lua_proxy_request(conf, ctx)
if status ~= 200 then
return status, body
end
core.log.warn("lua proxy upstream response: ", core.json.encode(body))
plugin_lua_body_filter(conf, ctx, body)
end
end


function _M.lua_body_filter(conf, ctx, body)
core.log.warn("plugin lua_body_filter phase, conf: ", core.json.encode(conf))
core.log.warn("plugin lua_body_filter phase, body: ", core.json.encode(body))
Comment on lines +135 to +136
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can assert those log in test cases to confirm the body come from upstream is passed to lua_body_filter as expect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


local httpc, err = http.new()
if err then
core.log.error("failed to create http client: ", err)
return HTTP_INTERNAL_SERVER_ERROR
end

local res, err = httpc:request_uri(conf.request_uri, {
method = conf.method,
})
if err then
core.log.error("failed to request in lua_body_filter: ", err)
return HTTP_INTERNAL_SERVER_ERROR
end

local res_body, err = core.json.decode(res.body)
if err then
core.log.error("failed to decode response body: ", err)
return HTTP_INTERNAL_SERVER_ERROR
end

return res.status, res_body
end


function _M.header_filter(conf, ctx)
core.log.warn("plugin header_filter phase, conf: ", core.json.encode(conf))
end
Expand Down
53 changes: 53 additions & 0 deletions t/plugin/example.t
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,56 @@ plugin body_filter phase, eof: false
plugin delayed_body_filter phase, eof: false
plugin body_filter phase, eof: true
plugin delayed_body_filter phase, eof: true



=== TEST 14: lua body filter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to add more test cases that combine example-plugin with other plugins like proxy-rewrite, which modify downstream requests. This will help verify that the new Lua HTTP client way works with existing plugins in the rewrite/access phase.

--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"uri": "/anything",
"plugins": {
"example-plugin": {
"i": 0,
"lua_proxy_upstream": true,
"request_uri": "http://httpbin.org/get",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename this request_uri to indicate it's used to change the rseponse body, it make me confused with upstream.nodes

"method": "GET"
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"httpbin.org:80": 1
}
}
}]]
)

local code, body, actual_body = t("/anything",
ngx.HTTP_POST,
"some body",
nil,
{
["Content-Type"] = "text/plain",
}
)
local json = require("cjson.safe")
local response_data = json.decode(actual_body)

-- check the request uri is http://httpbin.org/get, not upstream uri http://httpbin.org/anything
-- which means the response body is changed
if response_data.url == 'http://httpbin.org/get' then
ngx.say("passed")
else
ngx.say(actual_body)
end
}
}
--- request
GET /t
--- response_body
passed
Loading