diff --git a/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/ListenableStreamObserver.java b/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/ListenableStreamObserver.java index d82985f09865e..05d1b4790a44a 100644 --- a/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/ListenableStreamObserver.java +++ b/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/ListenableStreamObserver.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -8,7 +8,6 @@ package grpc.proxy.version_1; import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; import com.oracle.coherence.common.base.Logger; import com.oracle.coherence.grpc.BinaryHelper; import com.oracle.coherence.grpc.messages.cache.v1.MapEventMessage; @@ -17,11 +16,9 @@ import com.oracle.coherence.grpc.messages.proxy.v1.ProxyResponse; import com.tangosol.io.Serializer; import com.tangosol.net.cache.CacheEvent; -import com.tangosol.util.MapEvent; import com.tangosol.util.MapListener; import com.tangosol.util.ObservableMap; import com.tangosol.util.WrapperObservableMap; -import grpc.proxy.TestStreamObserver; import io.reactivex.rxjava3.annotations.NonNull; import java.util.ArrayList; @@ -33,7 +30,7 @@ import java.util.function.Consumer; public class ListenableStreamObserver - extends TestStreamObserver + extends TestProxyResponseStreamObserver { @Override diff --git a/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/NamedCacheProxyProtocolIT.java b/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/NamedCacheProxyProtocolIT.java index 547b40ec9e691..a138bc68711d7 100644 --- a/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/NamedCacheProxyProtocolIT.java +++ b/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/NamedCacheProxyProtocolIT.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -94,6 +94,7 @@ import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -121,7 +122,7 @@ public class NamedCacheProxyProtocolIT @MethodSource("serializers") public void shouldFailIfCacheNotEnsured(String ignored, Serializer serializer, String sScope) throws Exception { - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -145,7 +146,7 @@ public void shouldAddIndex(String ignored, Serializer serializer, String sScope) ValueExtractor extractor = new UniversalExtractor("foo"); ByteString binExtractor = toByteString(extractor, serializer); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -177,7 +178,7 @@ public void shouldAddSortedIndex(String ignored, Serializer serializer, String s ValueExtractor extractor = new UniversalExtractor("foo"); ByteString binExtractor = toByteString(extractor, serializer); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -209,7 +210,7 @@ public void shouldAddSortedIndexWithComparator(String ignored, Serializer serial Comparator comparator = new SafeComparator(new UniversalExtractor("bar")); ByteString binComparator = toByteString(comparator, serializer); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -254,7 +255,7 @@ public void shouldCallAggregateWithFilterExpectingSingleResult(String ignored, S Filter filter = Filters.equal("age", 25); int expected = cache.aggregate(filter, aggregator); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -298,7 +299,7 @@ public void shouldCallAggregateWithFilterMatchingNoEntriesExpectingSingleResult( Filter filter = Filters.equal("age", 100); int cExpected = cache.aggregate(filter, aggregator); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -342,7 +343,7 @@ public void shouldCallAggregateWithKeysExpectingSingleResult(String ignored, Ser int nExpected = cache.aggregate(listKeys, aggregator); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -391,7 +392,7 @@ public void shouldCallAggregateWithKeysMatchingNoEntriesExpectingSingleResult(St int nExpected = cache.aggregate(listKeys, aggregator); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -435,7 +436,7 @@ public void shouldCallAggregateWithNoKeysOrFilterExpectingSingleResult(String ig int nExpected = cache.aggregate(aggregator); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -477,7 +478,7 @@ public void shouldCallAggregateWithFilterExpectingMapResult(String ignored, Seri Filter filter = Filters.equal("age", 25); Map mapExpected = cache.aggregate(filter, aggregator); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -523,7 +524,7 @@ public void shouldCallAggregateWithKeysExpectingMapResult(String ignored, Serial .collect(Collectors.toList()); Map nExpected = cache.aggregate(listKeys, aggregator); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -558,7 +559,7 @@ public void shouldClearEmptyCache(String ignored, Serializer serializer, String NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -577,7 +578,7 @@ public void shouldClearPopulatedCache(String ignored, Serializer serializer, Str NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 10); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -598,7 +599,7 @@ public void shouldContainEntryWhenMappingPresent(String ignored, Serializer seri NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -626,7 +627,7 @@ public void shouldNotContainEntryWhenMappingHasDifferentValue(String ignored, Se NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -653,7 +654,7 @@ public void shouldNotContainEntryWhenMappingNotPresent(String ignored, Serialize NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -683,7 +684,7 @@ public void shouldReturnTrueForContainsKeyWithExistingMapping(String ignored, Se NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 5); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -708,7 +709,7 @@ public void shouldReturnFalseForContainsKeyWithNonExistentMapping(String ignored NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 5); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -733,7 +734,7 @@ public void shouldContainValueWhenValuePresent(String ignored, Serializer serial NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -759,7 +760,7 @@ public void shouldContainValueWhenValuePresentMultipleTimes(String ignored, Seri cache.put("key-11", "value-1"); cache.put("key-22", "value-2"); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -782,13 +783,13 @@ public void shouldNotContainValueWhenMappingNotPresent(String ignored, Serialize NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); int cacheId = ensureCache(channel, observer, sCacheName); - ByteString value = toByteString("value-100", serializer); + ByteString value = toByteString("value-100", serializer); BytesValue binValue = BytesValue.newBuilder().setValue(value).build(); NamedCacheResponse response = sendCacheRequest(channel, observer, cacheId, NamedCacheRequestType.ContainsValue, binValue); @@ -806,7 +807,7 @@ public void shouldDestroyCache(String ignored, Serializer serializer, String sSc String sCacheName = "test-cache"; NamedCache cache = ensureEmptyCache(sScope, sCacheName); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -854,7 +855,7 @@ public void shouldCallEntrySetWithFilterWhenSomeEntriesMatch(String ignored, Ser cache.put(person2.getLastName(), person2); cache.put(person3.getLastName(), person3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -889,7 +890,7 @@ public void shouldCallEntrySetWithFilterWhenAllEntriesMatch(String ignored, Seri cache.put(person2.getLastName(), person2); cache.put(person3.getLastName(), person3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -924,7 +925,7 @@ public void shouldCallEntrySetWithFilterWhenNoEntriesMatch(String ignored, Seria cache.put(person2.getLastName(), person2); cache.put(person3.getLastName(), person3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -959,7 +960,7 @@ public void shouldCallEntrySetWithFilterAndComparatorWhenSomeEntriesMatch(String cache.put("key-" + i, i); } - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -1001,7 +1002,7 @@ public void shouldCallEntrySetWithFilterAndComparatorWhenAllEntriesMatch(String cache.put("key-" + i, i); } - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -1043,7 +1044,7 @@ public void shouldCallEntrySetWithFilterAndComparatorWhenNoEntriesMatch(String i cache.put("key-" + i, i); } - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -1079,7 +1080,7 @@ public void shouldReceiveTruncateEvent(String ignored, Serializer serializer, St cache.clear(); cache.put("foo", "bar"); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -1853,7 +1854,7 @@ public void shouldGetExistingKey(String ignored, Serializer serializer, String s cache.clear(); cache.put("key-1", "value-1"); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -1879,7 +1880,7 @@ public void shouldGetExistingKeyMappedToNull(String ignored, Serializer serializ cache.clear(); cache.put("key-1", null); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -1904,7 +1905,7 @@ public void shouldGetNonExistentKey(String ignored, Serializer serializer, Strin NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -1982,7 +1983,7 @@ void assertGetAll(NamedCache cache, Serializer serializer, Strin .map(s -> toByteString(s, serializer)) .collect(Collectors.toList()); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2020,7 +2021,7 @@ public void shouldCallInvoke(String ignored, Serializer serializer, String sScop ValueExtractor extractor = new UniversalExtractor<>("lastName"); InvocableMap.EntryProcessor processor = new ExtractorProcessor<>(extractor); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2050,7 +2051,7 @@ public void shouldCallInvokeWithMissingEntryProcessor(String ignored, Serializer String sCacheName = "people"; NamedCache cache = ensureEmptyCache(sScope, sCacheName); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2084,7 +2085,7 @@ public void shouldCallInvokeAllWithFilter(String ignored, Serializer serializer, InvocableMap.EntryProcessor processor = new ExtractorProcessor<>(extractor); Filter filter = Filters.equal("age", 25); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2126,7 +2127,7 @@ public void shouldCallInvokeAllWithNoFilterOrKeys(String ignored, Serializer ser ValueExtractor extractor = new UniversalExtractor<>("firstName"); InvocableMap.EntryProcessor processor = new ExtractorProcessor<>(extractor); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2169,7 +2170,7 @@ public void shouldCallInvokeAllWithKeys(String ignored, Serializer serializer, S List listKeys = Arrays.asList(BinaryHelper.toByteString(person1.getLastName(), serializer), BinaryHelper.toByteString(person2.getLastName(), serializer)); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2199,7 +2200,7 @@ public void shouldCallInvokeAllWithMissingProcessor(String ignored, Serializer s { String sCacheName = "people"; - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2225,7 +2226,7 @@ public void shouldBeEmpty(String ignored, Serializer serializer, String sScope) NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2244,7 +2245,7 @@ public void shouldNotBeEmpty(String ignored, Serializer serializer, String sScop NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 10); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2271,7 +2272,7 @@ public void shouldCallKeySetWithFilterWhenSomeEntriesMatch(String ignored, Seria cache.put(person2.getLastName(), person2); cache.put(person3.getLastName(), person3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2309,7 +2310,7 @@ public void shouldCallKeySetWithFilterWhenAllEntriesMatch(String ignored, Serial cache.put(person2.getLastName(), person2); cache.put(person3.getLastName(), person3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2347,7 +2348,7 @@ public void shouldCallKeySetWithFilterWhenNoEntriesMatch(String ignored, Seriali cache.put(person2.getLastName(), person2); cache.put(person3.getLastName(), person3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2373,7 +2374,7 @@ public void shouldInsertNewEntry(String ignored, Serializer serializer, String s NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2402,7 +2403,7 @@ public void shouldUpdateEntry(String ignored, Serializer serializer, String sSco cache.clear(); cache.put("key-1", "value-1"); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2432,7 +2433,7 @@ public void shouldUpdateEntryPreviouslyMappedToNull(String ignored, Serializer s cache.clear(); cache.put("key-1", null); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2466,7 +2467,7 @@ public void shouldUpdateEntryWithNullValue(String ignored, Serializer serializer cache.clear(); cache.put("key-1", "value-1"); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2499,7 +2500,7 @@ public void shouldPutAll(String ignored, Serializer serializer, String sScope) t ByteString key2 = toByteString("key-2", serializer); ByteString value2 = toByteString("value-2", serializer); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2530,7 +2531,7 @@ public void shouldPutAllWithExpiry(String ignored, Serializer serializer, String ByteString value2 = toByteString("value-2", serializer); long cMillis = 50000L; - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2558,7 +2559,7 @@ public void shouldPutAllWithZeroEntries(String ignored, Serializer serializer, S NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2580,7 +2581,7 @@ public void shouldPutIfAbsentForNonExistentKey(String ignored, Serializer serial NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2606,7 +2607,7 @@ public void shouldPutIfAbsentForExistingKey(String ignored, Serializer serialize cache.clear(); cache.put("key-1", "value-1"); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2632,7 +2633,7 @@ public void shouldPutIfAbsentForExistingKeyMappedToNull(String ignored, Serializ cache.clear(); cache.put("key-1", null); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2660,7 +2661,7 @@ public void shouldRemoveOnNonExistentEntry(String ignored, Serializer serializer int count = 10; clearAndPopulate(cache, count); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2685,7 +2686,7 @@ public void shouldReturnPreviousValueForRemoveOnExistingMapping(String ignored, int cCount = 10; clearAndPopulate(cache, cCount); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2712,7 +2713,7 @@ public void shouldReturnFalseForRemoveMappingOnNonExistentMapping(String ignored NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2738,7 +2739,7 @@ public void shouldReturnFalseForRemoveMappingOnNonMatchingMapping(String ignored cache.clear(); cache.put("key-1", "value-1"); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2764,7 +2765,7 @@ public void shouldReturnTrueForRemoveMappingOnMatchingMapping(String ignored, Se cache.clear(); cache.put("key-1", "value-123"); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2797,7 +2798,7 @@ public void shouldRemoveIndexWhenIndexExists(String ignored, Serializer serializ // Add the index using the normal cache cache.addIndex(extractor, false, null); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2824,7 +2825,7 @@ public void shouldRemoveIndexWhenIndexDoesNotExist(String ignored, Serializer se ValueExtractor extractor = new UniversalExtractor("foo"); ByteString binExtractor = toByteString(extractor, serializer); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2851,7 +2852,7 @@ public void shouldReturnNullValueForReplaceOnNonExistentMapping(String ignored, NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2877,7 +2878,7 @@ public void shouldReturnNonNullForReplaceOnExistingMapping(String ignored, Seria NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 5); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2905,7 +2906,7 @@ public void shouldReturnFalseForReplaceMappingOnNonExistentMapping(String ignore NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2937,7 +2938,7 @@ public void shouldReturnFalseForReplaceMappingOnNonMatchingMapping(String ignore NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 5); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -2971,7 +2972,7 @@ public void shouldReturnTrueForReplaceMappingOnMatchingMapping(String ignored, S NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 5); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -3004,7 +3005,7 @@ public void shouldGetSizeOfEmptyCache(String ignored, Serializer serializer, Str NamedCache cache = ensureEmptyCache(sScope, sCacheName); cache.clear(); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -3025,7 +3026,7 @@ public void shouldGetSizeOfPopulatedCache(String ignored, Serializer serializer, NamedCache cache = ensureEmptyCache(sScope, sCacheName); clearAndPopulate(cache, 10); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -3077,7 +3078,7 @@ public void shouldCallValuesWithFilterWhenSomeEntriesMatch(String ignored, Seria cache.put(person2.getLastName(), person2); cache.put(person3.getLastName(), person3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -3117,7 +3118,7 @@ public void shouldCallValuesWithFilterWhenAllEntriesMatch(String ignored, Serial cache.put(person2.getLastName(), person2); cache.put(person3.getLastName(), person3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -3158,7 +3159,7 @@ public void shouldCallValuesWithFilterWhenNoEntriesMatch(String ignored, Seriali cache.put(person2.getLastName(), person2); cache.put(person3.getLastName(), person3); - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -3199,7 +3200,7 @@ public void shouldCallValuesWithFilterAndComparatorWhenSomeEntriesMatch(String cache.put("key-" + i, i); } - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -3246,7 +3247,7 @@ public void shouldCallValuesWithFilterAndComparatorWhenAllEntriesMatch(String cache.put("key-" + i, i); } - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -3290,7 +3291,7 @@ public void shouldCallValuesWithFilterAndComparatorWhenNoEntriesMatch(String cache.put("key-" + i, i); } - TestStreamObserver observer = new TestStreamObserver<>(); + TestProxyResponseStreamObserver observer = new TestProxyResponseStreamObserver(); StreamObserver channel = openChannel(observer); init(channel, observer, serializer, sScope); @@ -3321,7 +3322,7 @@ public void shouldCallValuesWithFilterAndComparatorWhenNoEntriesMatch(String // ----- helper methods ------------------------------------------------- protected Resp sendCacheRequest(StreamObserver channel, - TestStreamObserver observer, int cacheId, NamedCacheRequestType type, + TestProxyResponseStreamObserver observer, int cacheId, NamedCacheRequestType type, Message message) throws Exception { NamedCacheRequest request = NamedCacheRequest.newBuilder() @@ -3333,11 +3334,22 @@ protected Resp sendCacheRequest(StreamObserver future = observer.addListener(r -> r.getId() == requestId); + channel.onNext(proxyRequest); + + future.get(1, TimeUnit.MINUTES); observer.awaitCount(responseId, 1, TimeUnit.MINUTES); + observer.assertNoErrors(); - ProxyResponse proxyResponse = observer.valueAt(responseId - 1); + ProxyResponse[] aResponse = observer.values().stream() + .filter(r -> r.getId() == requestId) + .toArray(ProxyResponse[]::new); + + ProxyResponse proxyResponse = aResponse[0]; if (proxyResponse.getResponseCase() == ProxyResponse.ResponseCase.ERROR) { ErrorMessage error = proxyResponse.getError(); @@ -3368,8 +3380,7 @@ protected Resp sendCacheRequest(StreamObserver List sendStreamCacheRequest(StreamObserver channel, - TestStreamObserver observer, int cacheId, NamedCacheRequestType type, - Message message) throws Exception + TestProxyResponseStreamObserver observer, int cacheId, NamedCacheRequestType type, Message message) throws Exception { NamedCacheRequest request = NamedCacheRequest.newBuilder() .setCacheId(cacheId) diff --git a/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/TestProxyResponseStreamObserver.java b/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/TestProxyResponseStreamObserver.java new file mode 100644 index 0000000000000..f417e3795f013 --- /dev/null +++ b/prj/test/functional/grpc-proxy-tck/src/main/java/grpc/proxy/version_1/TestProxyResponseStreamObserver.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. + * + * Licensed under the Universal Permissive License v 1.0 as shown at + * https://oss.oracle.com/licenses/upl. + */ + +package grpc.proxy.version_1; + +import com.oracle.coherence.common.base.Predicate; +import com.oracle.coherence.grpc.messages.proxy.v1.ProxyResponse; +import grpc.proxy.TestStreamObserver; +import io.reactivex.rxjava3.annotations.NonNull; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; + +public class TestProxyResponseStreamObserver + extends TestStreamObserver + { + @Override + public void onNext(@NonNull ProxyResponse proxyResponse) + { + super.onNext(proxyResponse); + for (Listener listener : f_listListener) + { + try + { + listener.onNext(proxyResponse); + } + catch (Exception e) + { + onError(e); + } + } + } + + public CompletableFuture addListener(Predicate predicate) + { + Listener l = new Listener(predicate); + f_listListener.add(l); + return l.f_future; + } + + public void removeListener(Predicate predicate) + { + Listener l = new Listener(predicate); + f_listListener.remove(l); + } + + // ----- inner class Listener ------------------------------------------- + + protected static class Listener + { + public Listener(Predicate predicate) + { + f_predicate = predicate; + } + + protected void onNext(ProxyResponse proxyResponse) + { + + ProxyResponse.ResponseCase type = proxyResponse.getResponseCase(); + if (!f_future.isDone() + && (type == ProxyResponse.ResponseCase.COMPLETE || type == ProxyResponse.ResponseCase.ERROR) + && f_predicate.evaluate(proxyResponse)) + { + f_future.complete(proxyResponse); + } + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + Listener listener = (Listener) o; + return Objects.equals(f_predicate, listener.f_predicate); + } + + @Override + public int hashCode() + { + return Objects.hashCode(f_predicate); + } + + // ----- data members ----------------------------------------------- + + private final Predicate f_predicate; + + private final CompletableFuture f_future = new CompletableFuture<>(); + } + + // ----- data members --------------------------------------------------- + + private final List f_listListener = new CopyOnWriteArrayList<>(); + }