Skip to content

Commit 233a785

Browse files
committed
feat: add lago plugin
1 parent 7898012 commit 233a785

File tree

1 file changed

+229
-0
lines changed

1 file changed

+229
-0
lines changed

apisix/plugins/lago.lua

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
--
2+
-- Licensed to the Apache Software Foundation (ASF) under one or more
3+
-- contributor license agreements. See the NOTICE file distributed with
4+
-- this work for additional information regarding copyright ownership.
5+
-- The ASF licenses this file to You under the Apache License, Version 2.0
6+
-- (the "License"); you may not use this file except in compliance with
7+
-- the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing, software
12+
-- distributed under the License is distributed on an "AS IS" BASIS,
13+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
-- See the License for the specific language governing permissions and
15+
-- limitations under the License.
16+
--
17+
local pairs = pairs
18+
local ipairs = ipairs
19+
local tostring = tostring
20+
local math_random = math.random
21+
local table_insert = table.insert
22+
local ngx = ngx
23+
24+
local new_tab = require("table.new")
25+
local http = require("resty.http")
26+
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
27+
local log_util = require("apisix.utils.log-util")
28+
local core = require("apisix.core")
29+
local str_format = core.string.format
30+
31+
local plugin_name = "lago"
32+
local batch_processor_manager = bp_manager_mod.new("lago logger")
33+
34+
local schema = {
35+
type = "object",
36+
properties = {
37+
-- core configurations
38+
endpoint_addrs = {
39+
type = "array",
40+
minItems = 1,
41+
items = core.schema.uri_def,
42+
description = "Lago API address, like https://api.getlago.com, "
43+
.. "it supports both cloud and self-hosted, "
44+
.. "one of them is randomly selected when configured as more than one",
45+
},
46+
endpoint_uri = {
47+
type = "string",
48+
minLength = 1,
49+
default = "/api/v1/events/batch",
50+
description = "Lago API endpoint, it needs to be set to the batch send endpoint",
51+
},
52+
token = {
53+
type = "string",
54+
description = "Lago API API key, create one for your organization on dashboard"
55+
},
56+
event_transaction_id = {
57+
type = "string",
58+
description = "Event's transaction ID, it is used to identify and de-duplicate"
59+
.. " the event, it supports string templates containing APISIX and"
60+
.. " NGINX variables, like \"req_${request_id}\", which allows you"
61+
.. " to use values returned by upstream services or request-id"
62+
.. " plugin integration",
63+
},
64+
event_subscription_id = {
65+
type = "string",
66+
description = "Event's subscription ID, which is automatically generated or"
67+
.. " specified by you when you assign the plan to the customer on"
68+
.. " Lago, used to associate API consumption to a customer subscription,"
69+
.. " it supports string templates containing APISIX and NGINX variables,"
70+
.. " like \"cus_${consumer_name}\", which allows you to use values"
71+
.. " returned by upstream services or APISIX consumer",
72+
},
73+
event_code = {
74+
type = "string",
75+
description = "Lago billable metric's code for associating an event to a specified"
76+
.. "billable item",
77+
},
78+
event_properties = {
79+
type = "object",
80+
patternProperties = {
81+
[".*"] = {
82+
type = "string",
83+
minLength = 1,
84+
},
85+
},
86+
description = "Event's properties, used to attach information to an event, this"
87+
.. " allows you to send certain information on a request to Lago, such"
88+
.. " as sending HTTP status to take a failed request off the bill, or"
89+
.. " sending the AI token consumption in the response body for accurate"
90+
.. " billing, its keys are fixed strings and its values can be string"
91+
.. " templates containing APISIX and NGINX variables, like \"${status}\""
92+
},
93+
94+
-- connection layer configurations
95+
ssl_verify = {type = "boolean", default = false},
96+
timeout = {
97+
type = "integer",
98+
minimum = 1,
99+
maximum = 60000,
100+
default = 3000,
101+
description = "timeout in milliseconds",
102+
},
103+
keepalive = {type = "boolean", default = true},
104+
keepalive_timeout = {
105+
type = "integer",
106+
minimum = 1000,
107+
default = 60000,
108+
description = "keepalive timeout in milliseconds",
109+
},
110+
keepalive_pool = {type = "integer", minimum = 1, default = 5},
111+
},
112+
required = {"endpoint_addrs", "token"}
113+
}
114+
schema = batch_processor_manager:wrap_schema(schema)
115+
116+
-- According to https://getlago.com/docs/api-reference/events/batch, the maximum batch size is 100,
117+
-- so we have to override the default batch size to make it work out of the box,the plugin does
118+
-- not set a maximum limit, so if Lago relaxes the limit, then user can modify it
119+
-- to a larger batch size
120+
-- This does not affect other plugins, schema is appended after deep copy
121+
schema.properties.batch_max_size.default = 100
122+
123+
124+
local _M = {
125+
version = 0.1,
126+
priority = 414,
127+
name = plugin_name,
128+
schema = schema,
129+
}
130+
131+
132+
function _M.check_schema(conf, schema_type)
133+
local check = {"endpoint_addrs"}
134+
core.utils.check_https(check, conf, plugin_name)
135+
core.utils.check_tls_bool({"ssl_verify"}, conf, plugin_name)
136+
137+
return core.schema.check(schema, conf)
138+
end
139+
140+
141+
local function send_http_data(conf, data)
142+
local params = {
143+
headers = {
144+
["Content-Type"] = "application/json",
145+
["Authorization"] = "Bearer " .. conf.tenant_id,
146+
},
147+
keepalive = conf.keepalive,
148+
ssl_verify = conf.ssl_verify,
149+
method = "POST",
150+
body = core.json.encode(data)
151+
}
152+
153+
if conf.keepalive then
154+
params.keepalive_timeout = conf.keepalive_timeout
155+
params.keepalive_pool = conf.keepalive_pool
156+
end
157+
158+
local httpc, err = http.new()
159+
if not httpc then
160+
return false, str_format("create http client error: %s", err)
161+
end
162+
httpc:set_timeout(conf.timeout)
163+
164+
-- select an random endpoint and build URL
165+
local endpoint_url = conf.endpoint_addrs[math_random(#conf.endpoint_addrs)]..conf.endpoint_uri
166+
local res, err = httpc:request_uri(endpoint_url, params)
167+
if not res then
168+
return false, err
169+
end
170+
171+
if res.status >= 300 then
172+
return false, str_format("lago api returned status: %d, body: %s",
173+
res.status, res.body or "")
174+
end
175+
176+
return true
177+
end
178+
179+
180+
function _M.log(conf, ctx)
181+
-- build usage event
182+
local event_transaction_id, err = core.utils.resolve_var(conf.event_transaction_id, ctx.var)
183+
if err then
184+
core.log.error("failed to resolve event_transaction_id, event dropped: ", err)
185+
return
186+
end
187+
188+
local event_subscription_id, err = core.utils.resolve_var(conf.event_subscription_id, ctx.var)
189+
if err then
190+
core.log.error("failed to resolve event_subscription_id, event dropped: ", err)
191+
return
192+
end
193+
194+
local properties = conf.event_properties
195+
-- parsing possible variables in value
196+
for key, value in pairs(properties) do
197+
local new_val, err, n_resolved = core.utils.resolve_var(value, ctx.var)
198+
if not err and n_resolved > 0 then
199+
properties[key] = new_val
200+
end
201+
end
202+
203+
local entry = {
204+
transaction_id = event_transaction_id,
205+
external_subscription_id = event_subscription_id,
206+
code = conf.event_code,
207+
timestamp = ngx.req.start_time(),
208+
}
209+
210+
if #properties > 0 then -- properties is optional
211+
entry.properties = properties
212+
end
213+
214+
if batch_processor_manager:add_entry(conf, entry) then
215+
return
216+
end
217+
218+
-- generate a function to be executed by the batch processor
219+
local func = function(entries)
220+
return send_http_data(conf, {
221+
events = entries,
222+
})
223+
end
224+
225+
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
226+
end
227+
228+
229+
return _M

0 commit comments

Comments
 (0)