Skip to content

Commit b18a660

Browse files
committed
+limit+test code
1 parent dc87e87 commit b18a660

File tree

5 files changed

+333
-11
lines changed

5 files changed

+333
-11
lines changed

README.md

Lines changed: 181 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* 支持 epoll/kqueue
1616
* 低内存占用
1717
* 高tps
18+
* 对websocket的兼容性较高,完整实现rfc6455, rfc7692
1819

1920
# 暂不支持
2021

@@ -26,10 +27,33 @@
2627

2728
早期阶段,暂时不建议生产使用
2829

30+
## 内容
31+
* [安装](#Installation)
32+
* [例子](#example)
33+
* [net/http升级到websocket服务端](#net-http升级到websocket服务端)
34+
* [gin升级到websocket服务端](#gin升级到websocket服务端)
35+
* [客户端](#客户端)
36+
* [配置函数](#配置函数)
37+
* [客户端配置参数](#客户端配置)
38+
* [配置header](#配置header)
39+
* [配置握手时的超时时间](#配置握手时的超时时间)
40+
* [配置自动回复ping消息](#配置自动回复ping消息)
41+
* [配置客户端最大读取message](#配置客户端最大读message)
42+
* [服务配置参数](#服务端配置)
43+
* [配置服务自动回复ping消息](#配置服务自动回复ping消息)
44+
* [配置服务端最大读取message](#配置服务端最大读message)
2945
# 例子-服务端
30-
46+
### net http升级到websocket服务端
3147
```go
3248

49+
package main
50+
51+
import (
52+
"fmt"
53+
54+
"github.com/antlabs/greatws"
55+
)
56+
3357
type echoHandler struct{}
3458

3559
func (e *echoHandler) OnOpen(c *greatws.Conn) {
@@ -92,7 +116,163 @@ func main() {
92116
log.Println("non-tls server exit:", http.Serve(rawTCP, mux))
93117
}
94118
```
119+
[返回](#内容)
120+
121+
### gin升级到websocket服务端
122+
```go
123+
package main
124+
125+
import (
126+
"fmt"
127+
128+
"github.com/antlabs/greatws"
129+
"github.com/gin-gonic/gin"
130+
)
131+
132+
type handler struct{
133+
m *greatws.MultiEventLoop
134+
}
135+
136+
func (h *handler) OnOpen(c *greatws.Conn) {
137+
fmt.Printf("服务端收到一个新的连接")
138+
}
139+
140+
func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
141+
// 如果msg的生命周期不是在OnMessage中结束,需要拷贝一份
142+
// newMsg := make([]byte, len(msg))
143+
// copy(newMsg, msg)
144+
145+
fmt.Printf("收到客户端消息:%s\n", msg)
146+
c.WriteMessage(op, msg)
147+
// os.Stdout.Write(msg)
148+
}
149+
150+
func (h *handler) OnClose(c *greatws.Conn, err error) {
151+
fmt.Printf("服务端连接关闭:%v\n", err)
152+
}
153+
154+
func main() {
155+
r := gin.Default()
156+
var h handler
157+
h.m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
158+
h.m.Start()
159+
160+
r.GET("/", func(c *gin.Context) {
161+
con, err := greatws.Upgrade(c.Writer, c.Request, greatws.WithServerCallback(h.m), greatws.WithServerMultiEventLoop(h.m))
162+
if err != nil {
163+
return
164+
}
165+
con.StartReadLoop()
166+
})
167+
r.Run()
168+
}
169+
```
170+
[返回](#内容)
171+
172+
### 客户端
173+
```go
174+
package main
175+
176+
import (
177+
"fmt"
178+
"time"
179+
180+
"github.com/antlabs/greatws"
181+
)
182+
183+
var m *greatws.MultiEventLoop
184+
type handler struct{}
185+
186+
func (h *handler) OnOpen(c *greatws.Conn) {
187+
fmt.Printf("客户端连接成功\n")
188+
}
189+
190+
func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
191+
// 如果msg的生命周期不是在OnMessage中结束,需要拷贝一份
192+
// newMsg := make([]byte, len(msg))
193+
// copy(newMsg, msg)
194+
195+
fmt.Printf("收到服务端消息:%s\n", msg)
196+
c.WriteMessage(op, msg)
197+
time.Sleep(time.Second)
198+
}
199+
200+
func (h *handler) OnClose(c *greatws.Conn, err error) {
201+
fmt.Printf("客户端端连接关闭:%v\n", err)
202+
}
203+
204+
func main() {
205+
m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
206+
m.Start()
207+
c, err := greatws.Dial("ws://127.0.0.1:8080/", greatws.WithClientCallback(&handler{}), greatws.WithServerMultiEventLoop(h.m))
208+
if err != nil {
209+
fmt.Printf("连接失败:%v\n", err)
210+
return
211+
}
212+
213+
c.WriteMessage(opcode.Text, []byte("hello"))
214+
time.Sleep(time.Hour) //demo里面等待下OnMessage 看下执行效果,因为greatws.Dial和WriteMessage都是非阻塞的函数调用,不会卡住主go程
215+
}
216+
```
217+
[返回](#内容)
218+
## 配置函数
219+
### 客户端配置参数
220+
#### 配置header
221+
```go
222+
func main() {
223+
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientHTTPHeader(http.Header{
224+
"h1": "v1",
225+
"h2":"v2",
226+
}))
227+
}
228+
```
229+
[返回](#内容)
230+
#### 配置握手时的超时时间
231+
```go
232+
func main() {
233+
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientDialTimeout(2 * time.Second))
234+
}
235+
```
236+
[返回](#内容)
95237

238+
#### 配置自动回复ping消息
239+
```go
240+
func main() {
241+
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientReplyPing())
242+
}
243+
```
244+
[返回](#内容)
245+
#### 配置客户端最大读message
246+
```go
247+
// 限制客户端最大服务返回返回的最大包是1024,如果超过这个大小报错
248+
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientReadMaxMessage(1024))
249+
```
250+
[返回](#内容)
251+
### 服务端配置参数
252+
#### 配置服务自动回复ping消息
253+
```go
254+
func main() {
255+
c, err := greatws.Upgrade(w, r, greatws.WithServerReplyPing())
256+
if err != nil {
257+
fmt.Println("Upgrade fail:", err)
258+
return
259+
}
260+
}
261+
```
262+
[返回](#内容)
263+
264+
#### 配置服务端最大读message
265+
```go
266+
func main() {
267+
// 配置服务端读取客户端最大的包是1024大小, 超过该值报错
268+
c, err := greatws.Upgrade(w, r, greatws.WithServerReadMaxMessage(1024))
269+
if err != nil {
270+
fmt.Println("Upgrade fail:", err)
271+
return
272+
}
273+
}
274+
```
275+
[返回](#内容)
96276
## 100w websocket长链接测试
97277

98278
### e5 洋垃圾机器

common_options_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1728,4 +1728,117 @@ func Test_CommonOption(t *testing.T) {
17281728
t.Error("not run server:method fail")
17291729
}
17301730
})
1731+
1732+
t.Run("22.1.WithServerReadMaxMessage:local-Upgrade", func(t *testing.T) {
1733+
var tsort testServerOptionReadTimeout
1734+
1735+
tsort.err = make(chan error, 1)
1736+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1737+
c, err := Upgrade(w, r, WithServerCallback(&tsort), WithServerReadMaxMessage(1<<10), WithServerMultiEventLoop(m))
1738+
if err != nil {
1739+
t.Error(err)
1740+
}
1741+
c.StartReadLoop()
1742+
}))
1743+
1744+
defer ts.Close()
1745+
1746+
url := strings.ReplaceAll(ts.URL, "http", "ws")
1747+
con, err := Dial(url, WithClientMultiEventLoop(m), WithClientOnMessageFunc(func(c *Conn, mt Opcode, payload []byte) {
1748+
}))
1749+
if err != nil {
1750+
t.Error(err)
1751+
return
1752+
}
1753+
defer con.Close()
1754+
1755+
con.WriteMessage(Text, bytes.Repeat([]byte("1"), 1025))
1756+
select {
1757+
case d := <-tsort.err:
1758+
if d == nil {
1759+
t.Errorf("got:nil, need:error\n")
1760+
}
1761+
case <-time.After(1000 * time.Millisecond):
1762+
t.Errorf(" Test_ServerOption:WithServerReadMaxMessage timeout\n")
1763+
}
1764+
if atomic.LoadInt32(&tsort.run) != 1 {
1765+
t.Error("not run server:method fail")
1766+
}
1767+
})
1768+
1769+
t.Run("22.2.WithServerReadMaxMessage", func(t *testing.T) {
1770+
var tsort testServerOptionReadTimeout
1771+
1772+
upgrade := NewUpgrade(WithServerCallback(&tsort), WithServerReadMaxMessage(1<<10), WithServerMultiEventLoop(m))
1773+
tsort.err = make(chan error, 1)
1774+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1775+
c, err := upgrade.Upgrade(w, r)
1776+
if err != nil {
1777+
t.Error(err)
1778+
}
1779+
c.StartReadLoop()
1780+
}))
1781+
1782+
defer ts.Close()
1783+
1784+
url := strings.ReplaceAll(ts.URL, "http", "ws")
1785+
con, err := Dial(url, WithClientMultiEventLoop(m), WithClientOnMessageFunc(func(c *Conn, mt Opcode, payload []byte) {
1786+
}))
1787+
if err != nil {
1788+
t.Error(err)
1789+
return
1790+
}
1791+
defer con.Close()
1792+
1793+
con.WriteMessage(Text, bytes.Repeat([]byte("1"), 1025))
1794+
select {
1795+
case d := <-tsort.err:
1796+
if d == nil {
1797+
t.Errorf("got:nil, need:error\n")
1798+
}
1799+
case <-time.After(100 * time.Millisecond):
1800+
t.Errorf(" Test_ServerOption:WithServerReadTimeout timeout\n")
1801+
}
1802+
if atomic.LoadInt32(&tsort.run) != 1 {
1803+
t.Error("not run server:method fail")
1804+
}
1805+
})
1806+
1807+
t.Run("22.3.WithClientReadMaxMessage", func(t *testing.T) {
1808+
var tsort testServerOptionReadTimeout
1809+
1810+
upgrade := NewUpgrade(WithServerCallback(&tsort), WithServerReadTimeout(time.Millisecond*60), WithServerMultiEventLoop(m))
1811+
tsort.err = make(chan error, 1)
1812+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1813+
c, err := upgrade.Upgrade(w, r)
1814+
if err != nil {
1815+
t.Error(err)
1816+
}
1817+
c.WriteMessage(Binary, bytes.Repeat([]byte("1"), 1025))
1818+
c.StartReadLoop()
1819+
}))
1820+
1821+
defer ts.Close()
1822+
1823+
url := strings.ReplaceAll(ts.URL, "http", "ws")
1824+
con, err := Dial(url, WithClientMultiEventLoop(m), WithClientReadMaxMessage(1<<10), WithClientOnMessageFunc(func(c *Conn, mt Opcode, payload []byte) {
1825+
}))
1826+
if err != nil {
1827+
t.Error(err)
1828+
return
1829+
}
1830+
defer con.Close()
1831+
1832+
select {
1833+
case d := <-tsort.err:
1834+
if d == nil {
1835+
t.Errorf("got:nil, need:error\n")
1836+
}
1837+
case <-time.After(100 * time.Millisecond):
1838+
t.Errorf(" Test_ServerOption:WithServerReadTimeout timeout\n")
1839+
}
1840+
if atomic.LoadInt32(&tsort.run) != 1 {
1841+
t.Error("not run server:method fail")
1842+
}
1843+
})
17311844
}

conn_core.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package greatws
1717
import (
1818
"bytes"
1919
"encoding/binary"
20+
"errors"
2021
"fmt"
2122
"log/slog"
2223
"math/rand"
@@ -159,6 +160,10 @@ func (c *Conn) readHeader() (sucess bool, err error) {
159160
head = head[8:]
160161
}
161162

163+
if c.readMaxMessage > 0 && c.rh.PayloadLen > c.readMaxMessage {
164+
return false, TooBigMessage
165+
}
166+
162167
if c.rh.Mask {
163168
c.rh.MaskKey = binary.LittleEndian.Uint32(head[:4])
164169
}
@@ -493,14 +498,29 @@ func (c *Conn) processCallbackData(f frame.Frame2, payload *[]byte, rsv1 bool, d
493498
return false
494499
}
495500

501+
func (c *Conn) writeAndMaybeOnClose(err error) error {
502+
var sc *StatusCode
503+
defer func() {
504+
c.onCloseOnce.Do(&c.mu2, func() {
505+
c.Callback.OnClose(c, err)
506+
})
507+
}()
508+
509+
if errors.As(err, &sc) {
510+
if err := c.WriteTimeout(opcode.Close, sc.toBytes(), 2*time.Second); err != nil {
511+
return err
512+
}
513+
}
514+
return nil
515+
}
516+
496517
func (c *Conn) writeErrAndOnClose(code StatusCode, userErr error) error {
497518
defer func() {
498519
c.onCloseOnce.Do(&c.mu2, func() {
499520
c.Callback.OnClose(c, userErr)
500521
})
501522
}()
502-
503-
if err := c.WriteTimeout(opcode.Close, statusCodeToBytes(code), 2*time.Second); err != nil {
523+
if err := c.WriteTimeout(opcode.Close, code.toBytes(), 2*time.Second); err != nil {
504524
return err
505525
}
506526

@@ -638,7 +658,7 @@ func (c *Conn) WriteControl(op Opcode, data []byte) (err error) {
638658
}
639659

640660
func (c *Conn) WriteCloseTimeout(sc StatusCode, t time.Duration) (err error) {
641-
buf := statusCodeToBytes(sc)
661+
buf := sc.toBytes()
642662
return c.WriteTimeout(opcode.Close, buf, t)
643663
}
644664

conn_unix.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,11 @@ fail:
442442
bytespool.PutBytes(c.rbuf)
443443
c.rbuf = nil
444444
}
445+
446+
if err != nil {
447+
// 如果是status code类型,要回写符合rfc的close包
448+
c.writeAndMaybeOnClose(err)
449+
}
445450
return err
446451
}
447452

0 commit comments

Comments
 (0)