@@ -37,11 +37,14 @@ const (
37
37
etDelWrite = uint32 (0 )
38
38
etResetRead = uint32 (0 )
39
39
40
- // 一次性触发
40
+ // 一次性触发, TODO: 这里要看下是否需要,还是垂直触发+overflow fd记录
41
41
etReadOneShot = uint32 (unix .EPOLLERR | unix .EPOLLHUP | unix .EPOLLRDHUP | unix .EPOLLPRI | unix .EPOLLIN | unix .EPOLLOUT | unix .EPOLLET | unix .EPOLLONESHOT )
42
42
etWriteOneShot = uint32 (etReadOneShot )
43
43
etDelWriteOneShot = uint32 (0 )
44
44
etResetReadOneShot = uint32 (etReadOneShot )
45
+
46
+ processWrite = uint32 (unix .EPOLLOUT )
47
+ processRead = uint32 (unix .EPOLLIN | unix .EPOLLRDHUP | unix .EPOLLHUP | unix .EPOLLERR )
45
48
)
46
49
47
50
type epollState struct {
@@ -56,6 +59,10 @@ type epollState struct {
56
59
resetEv uint32
57
60
}
58
61
62
+ func (e * epollState ) getMultiEventLoop () * MultiEventLoop {
63
+ return e .parent .parent
64
+ }
65
+
59
66
func getReadWriteDeleteReset (oneShot bool , et bool ) (uint32 , uint32 , uint32 , uint32 ) {
60
67
if oneShot {
61
68
return etReadOneShot , etWriteOneShot , etDelWriteOneShot , etResetReadOneShot
@@ -155,7 +162,7 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
155
162
}
156
163
157
164
retVal , err = unix .EpollWait (e .epfd , e .events , msec )
158
- e .parent . parent .addPollEvNum ()
165
+ e .getMultiEventLoop () .addPollEvNum ()
159
166
if err != nil {
160
167
if errors .Is (err , unix .EINTR ) {
161
168
return 0 , nil
@@ -179,22 +186,44 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
179
186
continue
180
187
}
181
188
182
- if ev .Events & (unix .EPOLLIN | unix .EPOLLRDHUP | unix .EPOLLHUP | unix .EPOLLERR ) > 0 {
183
- e .parent .parent .addReadEvNum ()
189
+ if e .getMultiEventLoop ().parseInParseLoop {
190
+ isRead := ev .Events & processRead > 0
191
+ isWrite := ev .Events & processWrite > 0
192
+ e .getMultiEventLoop ().parseLoop .addTask (int (ev .Fd ), func () bool {
193
+ if isRead {
194
+ err = conn .processWebsocketFrame ()
195
+ if err != nil {
196
+ conn .closeWithLock (err )
197
+ return true
198
+ }
199
+ }
200
+
201
+ if isWrite {
202
+ // 刷新下直接写入失败的数据
203
+ conn .flushOrClose ()
204
+ }
205
+
206
+ return true
207
+ })
208
+ continue
209
+ }
210
+ if ev .Events & processRead > 0 {
211
+ e .getMultiEventLoop ().addReadEvNum ()
184
212
185
213
// 读取数据,这里要发行下websocket的解析变成流式解析
186
214
err = conn .processWebsocketFrame ()
187
215
if err != nil {
188
216
conn .closeWithLock (err )
189
217
}
190
218
}
191
- if ev .Events & unix . EPOLLOUT > 0 {
192
- e .parent . parent .addWriteEvNum ()
219
+ if ev .Events & processWrite > 0 {
220
+ e .getMultiEventLoop () .addWriteEvNum ()
193
221
// 刷新下直接写入失败的数据
194
222
conn .flushOrClose ()
195
223
}
196
224
if ev .Events & (unix .EPOLLERR | unix .EPOLLHUP | unix .EPOLLRDHUP ) > 0 {
197
225
conn .closeWithLock (io .EOF )
226
+ continue
198
227
}
199
228
}
200
229
0 commit comments