forked from pkg/poller
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwait_pollable.go
157 lines (138 loc) · 3.2 KB
/
wait_pollable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package poller
import (
"io"
"syscall"
)
// WaitPollable is a Pollable whose Read and Write methods block until the
// file descriptor is ready, using readiness notifications delivered through
// waitHandler.
type WaitPollable struct {
	*Pollable

	// cr carries the outcome of a read-readiness wake-up: wake('r', err)
	// sends on it and WaitRead receives from it.
	cr chan error
	// cw carries the outcome of a write-readiness wake-up: wake('w', err)
	// sends on it and WaitWrite receives from it.
	cw chan error
}
// NewWaitPollable creates a WaitPollable for fd and registers it with poller
// by passing waitHandler and the new value to poller.RegisterHandler.
//
// If registration fails, the error from RegisterHandler is returned unchanged
// and the WaitPollable result is nil.
func NewWaitPollable(poller *Poller, fd uintptr) (*WaitPollable, error) {
	wp := &WaitPollable{
		cr: make(chan error),
		cw: make(chan error),
	}
	pollable, err := poller.RegisterHandler(fd, waitHandler, wp)
	if err != nil {
		return nil, err
	}
	wp.Pollable = pollable
	return wp, nil
}
// Register creates a WaitPollable for fd on this Poller. It is shorthand for
// NewWaitPollable(p, fd) and returns that call's results unchanged.
func (p *Poller) Register(fd uintptr) (*WaitPollable, error) {
	wp, err := NewWaitPollable(p, fd)
	return wp, err
}
// waitHandler is the callback NewWaitPollable hands to RegisterHandler. It
// translates the reported event mask into wake-ups on the WaitPollable:
//
//   - EPOLLOUT wakes the writer with a nil error;
//   - EPOLLIN wakes the reader with a nil error;
//   - any other bits wake the reader with an EventError carrying those bits.
//
// _p must hold a *WaitPollable; the type assertion panics otherwise. Each
// wake-up is a channel send, so the handler does not return until the
// corresponding waiter has received it.
func waitHandler(fd uintptr, revents uint32, _p interface{}) {
	wp := _p.(*WaitPollable)

	writable := revents&EPOLLOUT != 0
	readable := revents&EPOLLIN != 0

	// Writers are woken before readers.
	if writable {
		wp.wake('w', nil)
	}
	if readable {
		wp.wake('r', nil)
	}

	// Whatever is left after removing the readiness bits is reported to the
	// reader as an error.
	if rest := revents &^ (EPOLLOUT | EPOLLIN); rest != 0 {
		wp.wake('r', EventError{fd, rest})
	}
}
// Read reads up to len(b) bytes from the underlying fd and returns the number
// of bytes read together with any error. The count is never negative.
//
// End of file is reported as a zero count with io.EOF; this only happens when
// b is non-empty and the underlying read returned zero bytes without error.
//
// Read blocks while no data is available.
func (p *WaitPollable) Read(b []byte) (int, error) {
	count, err := p.read(b)
	if count < 0 {
		// A failed read reports a negative count; clamp it for callers.
		count = 0
	}
	if err != nil {
		return count, err
	}
	if count == 0 && len(b) > 0 {
		return 0, io.EOF
	}
	return count, nil
}
// read issues syscall.Read on the fd, retrying for as long as the call fails
// with EAGAIN. Between attempts it waits in WaitRead; if that wait fails, read
// returns 0 and the wait's error. Any other result of syscall.Read, success or
// failure, is returned as-is.
func (p *WaitPollable) read(b []byte) (int, error) {
	for {
		n, err := syscall.Read(int(p.fd), b)
		if err == syscall.EAGAIN {
			// Nothing to read yet: block until the fd is reported readable.
			if waitErr := p.WaitRead(); waitErr != nil {
				return 0, waitErr
			}
			continue
		}
		return n, err
	}
}
// Write writes len(b) bytes to the fd. It returns the number of bytes
// written and an error, if any. Write returns a non-nil error when n !=
// len(b).
//
// An error reported by the underlying write, or by waiting for the fd to
// become writable, is returned unchanged. io.ErrShortWrite is returned only
// when fewer than len(b) bytes were written and no error was reported.
//
// Callers to Write will block if there is no buffer capacity available.
func (p *WaitPollable) Write(b []byte) (int, error) {
	n, err := p.write(b)
	if n < 0 {
		// A failed write reports a negative count; clamp it for callers.
		n = 0
	}
	// Report the real failure first so it is not masked by ErrShortWrite.
	if err != nil {
		return n, err
	}
	if n != len(b) {
		return n, io.ErrShortWrite
	}
	return n, nil
}
// write sends b to the fd with syscall.Write, continuing after partial writes
// until every byte has been written or an error occurs. When the fd reports
// EAGAIN it waits in WaitWrite before trying again.
//
// It returns the total number of bytes written so far and the first error
// encountered, which may come from syscall.Write or from WaitWrite. At least
// one syscall.Write is always issued, even when b is empty.
//
// NOTE(review): this block previously carried "TODO(dfc) this is wrong";
// presumably that referred to partial writes not being resumed, which is
// handled here. Confirm with the original author.
func (p *WaitPollable) write(b []byte) (int, error) {
	written := 0
	for {
		n, err := syscall.Write(int(p.fd), b[written:])
		if n > 0 {
			written += n
		}
		if err == syscall.EAGAIN {
			// No buffer space: block until the fd is reported writable.
			if waitErr := p.WaitWrite(); waitErr != nil {
				return written, waitErr
			}
			continue
		}
		if err != nil {
			return written, err
		}
		// Stop when done, or when a successful write made no progress, so
		// the loop cannot spin forever.
		if written >= len(b) || n == 0 {
			return written, nil
		}
	}
}
// Close calls Close on the embedded Pollable and then closes the read and
// write wake-up channels, in that order. It returns the error from the
// Pollable's Close.
//
// Because the channels are closed, a goroutine blocked in WaitRead or
// WaitWrite receives a nil error once Close has run.
func (p *WaitPollable) Close() error {
	closeErr := p.Pollable.Close()
	for _, ch := range []chan error{p.cr, p.cw} {
		close(ch)
	}
	return closeErr
}
// wantRead calls WantEvents on the owning poller for EPOLLIN on this
// Pollable and returns its error. The meaning of the final true argument is
// defined by Poller.WantEvents, which is declared elsewhere.
func (p *WaitPollable) wantRead() error {
	err := p.poller.WantEvents(p.Pollable, EPOLLIN, true)
	return err
}
// wantWrite calls WantEvents on the owning poller for EPOLLOUT on this
// Pollable and returns its error. The meaning of the final true argument is
// defined by Poller.WantEvents, which is declared elsewhere.
func (p *WaitPollable) wantWrite() error {
	err := p.poller.WantEvents(p.Pollable, EPOLLOUT, true)
	return err
}
// WaitRead blocks until the WaitPollable is reported ready for reading.
//
// It first requests read events via wantRead; if that fails the error is
// returned immediately. Otherwise it returns the value delivered on the read
// wake-up channel, which is nil for plain readiness.
func (p *WaitPollable) WaitRead() error {
	debug("Pollable: %p, fd: %v waitread", p, p.fd)
	err := p.wantRead()
	if err == nil {
		err = <-p.cr
	}
	return err
}
// WaitWrite blocks until the WaitPollable is reported ready for writing.
//
// It first requests write events via wantWrite; if that fails the error is
// returned immediately. Otherwise it returns the value delivered on the write
// wake-up channel, which is nil for plain readiness.
func (p *WaitPollable) WaitWrite() error {
	debug("Pollable: %p, fd: %v waitwrite", p, p.fd)
	err := p.wantWrite()
	if err == nil {
		err = <-p.cw
	}
	return err
}
// wake delivers err to a waiter. A mode of 'r' targets the read wake-up
// channel; any other mode targets the write wake-up channel. The send blocks
// until a receiver takes the value (NewWaitPollable creates both channels
// unbuffered).
func (p *WaitPollable) wake(mode int, err error) {
	debug("Pollable: %p, fd: %v wake: %c, %v", p, p.fd, mode, err)
	target := p.cw
	if mode == 'r' {
		target = p.cr
	}
	target <- err
}