-
Notifications
You must be signed in to change notification settings - Fork 2.6k
feat: support lua proxy upstream #12086
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e593091
d59dbb7
e708e62
8a289ed
981a7a8
6c40320
6ed4f52
0f83314
7de4631
54bf2de
90e0620
e43eeb3
89ff772
5f92bc7
48ec69e
00e1d09
052fc8f
1294721
2f3bfc9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -550,6 +550,12 @@ function _M.handle_upstream(api_ctx, route, enable_websocket) | |
|
||
set_upstream_headers(api_ctx, server) | ||
|
||
-- lua proxy the request to upstream | ||
if api_ctx.lua_proxy_upstream then | ||
common_phase("before_proxy") | ||
return | ||
end | ||
|
||
-- run the before_proxy method in access phase first to avoid always reinit request | ||
common_phase("before_proxy") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. common_phase("before_proxy")
if api_ctx.lua_proxy_upstream then
return
end |
||
|
||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,165 @@ | ||||||
-- | ||||||
-- Licensed to the Apache Software Foundation (ASF) under one or more | ||||||
-- contributor license agreements. See the NOTICE file distributed with | ||||||
-- this work for additional information regarding copyright ownership. | ||||||
-- The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||
-- (the "License"); you may not use this file except in compliance with | ||||||
-- the License. You may obtain a copy of the License at | ||||||
-- | ||||||
-- http://www.apache.org/licenses/LICENSE-2.0 | ||||||
-- | ||||||
-- Unless required by applicable law or agreed to in writing, software | ||||||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
-- See the License for the specific language governing permissions and | ||||||
-- limitations under the License. | ||||||
-- | ||||||
local _M = {} | ||||||
local core = require("apisix.core") | ||||||
local http = require("resty.http") | ||||||
|
||||||
local HTTP_GATEWAY_TIMEOUT = ngx.HTTP_GATEWAY_TIMEOUT | ||||||
local HTTP_INTERNAL_SERVER_ERROR = ngx.HTTP_INTERNAL_SERVER_ERROR | ||||||
|
||||||
|
||||||
local function handle_error(err) | ||||||
if core.string.find(err, "timeout") then | ||||||
return HTTP_GATEWAY_TIMEOUT | ||||||
end | ||||||
return HTTP_INTERNAL_SERVER_ERROR | ||||||
end | ||||||
|
||||||
|
||||||
local function build_request_opts(conf, ctx) | ||||||
-- Get upstream server | ||||||
local server = ctx.picked_server | ||||||
if not server then | ||||||
return nil, "no picked server" | ||||||
end | ||||||
|
||||||
local headers = core.request.headers(ctx) | ||||||
-- When content-length is cleared, the HTTP server will automatically calculate and | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
-- set the correct content length when sending the response. This ensures that the | ||||||
-- content length of the response matches the actual data sent, thereby avoiding mismatches. | ||||||
headers["content-length"] = nil | ||||||
|
||||||
-- Build request options | ||||||
local opts = { | ||||||
scheme = server.scheme or ctx.upstream_scheme or "http", | ||||||
host = server.domain or server.host, | ||||||
port = server.port, | ||||||
path = ctx.var.uri, | ||||||
query = ctx.var.args, | ||||||
method = core.request.get_method(), | ||||||
headers = headers, | ||||||
ssl_verify = conf.ssl_verify, | ||||||
keepalive = conf.keepalive, | ||||||
keepalive_timeout = conf.timeout, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||||||
keepalive_pool = conf.keepalive_pool | ||||||
} | ||||||
|
||||||
-- Set upstream URI | ||||||
if ctx.var.upstream_uri ~= "" then | ||||||
opts.path = ctx.var.upstream_uri | ||||||
end | ||||||
|
||||||
-- Get request body | ||||||
local body, err = core.request.get_body() | ||||||
if err then | ||||||
core.log.error("failed to get request body: ", err) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why isn't there a return for this error? |
||||||
end | ||||||
if body then | ||||||
opts.body = body | ||||||
end | ||||||
|
||||||
return opts | ||||||
end | ||||||
|
||||||
|
||||||
local function read_response(ctx, res) | ||||||
local body_reader = res.body_reader | ||||||
if not body_reader then | ||||||
core.log.error("failed to get response body reader") | ||||||
return HTTP_INTERNAL_SERVER_ERROR | ||||||
end | ||||||
|
||||||
local content_type = res.headers["Content-Type"] | ||||||
core.response.set_header("Content-Type", content_type) | ||||||
|
||||||
-- TODO: support event stream | ||||||
if content_type and core.string.find(content_type, "text/event-stream") then | ||||||
core.log.error("event stream is not supported") | ||||||
return HTTP_INTERNAL_SERVER_ERROR | ||||||
end | ||||||
Comment on lines
+89
to
+93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This already has example code in ai-proxy. |
||||||
|
||||||
local raw_res_body, err = res:read_body() | ||||||
if err then | ||||||
core.log.error("failed to read response body: ", err) | ||||||
return handle_error(err) | ||||||
end | ||||||
|
||||||
return res.status, raw_res_body | ||||||
end | ||||||
|
||||||
|
||||||
function _M.request(conf, ctx) | ||||||
-- Build request options | ||||||
local opts, err = build_request_opts(conf, ctx) | ||||||
if err then | ||||||
core.log.error("failed to build request options: ", err) | ||||||
return HTTP_INTERNAL_SERVER_ERROR | ||||||
end | ||||||
|
||||||
-- Create HTTP client | ||||||
local httpc, err = http.new() | ||||||
if err then | ||||||
return nil, "failed to create http client: " .. err | ||||||
end | ||||||
httpc:set_timeout(opts.timeout) | ||||||
|
||||||
-- Connect to upstream | ||||||
local ok, err = httpc:connect({ | ||||||
scheme = opts.scheme, | ||||||
host = opts.host, | ||||||
port = opts.port, | ||||||
ssl_verify = opts.ssl_verify, | ||||||
ssl_server_name = opts.host, | ||||||
pool_size = opts.keepalive, | ||||||
}) | ||||||
|
||||||
if not ok then | ||||||
return nil, "failed to connect to upstream: " .. err | ||||||
end | ||||||
|
||||||
-- Prepare request parameters | ||||||
local params = { | ||||||
method = opts.method, | ||||||
headers = opts.headers, | ||||||
keepalive = opts.keepalive, | ||||||
ssl_verify = opts.ssl_verify, | ||||||
path = opts.path, | ||||||
query = opts.query, | ||||||
body = opts.body | ||||||
} | ||||||
|
||||||
-- Send request | ||||||
local res, err = httpc:request(params) | ||||||
if err then | ||||||
return nil, err | ||||||
end | ||||||
|
||||||
-- Handle response | ||||||
local code, body = read_response(ctx, res) | ||||||
|
||||||
-- Set keepalive for connection reuse | ||||||
if opts.keepalive then | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do I not need to close the connection when the keepalive is false? |
||||||
local _, err = httpc:set_keepalive(opts.keepalive_timeout, opts.keepalive_pool) | ||||||
if err then | ||||||
core.log.error("failed to keepalive connection: ", err) | ||||||
end | ||||||
end | ||||||
|
||||||
return code, body | ||||||
end | ||||||
|
||||||
return _M |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,11 @@ local ngx = ngx | |
local core = require("apisix.core") | ||
local plugin = require("apisix.plugin") | ||
local upstream = require("apisix.upstream") | ||
local http = require("resty.http") | ||
local lua_proxy_request = require("apisix.lua_proxy").request | ||
local plugin_lua_body_filter = require("apisix.lua_proxy").lua_body_filter | ||
|
||
local HTTP_INTERNAL_SERVER_ERROR = ngx.HTTP_INTERNAL_SERVER_ERROR | ||
|
||
local schema = { | ||
type = "object", | ||
|
@@ -85,6 +90,10 @@ function _M.access(conf, ctx) | |
core.log.warn("plugin access phase, conf: ", core.json.encode(conf)) | ||
-- return 200, {message = "hit example plugin"} | ||
|
||
if conf.lua_proxy_upstream then | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a feature PR that has added plugin configuration and requires updating schema. |
||
ctx.lua_proxy_upstream = true | ||
end | ||
|
||
if not conf.ip then | ||
return | ||
end | ||
|
@@ -98,7 +107,7 @@ function _M.access(conf, ctx) | |
|
||
local ok, err = upstream.check_schema(up_conf) | ||
if not ok then | ||
return 500, err | ||
return HTTP_INTERNAL_SERVER_ERROR, err | ||
end | ||
|
||
local matched_route = ctx.matched_route | ||
|
@@ -107,6 +116,49 @@ function _M.access(conf, ctx) | |
return | ||
end | ||
|
||
|
||
function _M.before_proxy(conf, ctx) | ||
core.log.warn("plugin before_proxy phase, conf: ", core.json.encode(conf)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a warm level log, please use core.json.delay_encode |
||
|
||
if ctx.lua_proxy_upstream then | ||
local status, body = lua_proxy_request(conf, ctx) | ||
if status ~= 200 then | ||
return status, body | ||
end | ||
core.log.warn("lua proxy upstream response: ", core.json.encode(body)) | ||
plugin_lua_body_filter(conf, ctx, body) | ||
end | ||
end | ||
|
||
|
||
function _M.lua_body_filter(conf, ctx, body) | ||
core.log.warn("plugin lua_body_filter phase, conf: ", core.json.encode(conf)) | ||
core.log.warn("plugin lua_body_filter phase, body: ", core.json.encode(body)) | ||
Comment on lines
+135
to
+136
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can assert those log in test cases to confirm the body come from upstream is passed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
|
||
local httpc, err = http.new() | ||
if err then | ||
core.log.error("failed to create http client: ", err) | ||
return HTTP_INTERNAL_SERVER_ERROR | ||
end | ||
|
||
local res, err = httpc:request_uri(conf.request_uri, { | ||
method = conf.method, | ||
}) | ||
if err then | ||
core.log.error("failed to request in lua_body_filter: ", err) | ||
return HTTP_INTERNAL_SERVER_ERROR | ||
end | ||
|
||
local res_body, err = core.json.decode(res.body) | ||
if err then | ||
core.log.error("failed to decode response body: ", err) | ||
return HTTP_INTERNAL_SERVER_ERROR | ||
end | ||
|
||
return res.status, res_body | ||
end | ||
|
||
|
||
function _M.header_filter(conf, ctx) | ||
core.log.warn("plugin header_filter phase, conf: ", core.json.encode(conf)) | ||
end | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -339,3 +339,56 @@ plugin body_filter phase, eof: false | |
plugin delayed_body_filter phase, eof: false | ||
plugin body_filter phase, eof: true | ||
plugin delayed_body_filter phase, eof: true | ||
|
||
|
||
|
||
=== TEST 14: lua body filter | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to add more test cases that combine example-plugin with other plugins like proxy-rewrite, which modify downstream requests. This will help verify that the new Lua HTTP client way works with existing plugins in the rewrite/access phase. |
||
--- config | ||
location /t { | ||
content_by_lua_block { | ||
local t = require("lib.test_admin").test | ||
local code, body = t('/apisix/admin/routes/1', | ||
ngx.HTTP_PUT, | ||
[[{ | ||
"uri": "/anything", | ||
"plugins": { | ||
"example-plugin": { | ||
"i": 0, | ||
"lua_proxy_upstream": true, | ||
"request_uri": "http://httpbin.org/get", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename this |
||
"method": "GET" | ||
} | ||
}, | ||
"upstream": { | ||
"type": "roundrobin", | ||
"nodes": { | ||
"httpbin.org:80": 1 | ||
} | ||
} | ||
}]] | ||
) | ||
|
||
local code, body, actual_body = t("/anything", | ||
ngx.HTTP_POST, | ||
"some body", | ||
nil, | ||
{ | ||
["Content-Type"] = "text/plain", | ||
} | ||
) | ||
local json = require("cjson.safe") | ||
local response_data = json.decode(actual_body) | ||
|
||
-- check the request uri is http://httpbin.org/get, not upstream uri http://httpbin.org/anything | ||
-- which means the response body is changed | ||
if response_data.url == 'http://httpbin.org/get' then | ||
ngx.say("passed") | ||
else | ||
ngx.say(actual_body) | ||
end | ||
} | ||
} | ||
--- request | ||
GET /t | ||
--- response_body | ||
passed |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about the naming of the existing variables
bypass_nginx_upstream
andlua_proxy_upstream
, and felt that their meanings are unclear. I think we can design them as two:bypass_nginx_upstream
to indicate bypassing nginx's upstream modulebypass_balancer
to indicate bypassing the existing balancer logiccc @membphis What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with this