Skip to content

Commit 45d2021

Browse files
authored
Merge pull request #22 from antlabs/dev
Dev
2 parents 42e9a9b + a4687b1 commit 45d2021

20 files changed

+368
-35
lines changed

autobahn/autobahn-server.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,13 @@ type handler struct {
5555
}
5656

5757
// 运行在业务线程
58-
func (h *handler) echo(w http.ResponseWriter, r *http.Request) {
58+
func (h *handler) echoUnstream(w http.ResponseWriter, r *http.Request) {
5959
opts := []greatws.ServerOption{
6060
greatws.WithServerReplyPing(),
6161
greatws.WithServerDecompression(),
6262
greatws.WithServerIgnorePong(),
6363
greatws.WithServerCallback(&echoHandler{}),
64+
greatws.WithServerUnstreamMode(),
6465
greatws.WithServerEnableUTF8Check(),
6566
greatws.WithServerReadTimeout(5 * time.Second),
6667
greatws.WithServerMultiEventLoop(h.m),
@@ -103,6 +104,29 @@ func (h *handler) echoRunInIo(w http.ResponseWriter, r *http.Request) {
103104

104105
// 使用stream模式运行, 一个websocket一个go程
105106
func (h *handler) echoRunStream(w http.ResponseWriter, r *http.Request) {
107+
opts := []greatws.ServerOption{
108+
greatws.WithServerReplyPing(),
109+
greatws.WithServerDecompression(),
110+
greatws.WithServerIgnorePong(),
111+
greatws.WithServerCallback(&echoHandler{}),
112+
greatws.WithServerEnableUTF8Check(),
113+
greatws.WithServerReadTimeout(5 * time.Second),
114+
greatws.WithServerMultiEventLoop(h.m),
115+
greatws.WithServerCallbackInEventLoop(),
116+
}
117+
118+
if *runInEventLoop {
119+
opts = append(opts, greatws.WithServerCallbackInEventLoop())
120+
}
121+
122+
c, err := greatws.Upgrade(w, r, opts...)
123+
if err != nil {
124+
slog.Error("Upgrade fail:", "err", err.Error())
125+
}
126+
_ = c
127+
}
128+
129+
func (h *handler) echoRunStream2(w http.ResponseWriter, r *http.Request) {
106130
opts := []greatws.ServerOption{
107131
greatws.WithServerReplyPing(),
108132
greatws.WithServerDecompression(),
@@ -191,9 +215,10 @@ func main() {
191215
}
192216
}()
193217
mux := &http.ServeMux{}
194-
mux.HandleFunc("/autobahn", h.echo)
218+
mux.HandleFunc("/autobahn-unstream", h.echoUnstream)
195219
mux.HandleFunc("/autobahn-io", h.echoRunInIo)
196220
mux.HandleFunc("/autobahn-stream", h.echoRunStream)
221+
mux.HandleFunc("/autobahn-stream2", h.echoRunStream2)
197222
mux.HandleFunc("/autobahn-parse-loop", h.echoRunInParseLoop)
198223

199224
rawTCP, err := net.Listen("tcp", ":9001")

autobahn/config/fuzzingclient.json

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
{
22
"outdir": "./report/",
33
"servers": [
4+
{
5+
"agent": "non-tls-stream2",
6+
"url": "ws://127.0.0.1:9001/autobahn-stream2",
7+
"options": {
8+
"version": 18
9+
}
10+
},
411
{
512
"agent": "non-tls-parseloop",
613
"url": "ws://127.0.0.1:9001/autobahn-parse-loop",
@@ -10,7 +17,7 @@
1017
},
1118
{
1219
"agent": "non-tls-taskbind",
13-
"url": "ws://127.0.0.1:9001/autobahn",
20+
"url": "ws://127.0.0.1:9001/autobahn-unstream",
1421
"options": {
1522
"version": 18
1623
}
@@ -31,11 +38,10 @@
3138
}
3239
],
3340
"cases": [
34-
"*"
41+
"*"
3542
],
3643
"exclude-cases": [
3744
""
3845
],
3946
"exclude-agent-cases": {}
40-
}
41-
47+
}

common_options.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -294,17 +294,19 @@ func WithClientCallbackInEventLoop() ClientOption {
294294
}
295295
}
296296

297-
// 19.1 配置服务端使用stream模式处理请求,该模式一个连接会独占一个go程,如果你的请求对时序有要求,可以使用这个模式
297+
// 默认模式
298+
// 19.1 配置服务端使用stream模式处理请求,go程会复用,保证OnMessage的处理是有序的
298299
func WithServerStreamMode() ServerOption {
299300
return func(o *ConnOption) {
300-
o.runInGoTask = "stream"
301+
o.runInGoTask = "stream2"
301302
}
302303
}
303304

304-
// 19.2 配置客户端使用stream模式处理请求,该模式一个连接会独占一个go程,如果你的请求对时序有要求,可以使用这个模式
305+
// 默认模式
306+
// 19.2 配置客户端使用stream模式处理请求,go程会复用,保证OnMessage的处理是有序的
305307
func WithClientStreamMode() ClientOption {
306308
return func(o *DialOption) {
307-
o.runInGoTask = "stream"
309+
o.runInGoTask = "stream2"
308310
}
309311
}
310312

@@ -326,6 +328,21 @@ func WithClientCustomTaskMode(taskName string) ClientOption {
326328
}
327329
}
328330

331+
// 20.1 配置服务端使用unstream模式处理请求,go程会复用, 但是不保证顺序性,如果你的请求对时序有要求,请不要使用这个模式,
332+
func WithServerUnstreamMode() ServerOption {
333+
return func(o *ConnOption) {
334+
o.runInGoTask = "unstream"
335+
}
336+
}
337+
338+
// 默认stream2,忽略这个API
339+
// 19.2 配置客户端使用stream模式处理请求,go程会复用,但是不保证顺序性,如果你的请求对时序有要求,请不要使用这个模式,
340+
func WithClientUnstreamMode() ClientOption {
341+
return func(o *DialOption) {
342+
o.runInGoTask = "unstream"
343+
}
344+
}
345+
329346
// last 配置event
330347
func WithServerMultiEventLoop(m *MultiEventLoop) ServerOption {
331348
return func(o *ConnOption) {

config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,5 @@ func (c *Config) defaultSetting() {
5858
c.tcpNoDelay = true
5959
// 对于text消息,默认不检查text是utf8字符
6060
c.utf8Check = func(b []byte) bool { return true }
61-
c.runInGoTask = "unstream"
61+
c.runInGoTask = "stream2" //默认使用stream2模块
6262
}

event_loop.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"sync"
1919
"sync/atomic"
2020
"time"
21+
22+
"github.com/antlabs/greatws/task/driver"
2123
)
2224

2325
type evFlag int
@@ -45,8 +47,10 @@ func CreateEventLoop(setSize int, flag evFlag, parent *MultiEventLoop) (e *Event
4547
parent: parent,
4648
}
4749

50+
var c driver.Conf
51+
c.Log = parent.Logger
4852
// 初始化任务池
49-
e.localTask = newSelectTask(parent.configTask.initCount, parent.configTask.min, parent.configTask.max)
53+
e.localTask = newSelectTask(parent.ctx, parent.configTask.initCount, parent.configTask.min, parent.configTask.max, &c)
5054

5155
// TODO+
5256
// e.localTask.taskConfig = e.parent.configTask.taskConfig

multi_event_loops.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package greatws
1515

1616
import (
17+
"context"
1718
"log/slog"
1819
"os"
1920
"runtime"
@@ -22,6 +23,7 @@ import (
2223

2324
_ "github.com/antlabs/greatws/task/io"
2425
_ "github.com/antlabs/greatws/task/stream"
26+
_ "github.com/antlabs/greatws/task/stream2"
2527
_ "github.com/antlabs/greatws/task/unstream"
2628
)
2729

@@ -59,6 +61,8 @@ type MultiEventLoop struct {
5961
*slog.Logger
6062

6163
evLoopStart uint32
64+
65+
ctx context.Context
6266
}
6367

6468
var (
@@ -121,13 +125,13 @@ func NewMultiEventLoopMust(opts ...EvOption) *MultiEventLoop {
121125
panic(err)
122126
}
123127

124-
m.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: m.level}))
125128
return m
126129
}
127130

128131
// 创建一个多路事件循环
129132
func NewMultiEventLoop(opts ...EvOption) (e *MultiEventLoop, err error) {
130133
m := &MultiEventLoop{}
134+
m.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: m.level}))
131135
m.initDefaultSetting()
132136
for _, o := range opts {
133137
o(m)
@@ -137,6 +141,7 @@ func NewMultiEventLoop(opts ...EvOption) (e *MultiEventLoop, err error) {
137141
if *m.parseInParseLoop {
138142
m.parseLoop = newTaskParse()
139143
}
144+
m.ctx = context.Background()
140145
m.loops = make([]*EventLoop, m.numLoops)
141146

142147
for i := 0; i < m.numLoops; i++ {

select_task.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,24 @@
1313
// limitations under the License.
1414
package greatws
1515

16-
import "github.com/antlabs/greatws/task/driver"
16+
import (
17+
"context"
18+
19+
"github.com/antlabs/greatws/task/driver"
20+
)
1721

1822
type selectTask struct {
1923
taskDriverName string
2024
task driver.Tasker
2125
}
2226
type selectTasks []selectTask
2327

24-
func newSelectTask(initCount, min, max int) []selectTask {
28+
func newSelectTask(ctx context.Context, initCount, min, max int, c *driver.Conf) []selectTask {
2529

2630
all := driver.GetAllRegister()
2731
rv := make([]selectTask, 0, len(all))
2832
for _, val := range all {
29-
task := val.Driver.New(initCount, min, max)
33+
task := val.Driver.New(ctx, initCount, min, max, c)
3034
rv = append(rv, selectTask{
3135
taskDriverName: val.Name,
3236
task: task,

task/README.md

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
## greatws task模块
22

3-
43
## 设计目的
4+
55
尽量让task 变成可插拔的模块,可以方便的扩展和替换以及测试第三方go程池的任务模块。
66

7-
## io 模块
8-
task 运行在io go程里面,适配轻量级任务,比如proxy,或者nginx,redis这种,基本不访问第三方中间件,比如mysql或者别的业务服务,只有数据包的加工
7+
## stream2 模块(默认)
8+
9+
task 会分解到不同go程运行,适合对数据包的加工有顺序要求的场景,比如asr识别
910

1011
## stream 模块
12+
1113
task 会独占一个go程, 适合对数据包的加工有顺序要求的场景,比如asr识别这种
1214

13-
## unstream模块
14-
task 只管并发执行,不管有序性,比如推送/红点/点赞之类的。
15+
## io 模块
16+
17+
task 运行在io go程里面,适配轻量级任务,比如proxy,或者nginx,redis这种,基本不访问第三方中间件,比如mysql或者别的业务服务,只有数据包的加工
18+
19+
## unstream模块(实验)
20+
21+
task 只管并发执行,不管有序性

task/driver/conf.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright 2023-2024 antlabs. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package driver
16+
17+
import "log/slog"
18+
19+
type Conf struct {
20+
Log *slog.Logger
21+
}

task/driver/error.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
// Copyright 2023-2024 antlabs. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
114
package driver
215

316
import "errors"

task/driver/register.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
// Copyright 2023-2024 antlabs. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
114
package driver
215

316
import (

task/driver/tasker.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,13 @@
1414

1515
package driver
1616

17+
import (
18+
"context"
19+
)
20+
1721
// 初始化一个go程池
1822
type TaskDriver interface {
19-
New(initCount, min, max int) Tasker
23+
New(ctx context.Context, initCount, min, max int, c *Conf) Tasker
2024
}
2125

2226
// 某个池的实例

task/io/task_io.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,27 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14-
package greatws
14+
package io
1515

16-
import "github.com/antlabs/greatws/task/driver"
16+
import (
17+
"context"
18+
19+
"github.com/antlabs/greatws/task/driver"
20+
)
1721

1822
func init() {
1923
driver.Register("io", &taskIo{})
2024
}
2125

22-
var _ driver.TaskExecutor = (*taskIo)(nil)
2326
var _ driver.TaskDriver = (*taskIo)(nil)
2427
var _ driver.Tasker = (*taskIo)(nil)
28+
var _ driver.TaskExecutor = (*taskIo)(nil)
2529

2630
type taskIo struct{}
2731

2832
func (t *taskIo) GetGoroutines() int { return 0 } // 获取goroutine数
2933

30-
func (t *taskIo) New(initCount, min, max int) driver.Tasker {
34+
func (t *taskIo) New(ctx context.Context, initCount, min, max int, c *driver.Conf) driver.Tasker {
3135
return t
3236
}
3337

0 commit comments

Comments
 (0)