Skip to content

Commit

Permalink
Fix requestChannel to not drop first payload (#398)
Browse files Browse the repository at this point in the history
  • Loading branch information
yschimke authored Sep 21, 2017
1 parent bf339d9 commit cc5ea54
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
14 changes: 9 additions & 5 deletions rsocket-core/src/main/java/io/rsocket/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package io.rsocket;

import static io.rsocket.Frame.Request.initialRequestN;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_C;
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.Frame.Request;
import io.rsocket.exceptions.ApplicationException;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.util.PayloadImpl;
Expand Down Expand Up @@ -157,7 +157,8 @@ private Mono<Void> handleFrame(Frame frame) {
case REQUEST_N:
return handleRequestN(streamId, frame);
case REQUEST_STREAM:
return handleStream(streamId, requestStream(new PayloadImpl(frame)), frame);
return handleStream(
streamId, requestStream(new PayloadImpl(frame)), initialRequestN(frame));
case REQUEST_CHANNEL:
return handleChannel(streamId, frame);
case PAYLOAD:
Expand Down Expand Up @@ -235,8 +236,7 @@ private Mono<Void> handleRequestResponse(int streamId, Mono<Payload> response) {
return responseFrame.flatMap(connection::sendOne);
}

private Mono<Void> handleStream(int streamId, Flux<Payload> response, Frame firstFrame) {
int initialRequestN = Request.initialRequestN(firstFrame);
private Mono<Void> handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
Flux<Frame> responseFrames =
response
.map(payload -> Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload))
Expand Down Expand Up @@ -287,7 +287,11 @@ private Mono<Void> handleChannel(int streamId, Frame firstFrame) {
})
.doFinally(signalType -> removeChannelProcessor(streamId));

return handleStream(streamId, requestChannel(payloads), firstFrame);
// not chained, as the payload should be enqueued in the Unicast processor before this method returns
// and any later payload can be processed
frames.onNext(new PayloadImpl(firstFrame));

return handleStream(streamId, requestChannel(payloads), initialRequestN(firstFrame));
}

private Mono<Void> handleKeepAliveFrame(Frame frame) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.rsocket.Payload;
import io.rsocket.util.PayloadImpl;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void testRequestResponse10() {
assertEquals(10, outputCount);
}

private PayloadImpl testPayload(int metadataPresent) {
private Payload testPayload(int metadataPresent) {
String metadata;
switch (metadataPresent % 5) {
case 0:
Expand Down Expand Up @@ -164,4 +165,35 @@ public void testRequestStreamWithDelayedRequestN() {

assertEquals(10, ts.count());
}

@Test(timeout = 10000)
@Ignore
public void testChannel0() {
Flux<Payload> publisher = setup.getRSocket().requestChannel(Flux.empty());

long count = publisher.count().block();

assertEquals(0, count);
}

@Test(timeout = 10000)
public void testChannel1() {
Flux<Payload> publisher = setup.getRSocket().requestChannel(Flux.just(testPayload(0)));

long count = publisher.count().block();

assertEquals(1, count);
}

@Test(timeout = 10000)
public void testChannel3() {
Flux<Payload> publisher =
setup
.getRSocket()
.requestChannel(Flux.just(testPayload(0), testPayload(1), testPayload(2)));

long count = publisher.count().block();

assertEquals(3, count);
}
}
7 changes: 7 additions & 0 deletions rsocket-test/src/main/java/io/rsocket/test/TestRSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.util.PayloadImpl;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -43,4 +44,10 @@ public Mono<Void> metadataPush(Payload payload) {
public Mono<Void> fireAndForget(Payload payload) {
return Mono.empty();
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
// TODO is defensive copy neccesary?
return Flux.from(payloads).map(p -> new PayloadImpl(p.getDataUtf8(), p.getMetadataUtf8()));
}
}

0 comments on commit cc5ea54

Please sign in to comment.