Skip to content

Support returning multiple streaming responses for a unary gRPC request #134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ dependencies {
testFixturesApi "org.hamcrest:hamcrest-core:3.0"
testFixturesApi "org.hamcrest:hamcrest-library:3.0"
testFixturesApi 'org.awaitility:awaitility:4.2.2'
testFixturesApi 'org.slf4j:slf4j-simple:2.0.16'

testFixturesApi "io.grpc:grpc-okhttp"

Expand Down
15 changes: 14 additions & 1 deletion src/main/java/org/wiremock/grpc/dsl/WireMockGrpc.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023-2024 Thomas Akehurst
* Copyright (C) 2023-2025 Thomas Akehurst
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,8 @@
import com.github.tomakehurst.wiremock.common.Json;
import com.github.tomakehurst.wiremock.matching.StringValuePattern;
import com.google.protobuf.MessageOrBuilder;
import java.util.List;
import java.util.stream.Collectors;
import org.wiremock.annotations.Beta;
import org.wiremock.grpc.internal.JsonMessageUtils;

Expand Down Expand Up @@ -50,6 +52,17 @@ public static GrpcResponseDefinitionBuilder message(MessageOrBuilder messageOrBu
return new GrpcResponseDefinitionBuilder(Status.OK).fromJson(json);
}

public static GrpcResponseDefinitionBuilder messages(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest adding an overload taking a vararg so that it's not necessary to put List.of(...) when calling.

List<MessageOrBuilder> messageOrBuilderList) {
final String json =
"[\n"
+ messageOrBuilderList.stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is serialising the whole list not possible?

Seems a bit messy to have to do string concat like this.

.map(JsonMessageUtils::toJson)
.collect(Collectors.joining(",\n"))
+ "\n]";
return new GrpcResponseDefinitionBuilder(Status.OK).fromJson(json);
}

public static GrpcResponseDefinitionBuilder messageAsAny(MessageOrBuilder messageOrBuilder) {
final String initialJson = JsonMessageUtils.toJson(messageOrBuilder);
final ObjectNode jsonObject = Json.read(initialJson, ObjectNode.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023-2024 Thomas Akehurst
* Copyright (C) 2023-2025 Thomas Akehurst
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,9 @@
import static org.wiremock.grpc.dsl.GrpcResponseDefinitionBuilder.GRPC_STATUS_REASON;
import static org.wiremock.grpc.internal.Delays.delayIfRequired;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.tomakehurst.wiremock.common.Pair;
import com.github.tomakehurst.wiremock.http.HttpHeader;
import com.github.tomakehurst.wiremock.http.StubRequestHandler;
Expand All @@ -28,11 +31,14 @@
import io.grpc.Status;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import org.wiremock.grpc.dsl.WireMockGrpc;

public class UnaryServerCallHandler extends BaseCallHandler
implements ServerCalls.UnaryMethod<DynamicMessage, DynamicMessage> {

private final ObjectMapper objectMapper = new ObjectMapper();

public UnaryServerCallHandler(
StubRequestHandler stubRequestHandler,
Descriptors.ServiceDescriptor serviceDescriptor,
Expand Down Expand Up @@ -85,12 +91,26 @@ public void invoke(DynamicMessage request, StreamObserver<DynamicMessage> respon
return;
}

DynamicMessage.Builder messageBuilder =
DynamicMessage.newBuilder(methodDescriptor.getOutputType());
try (MappingIterator<JsonNode> iterator =
objectMapper.readerFor(JsonNode.class).readValues(resp.getBody())) {

while (iterator.hasNext()) {
JsonNode node = iterator.next();

final DynamicMessage.Builder messageBuilder =
DynamicMessage.newBuilder(methodDescriptor.getOutputType());
final DynamicMessage response =
jsonMessageConverter.toMessage(node.toString(), messageBuilder);

final DynamicMessage response =
jsonMessageConverter.toMessage(resp.getBodyAsString(), messageBuilder);
responseObserver.onNext(response);
responseObserver.onNext(response);
}
} catch (IOException e) {
responseObserver.onError(
Status.INTERNAL
.withDescription("Error parsing response")
.withCause(e)
.asRuntimeException());
}
responseObserver.onCompleted();
},
ServeEvent.of(wireMockRequest));
Expand Down
21 changes: 18 additions & 3 deletions src/test/java/org/wiremock/grpc/GrpcAcceptanceTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023-2024 Thomas Akehurst
* Copyright (C) 2023-2025 Thomas Akehurst
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.iterableWithSize;
Expand All @@ -39,6 +40,7 @@
import static org.wiremock.grpc.dsl.WireMockGrpc.jsonTemplate;
import static org.wiremock.grpc.dsl.WireMockGrpc.message;
import static org.wiremock.grpc.dsl.WireMockGrpc.messageAsAny;
import static org.wiremock.grpc.dsl.WireMockGrpc.messages;
import static org.wiremock.grpc.dsl.WireMockGrpc.method;

import com.example.grpc.AnotherGreetingServiceGrpc;
Expand Down Expand Up @@ -282,14 +284,27 @@ void throwsReturnedErrorFromStreamingClientCallWhenServerOnlyReturnsAHttpStatus(
}

@Test
void returnsStreamedResponseToUnaryRequest() {
void returnsStreamedResponseToUnaryRequestWithSingleItem() {
mockGreetingService.stubFor(
method("oneGreetingManyReplies")
.willReturn(message(HelloResponse.newBuilder().setGreeting("Hi Tom"))));

assertThat(greetingsClient.oneGreetingManyReplies("Tom"), hasItem("Hi Tom"));
}

@Test
void returnsStreamedResponseToUnaryRequest() {
mockGreetingService.stubFor(
method("oneGreetingManyReplies")
.willReturn(
messages(
List.of(
HelloResponse.newBuilder().setGreeting("Hi Tom"),
HelloResponse.newBuilder().setGreeting("Hi Tom again")))));

assertThat(greetingsClient.oneGreetingManyReplies("Tom"), hasItems("Hi Tom", "Hi Tom again"));
}

@Test
void returnsResponseWithImportedType() {
mockGreetingService.stubFor(
Expand Down Expand Up @@ -337,7 +352,7 @@ void networkFault() {

Exception exception =
assertThrows(StatusRuntimeException.class, () -> greetingsClient.greet("Alan"));
assertThat(exception.getMessage(), startsWith("UNKNOWN"));
assertThat(exception.getMessage(), startsWith("CANCELLED"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.iterableWithSize;
Expand All @@ -39,6 +40,7 @@
import static org.wiremock.grpc.dsl.WireMockGrpc.jsonTemplate;
import static org.wiremock.grpc.dsl.WireMockGrpc.message;
import static org.wiremock.grpc.dsl.WireMockGrpc.messageAsAny;
import static org.wiremock.grpc.dsl.WireMockGrpc.messages;
import static org.wiremock.grpc.dsl.WireMockGrpc.method;

import com.example.grpc.AnotherGreetingServiceGrpc;
Expand Down Expand Up @@ -283,14 +285,27 @@ void throwsReturnedErrorFromStreamingClientCallWhenServerOnlyReturnsAHttpStatus(
}

@Test
void returnsStreamedResponseToUnaryRequest() {
void returnsStreamedResponseToUnaryRequestWithSingleItem() {
mockGreetingService.stubFor(
method("oneGreetingManyReplies")
.willReturn(message(HelloResponse.newBuilder().setGreeting("Hi Tom"))));

assertThat(greetingsClient.oneGreetingManyReplies("Tom"), hasItem("Hi Tom"));
}

@Test
void returnsStreamedResponseToUnaryRequest() {
mockGreetingService.stubFor(
method("oneGreetingManyReplies")
.willReturn(
messages(
List.of(
HelloResponse.newBuilder().setGreeting("Hi Tom"),
HelloResponse.newBuilder().setGreeting("Hi Tom again")))));

assertThat(greetingsClient.oneGreetingManyReplies("Tom"), hasItems("Hi Tom", "Hi Tom again"));
}

@Test
void returnsResponseWithImportedType() {
mockGreetingService.stubFor(
Expand Down Expand Up @@ -338,7 +353,7 @@ void networkFault() {

Exception exception =
assertThrows(StatusRuntimeException.class, () -> greetingsClient.greet("Alan"));
assertThat(exception.getMessage(), startsWith("UNKNOWN"));
assertThat(exception.getMessage(), startsWith("CANCELLED"));
}

@Test
Expand Down