Skip to content

Commit afa896e

Browse files
committed
UNDERTOW-2547 - Perform gathering write
Modify conduit to perform a gather write to decrease latency
1 parent 42993e8 commit afa896e

File tree

1 file changed

+63
-31
lines changed

1 file changed

+63
-31
lines changed

core/src/main/java/io/undertow/client/http/HttpRequestConduit.java

+63-31
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,30 @@
1818

1919
package io.undertow.client.http;
2020

21+
import static org.xnio.Bits.allAreClear;
22+
import static org.xnio.Bits.allAreSet;
23+
import static org.xnio.Bits.anyAreSet;
24+
25+
import java.io.IOException;
26+
import java.nio.ByteBuffer;
27+
import java.nio.channels.ClosedChannelException;
28+
import java.nio.channels.FileChannel;
29+
import java.util.Iterator;
30+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
31+
2132
import io.undertow.client.ClientRequest;
33+
import io.undertow.connector.ByteBufferPool;
34+
import io.undertow.connector.PooledByteBuffer;
2235
import io.undertow.server.TruncatedResponseException;
2336
import io.undertow.util.HeaderMap;
2437
import io.undertow.util.HttpString;
2538
import org.jboss.logging.Logger;
26-
import io.undertow.connector.ByteBufferPool;
27-
import io.undertow.connector.PooledByteBuffer;
2839
import org.xnio.XnioWorker;
2940
import org.xnio.channels.StreamSourceChannel;
3041
import org.xnio.conduits.AbstractStreamSinkConduit;
3142
import org.xnio.conduits.Conduits;
3243
import org.xnio.conduits.StreamSinkConduit;
3344

34-
import java.io.IOException;
35-
import java.nio.ByteBuffer;
36-
import java.nio.channels.ClosedChannelException;
37-
import java.nio.channels.FileChannel;
38-
import java.util.Iterator;
39-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40-
41-
import static org.xnio.Bits.allAreClear;
42-
import static org.xnio.Bits.allAreSet;
43-
import static org.xnio.Bits.anyAreSet;
44-
4545
/**
4646
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
4747
* @author Emanuel Muckenhuber
@@ -101,7 +101,7 @@ final class HttpRequestConduit extends AbstractStreamSinkConduit<StreamSinkCondu
101101
* @return
102102
* @throws java.io.IOException
103103
*/
104-
private int processWrite(int state, final ByteBuffer userData) throws IOException {
104+
private int processWrite(int state, final Object userData) throws IOException {
105105
return doProcessWrite(state, userData);
106106
}
107107

@@ -117,7 +117,7 @@ private int processWrite(int state, final ByteBuffer userData) throws IOExceptio
117117
* @return
118118
* @throws java.io.IOException
119119
*/
120-
private int doProcessWrite(int state, final ByteBuffer userData) throws IOException {
120+
private int doProcessWrite(int state, final Object userData) throws IOException {
121121
if (state == STATE_START) {
122122
pooledBuffer = pool.allocate();
123123
}
@@ -331,23 +331,22 @@ private int doProcessWrite(int state, final ByteBuffer userData) throws IOExcept
331331
this.string = null;
332332
buffer.flip();
333333
//for performance reasons we use a gather write if there is user data
334-
if(userData == null) {
334+
if (userData == null) {
335335
do {
336336
res = next.write(buffer);
337337
if (res == 0) {
338338
log.trace("Continuation");
339339
return STATE_BUF_FLUSH;
340340
}
341341
} while (buffer.hasRemaining());
342+
} else if (userData instanceof ByteBuffer[]) {
343+
if (!writeBufferArray(buildDataArray((ByteBuffer[]) userData, buffer))) {
344+
return STATE_BUF_FLUSH;
345+
}
342346
} else {
343-
ByteBuffer[] b = {buffer, userData};
344-
do {
345-
long r = next.write(b, 0, b.length);
346-
if (r == 0 && buffer.hasRemaining()) {
347-
log.trace("Continuation");
348-
return STATE_BUF_FLUSH;
349-
}
350-
} while (buffer.hasRemaining());
347+
if (!writeBufferArray(new ByteBuffer[] {buffer, (ByteBuffer) userData})) {
348+
return STATE_BUF_FLUSH;
349+
}
351350
}
352351
pooledBuffer.close();
353352
pooledBuffer = null;
@@ -439,14 +438,16 @@ private int doProcessWrite(int state, final ByteBuffer userData) throws IOExcept
439438
}
440439
} while (buffer.hasRemaining());
441440
} else {
442-
ByteBuffer[] b = {buffer, userData};
443-
do {
444-
long r = next.write(b, 0, b.length);
445-
if (r == 0 && buffer.hasRemaining()) {
446-
log.trace("Continuation");
441+
442+
if (userData instanceof ByteBuffer[]) {
443+
if (!writeBufferArray(buildDataArray((ByteBuffer[]) userData, buffer))) {
447444
return STATE_BUF_FLUSH;
448445
}
449-
} while (buffer.hasRemaining());
446+
} else {
447+
if (!writeBufferArray(new ByteBuffer[] {buffer, (ByteBuffer) userData})) {
448+
return STATE_BUF_FLUSH;
449+
}
450+
}
450451
}
451452
// fall thru
452453
}
@@ -565,7 +566,7 @@ public long write(final ByteBuffer[] srcs, final int offset, final int length) t
565566
try {
566567
if (state != 0) {
567568
//todo: use gathering write here
568-
state = processWrite(state, null);
569+
state = processWrite(state, srcs);
569570
if (state != 0) {
570571
return 0;
571572
}
@@ -749,4 +750,35 @@ public void freeBuffers() {
749750
this.state = state & ~MASK_STATE | FLAG_SHUTDOWN;
750751
}
751752
}
753+
754+
private static ByteBuffer[] buildDataArray(ByteBuffer[] userData, ByteBuffer buffer) {
755+
int userDataLen = userData.length;
756+
ByteBuffer[] data = new ByteBuffer[userDataLen + 1];
757+
data[0] = buffer;
758+
System.arraycopy(userData, 0, data, 1, userDataLen);
759+
return data;
760+
}
761+
762+
private boolean writeBufferArray(ByteBuffer[] data) throws IOException {
763+
long totalWritten = 0;
764+
long totalRemaining = 0;
765+
766+
for (ByteBuffer buffer : data) {
767+
totalRemaining += buffer.remaining();
768+
}
769+
770+
do {
771+
long r = next.write(data, 0, data.length);
772+
totalWritten += r;
773+
if ((r == 0) && (totalWritten < totalRemaining)) {
774+
log.trace("Continuation");
775+
return false;
776+
}
777+
} while (totalWritten < totalRemaining);
778+
779+
// ? The original code allowed for an early return under certain conditions. If that condition is met, retunr
780+
// false from the loop. Otherwise, return true here to indicate a successful completion. Can't return bytes
781+
// written since it might get confused for STATE_BUF_FLUSH
782+
return true;
783+
}
752784
}

0 commit comments

Comments
 (0)