22
22
import io .undertow .UndertowOptions ;
23
23
import io .undertow .server .OpenListener ;
24
24
import io .undertow .util .WorkerUtils ;
25
-
26
25
import org .xnio .Buffers ;
26
+ import org .xnio .ChannelListener ;
27
27
import org .xnio .ChannelListeners ;
28
28
import org .xnio .IoUtils ;
29
29
import org .xnio .Options ;
47
47
*/
48
48
public final class WriteTimeoutStreamSinkConduit extends AbstractStreamSinkConduit <StreamSinkConduit > {
49
49
50
- private XnioExecutor .Key handle ;
50
+ private volatile XnioExecutor .Key handle ;
51
51
private final StreamConnection connection ;
52
52
private volatile long expireTime = -1 ;
53
53
private final OpenListener openListener ;
@@ -82,6 +82,16 @@ public WriteTimeoutStreamSinkConduit(final StreamSinkConduit delegate, StreamCon
82
82
super (delegate );
83
83
this .connection = connection ;
84
84
this .openListener = openListener ;
85
+ this .connection .getCloseSetter ().set ((ChannelListener <StreamConnection >) channel -> {
86
+ if (handle != null ) {
87
+ synchronized (WriteTimeoutStreamSinkConduit .this ) {
88
+ if (handle != null ) {
89
+ handle .remove ();
90
+ handle = null ;
91
+ }
92
+ }
93
+ }
94
+ });
85
95
}
86
96
87
97
private void handleWriteTimeout (final long ret ) throws IOException {
@@ -124,10 +134,14 @@ public long write(final ByteBuffer[] srcs, final int offset, final int length) t
124
134
public int writeFinal (ByteBuffer src ) throws IOException {
125
135
int ret = super .writeFinal (src );
126
136
handleWriteTimeout (ret );
127
- if (!src .hasRemaining ()) {
128
- if (handle != null ) {
129
- handle .remove ();
130
- handle = null ;
137
+ if (!src .hasRemaining ()) {
138
+ if (handle != null ) {
139
+ synchronized (this ) {
140
+ if (handle != null ) {
141
+ handle .remove ();
142
+ handle = null ;
143
+ }
144
+ }
131
145
}
132
146
}
133
147
return ret ;
@@ -137,10 +151,14 @@ public int writeFinal(ByteBuffer src) throws IOException {
137
151
public long writeFinal (ByteBuffer [] srcs , int offset , int length ) throws IOException {
138
152
long ret = super .writeFinal (srcs , offset , length );
139
153
handleWriteTimeout (ret );
140
- if (!Buffers .hasRemaining (srcs )) {
141
- if (handle != null ) {
142
- handle .remove ();
143
- handle = null ;
154
+ if (!Buffers .hasRemaining (srcs )) {
155
+ if (handle != null ) {
156
+ synchronized (this ) {
157
+ if (handle != null ) {
158
+ handle .remove ();
159
+ handle = null ;
160
+ }
161
+ }
144
162
}
145
163
}
146
164
return ret ;
@@ -200,19 +218,33 @@ private Integer getTimeout() {
200
218
201
219
@ Override
202
220
public void terminateWrites () throws IOException {
203
- super .terminateWrites ();
204
- if (handle != null ) {
205
- handle .remove ();
206
- handle = null ;
221
+ try {
222
+ super .terminateWrites ();
223
+ } finally {
224
+ if (handle != null ) {
225
+ synchronized (this ) {
226
+ if (this .handle != null ) {
227
+ handle .remove ();
228
+ handle = null ;
229
+ }
230
+ }
231
+ }
207
232
}
208
233
}
209
234
210
235
@ Override
211
236
public void truncateWrites () throws IOException {
212
- super .truncateWrites ();
213
- if (handle != null ) {
214
- handle .remove ();
215
- handle = null ;
237
+ try {
238
+ super .truncateWrites ();
239
+ } finally {
240
+ if (handle != null ) {
241
+ synchronized (this ) {
242
+ if (this .handle != null ) {
243
+ handle .remove ();
244
+ handle = null ;
245
+ }
246
+ }
247
+ }
216
248
}
217
249
}
218
250
@@ -233,8 +265,12 @@ public void suspendWrites() {
233
265
234
266
XnioExecutor .Key handle = this .handle ;
235
267
if (handle != null ) {
236
- handle .remove ();
237
- this .handle = null ;
268
+ synchronized (this ) {
269
+ if (this .handle != null ) {
270
+ handle .remove ();
271
+ this .handle = null ;
272
+ }
273
+ }
238
274
}
239
275
}
240
276
@@ -253,7 +289,11 @@ private void handleResumeTimeout() {
253
289
expireTime = currentTime + timeout ;
254
290
XnioExecutor .Key key = handle ;
255
291
if (key == null ) {
256
- handle = connection .getIoThread ().executeAfter (timeoutCommand , timeout , TimeUnit .MILLISECONDS );
292
+ synchronized (this ) {
293
+ if (handle == null ) {
294
+ handle = connection .getIoThread ().executeAfter (timeoutCommand , timeout , TimeUnit .MILLISECONDS );
295
+ }
296
+ }
257
297
}
258
298
}
259
299
}
0 commit comments