diff --git a/block-node/block-access/src/main/java/module-info.java b/block-node/block-access/src/main/java/module-info.java index b60c84618..11e2d5b07 100644 --- a/block-node/block-access/src/main/java/module-info.java +++ b/block-node/block-access/src/main/java/module-info.java @@ -4,11 +4,10 @@ module org.hiero.block.node.access.service { uses com.swirlds.config.api.spi.ConfigurationBuilderFactory; - requires transitive com.hedera.pbj.runtime; + requires transitive org.hiero.block.protobuf; requires transitive org.hiero.block.node.spi; requires com.swirlds.metrics.api; - requires org.hiero.block.protobuf; - requires com.github.spotbugs.annotations; + requires com.hedera.pbj.runtime; provides org.hiero.block.node.spi.BlockNodePlugin with BlockAccessServicePlugin; diff --git a/block-node/block-access/src/main/java/org/hiero/block/node/access/service/BlockAccessServicePlugin.java b/block-node/block-access/src/main/java/org/hiero/block/node/access/service/BlockAccessServicePlugin.java index c6dd2207c..7cc053829 100644 --- a/block-node/block-access/src/main/java/org/hiero/block/node/access/service/BlockAccessServicePlugin.java +++ b/block-node/block-access/src/main/java/org/hiero/block/node/access/service/BlockAccessServicePlugin.java @@ -6,20 +6,12 @@ import static java.lang.System.Logger.Level.WARNING; import com.hedera.hapi.block.stream.Block; -import com.hedera.pbj.runtime.grpc.GrpcException; -import com.hedera.pbj.runtime.grpc.Pipeline; -import com.hedera.pbj.runtime.grpc.Pipelines; -import com.hedera.pbj.runtime.grpc.ServiceInterface; -import com.hedera.pbj.runtime.io.buffer.Bytes; import com.swirlds.metrics.api.Counter; -import edu.umd.cs.findbugs.annotations.NonNull; -import java.util.Arrays; -import java.util.List; +import org.hiero.block.api.BlockAccessServiceInterface; import org.hiero.block.api.BlockRequest; import org.hiero.block.api.BlockResponse; import org.hiero.block.api.BlockResponse.Code; // PBJ doesn't generate GRPC stubs for some reason, also the proto file is broken when PBJ compiles it... -import org.hiero.block.api.protoc.BlockAccessServiceGrpc; import org.hiero.block.node.spi.BlockNodeContext; import org.hiero.block.node.spi.BlockNodePlugin; import org.hiero.block.node.spi.ServiceBuilder; @@ -28,7 +20,7 @@ /** * Plugin that implements the BlockAccessService and provides the 'block' RPC. */ -public class BlockAccessServicePlugin implements BlockNodePlugin, ServiceInterface { +public class BlockAccessServicePlugin implements BlockNodePlugin, BlockAccessServiceInterface { /** The logger for this class. */ private final System.Logger LOGGER = System.getLogger(getClass().getName()); @@ -43,13 +35,15 @@ public class BlockAccessServicePlugin implements BlockNodePlugin, ServiceInterfa /** Counter for the number of responses not found */ private Counter responseCounterNotFound; + // ==== BlockAccessServiceInterface Methods ======================================================================== + /** * Handle a request for a single block * * @param request the request containing the block number or latest flag * @return the response containing the block or an error status */ - private BlockResponse handleBlockRequest(BlockRequest request) { + public BlockResponse getBlock(BlockRequest request) { LOGGER.log(DEBUG, "Received BlockRequest for block number: {0}", request.blockNumber()); requestCounter.increment(); @@ -71,7 +65,7 @@ private BlockResponse handleBlockRequest(BlockRequest request) { blockNumberToRetrieve = request.blockNumber(); } - // Check if block is within available range + // Check if block is within the available range if (!blockProvider.availableBlocks().contains(blockNumberToRetrieve)) { long lowestBlockNumber = blockProvider.availableBlocks().min(); long highestBlockNumber = blockProvider.availableBlocks().max(); @@ -99,11 +93,18 @@ private BlockResponse handleBlockRequest(BlockRequest request) { } // ==== BlockNodePlugin Methods ==================================================================================== + + /** + * {@inheritDoc} + */ @Override public String name() { return "BlockAccessServicePlugin"; } + /** + * {@inheritDoc} + */ @Override public void init(BlockNodeContext context, ServiceBuilder serviceBuilder) { // Create the metrics @@ -124,65 +125,4 @@ public void init(BlockNodeContext context, ServiceBuilder serviceBuilder) { // Register this service serviceBuilder.registerGrpcService(this); } - - // ==== ServiceInterface Methods =================================================================================== - /** - * BlockAccessService methods define the gRPC methods available on the BlockAccessService. - */ - enum BlockAccessServiceMethod implements Method { - /** - * The getBlock method retrieves a single block from the block node. - */ - getBlock - } - - /** - * {@inheritDoc} - */ - @NonNull - @Override - public String serviceName() { - String[] parts = fullName().split("\\."); - return parts[parts.length - 1]; - } - - /** - * {@inheritDoc} - */ - @NonNull - @Override - public String fullName() { - return BlockAccessServiceGrpc.SERVICE_NAME; - } - - /** - * {@inheritDoc} - */ - @NonNull - @Override - public List methods() { - return Arrays.asList(BlockAccessServiceMethod.values()); - } - - /** - * {@inheritDoc} - * - * This is called each time a new request is received. - */ - @NonNull - @Override - public Pipeline open( - @NonNull Method method, @NonNull RequestOptions requestOptions, @NonNull Pipeline pipeline) - throws GrpcException { - final BlockAccessServiceMethod blockAccessServiceMethod = (BlockAccessServiceMethod) method; - return switch (blockAccessServiceMethod) { - case getBlock: - yield Pipelines.unary() - .mapRequest(BlockRequest.PROTOBUF::parse) - .method(this::handleBlockRequest) - .mapResponse(BlockResponse.PROTOBUF::toBytes) - .respondTo(pipeline) - .build(); - }; - } } diff --git a/block-node/block-access/src/test/java/org/hiero/block/node/access/service/BlockAccessServicePluginTest.java b/block-node/block-access/src/test/java/org/hiero/block/node/access/service/BlockAccessServicePluginTest.java index 894e60aab..98c1f9a7e 100644 --- a/block-node/block-access/src/test/java/org/hiero/block/node/access/service/BlockAccessServicePluginTest.java +++ b/block-node/block-access/src/test/java/org/hiero/block/node/access/service/BlockAccessServicePluginTest.java @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 package org.hiero.block.node.access.service; -import static org.hiero.block.node.access.service.BlockAccessServicePlugin.BlockAccessServiceMethod.getBlock; import static org.hiero.block.node.app.fixtures.TestUtils.enableDebugLogging; import static org.hiero.block.node.app.fixtures.blocks.BlockItemUtils.toBlockItemsUnparsed; import static org.hiero.block.node.app.fixtures.blocks.SimpleTestBlockItemBuilder.createNumberOfVerySimpleBlocks; @@ -25,10 +24,11 @@ import org.junit.jupiter.api.Test; public class BlockAccessServicePluginTest extends GrpcPluginTestBase { + private final BlockAccessServicePlugin plugin = new BlockAccessServicePlugin(); public BlockAccessServicePluginTest() { super(); - start(new BlockAccessServicePlugin(), getBlock, new SimpleInMemoryHistoricalBlockFacility()); + start(plugin, plugin.methods().getFirst(), new SimpleInMemoryHistoricalBlockFacility()); } /** @@ -54,7 +54,7 @@ void testServiceInterfaceBasics() { List methods = serviceInterface.methods(); assertNotNull(methods); assertEquals(1, methods.size()); - assertEquals(getBlock, methods.getFirst()); + assertEquals(plugin.methods().getFirst(), methods.getFirst()); } @Test diff --git a/block-node/publisher/src/main/java/org/hiero/block/node/publisher/PublisherServicePlugin.java b/block-node/publisher/src/main/java/org/hiero/block/node/publisher/PublisherServicePlugin.java index aae982349..7fb385053 100644 --- a/block-node/publisher/src/main/java/org/hiero/block/node/publisher/PublisherServicePlugin.java +++ b/block-node/publisher/src/main/java/org/hiero/block/node/publisher/PublisherServicePlugin.java @@ -8,12 +8,10 @@ import com.hedera.pbj.runtime.grpc.GrpcException; import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.grpc.Pipelines; -import com.hedera.pbj.runtime.grpc.ServiceInterface; import com.hedera.pbj.runtime.io.buffer.Bytes; import com.swirlds.metrics.api.Counter; import com.swirlds.metrics.api.LongGauge; import edu.umd.cs.findbugs.annotations.NonNull; -import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -23,8 +21,9 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.locks.ReentrantLock; +import org.hiero.block.api.BlockStreamPublishServiceInterface; +import org.hiero.block.api.PublishStreamRequest; import org.hiero.block.api.PublishStreamResponse; -import org.hiero.block.api.protoc.BlockStreamPublishServiceGrpc; import org.hiero.block.internal.BlockItemUnparsed; import org.hiero.block.internal.PublishStreamRequestUnparsed; import org.hiero.block.node.publisher.PublisherConfig.PublisherType; @@ -59,7 +58,8 @@ *

* TODO Still lots to work out on tracking the various stages of blocks, latest in flight, etc. */ -public final class PublisherServicePlugin implements BlockNodePlugin, ServiceInterface, BlockNotificationHandler { +public final class PublisherServicePlugin + implements BlockNodePlugin, BlockStreamPublishServiceInterface, BlockNotificationHandler { /** The logger for this class. */ private final System.Logger LOGGER = System.getLogger(getClass().getName()); @@ -465,43 +465,8 @@ public void handlePersisted(PersistedNotification notification) { } } - // ==== ServiceInterface Methods =================================================================================== - - /** - * BlockStreamPublisherService types define the gRPC methods available on the BlockStreamPublisherService. - */ - enum BlockStreamPublisherServiceMethod implements Method { - /** - * The publishBlockStream method represents the bidirectional gRPC streaming method - * Consensus Nodes should use to publish the BlockStream to the Block Node. - */ - publishBlockStream - } - - /** - * {@inheritDoc} - */ - @NonNull - public String serviceName() { - String[] parts = fullName().split("\\."); - return parts[parts.length - 1]; - } - - /** - * {@inheritDoc} - */ - @NonNull - public String fullName() { - return BlockStreamPublishServiceGrpc.SERVICE_NAME; - } - - /** - * {@inheritDoc} - */ - @NonNull - public List methods() { - return Arrays.asList(BlockStreamPublisherServiceMethod.values()); - } + // ==== BlockStreamPublishServiceInterface Methods + // =================================================================================== /** * {@inheritDoc} @@ -516,8 +481,8 @@ public Pipeline open( throws GrpcException { stateLock.lock(); try { - final BlockStreamPublisherServiceMethod blockStreamPublisherServiceMethod = - (BlockStreamPublisherServiceMethod) method; + final BlockStreamPublishServiceMethod blockStreamPublisherServiceMethod = + (BlockStreamPublishServiceMethod) method; return switch (blockStreamPublisherServiceMethod) { case publishBlockStream: final var pipe = Pipelines., PublishStreamResponse>bidiStreaming() @@ -555,4 +520,12 @@ public Pipeline open( stateLock.unlock(); } } + + /** + * Never used, but required by the interface. We override the open method to handle directly. + */ + @Override + public Pipeline publishBlockStream(Pipeline replies) { + return null; + } } diff --git a/block-node/publisher/src/test/java/org/hiero/block/node/publisher/PublisherTest.java b/block-node/publisher/src/test/java/org/hiero/block/node/publisher/PublisherTest.java index 1dfcc98a8..556eb98e1 100644 --- a/block-node/publisher/src/test/java/org/hiero/block/node/publisher/PublisherTest.java +++ b/block-node/publisher/src/test/java/org/hiero/block/node/publisher/PublisherTest.java @@ -9,7 +9,6 @@ import static org.hiero.block.node.app.fixtures.blocks.SimpleTestBlockItemBuilder.sampleBlockHeader; import static org.hiero.block.node.app.fixtures.blocks.SimpleTestBlockItemBuilder.sampleBlockProof; import static org.hiero.block.node.app.fixtures.blocks.SimpleTestBlockItemBuilder.sampleRoundHeader; -import static org.hiero.block.node.publisher.PublisherServicePlugin.BlockStreamPublisherServiceMethod.publishBlockStream; import static org.hiero.block.node.spi.BlockNodePlugin.UNKNOWN_BLOCK_NUMBER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -21,6 +20,7 @@ import com.hedera.pbj.runtime.io.buffer.Bytes; import java.util.List; import org.hiero.block.api.BlockItemSet; +import org.hiero.block.api.BlockStreamPublishServiceInterface.BlockStreamPublishServiceMethod; import org.hiero.block.api.PublishStreamRequest; import org.hiero.block.api.PublishStreamRequest.RequestOneOfType; import org.hiero.block.api.PublishStreamResponse; @@ -42,7 +42,10 @@ public class PublisherTest extends GrpcPluginTestBase { public PublisherTest() { - start(new PublisherServicePlugin(), publishBlockStream, new NoBlocksHistoricalBlockFacility()); + start( + new PublisherServicePlugin(), + BlockStreamPublishServiceMethod.publishBlockStream, + new NoBlocksHistoricalBlockFacility()); } @Test @@ -53,7 +56,7 @@ void testServiceInterfaceBasics() { List methods = serviceInterface.methods(); assertNotNull(methods); assertEquals(1, methods.size()); - assertEquals(publishBlockStream, methods.getFirst()); + assertEquals(BlockStreamPublishServiceMethod.publishBlockStream, methods.getFirst()); } @Test diff --git a/block-node/stream-subscriber/src/main/java/org/hiero/block/node/stream/subscriber/SubscriberServicePlugin.java b/block-node/stream-subscriber/src/main/java/org/hiero/block/node/stream/subscriber/SubscriberServicePlugin.java index 277368d6b..cd399c5ac 100644 --- a/block-node/stream-subscriber/src/main/java/org/hiero/block/node/stream/subscriber/SubscriberServicePlugin.java +++ b/block-node/stream-subscriber/src/main/java/org/hiero/block/node/stream/subscriber/SubscriberServicePlugin.java @@ -7,13 +7,11 @@ import com.hedera.pbj.runtime.grpc.Pipeline; import com.hedera.pbj.runtime.grpc.Pipelines; import com.hedera.pbj.runtime.grpc.Pipelines.ServerStreamingMethod; -import com.hedera.pbj.runtime.grpc.ServiceInterface; import com.hedera.pbj.runtime.io.buffer.Bytes; import com.swirlds.metrics.api.LongGauge; import edu.umd.cs.findbugs.annotations.NonNull; import java.lang.System.Logger; import java.lang.System.Logger.Level; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -24,9 +22,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import org.hiero.block.api.BlockStreamSubscribeServiceInterface; import org.hiero.block.api.SubscribeStreamRequest; import org.hiero.block.api.SubscribeStreamResponse; -import org.hiero.block.api.protoc.BlockStreamSubscribeServiceGrpc; import org.hiero.block.internal.SubscribeStreamResponseUnparsed; import org.hiero.block.node.spi.BlockNodeContext; import org.hiero.block.node.spi.BlockNodePlugin; @@ -39,9 +37,7 @@ *

The plugin registers itself with the service builder during initialization and manages * the lifecycle of subscriber connections. */ -public class SubscriberServicePlugin implements BlockNodePlugin, ServiceInterface { - /** The service name for this service, which must match the gRPC service name */ - private static final String SERVICE_NAME = parseGrpcName(); +public class SubscriberServicePlugin implements BlockNodePlugin, BlockStreamSubscribeServiceInterface { /** The logger for this class. */ private final Logger LOGGER = System.getLogger(getClass().getName()); /** The block node context, used to provide access to facilities */ @@ -61,23 +57,10 @@ public void init(@NonNull final BlockNodeContext context, @NonNull final Service serviceBuilder.registerGrpcService(this); } - /*==================== ServiceInterface Methods ====================*/ - - /** - * BlockStreamSubscriberService types define the gRPC methods available on the BlockStreamSubscriberService. - */ - enum SubscriberServiceMethod implements Method { - /** - * The subscribeBlockStream method represents the server-streaming gRPC method - * consumers should use to subscribe to the BlockStream from the Block Node. - */ - subscribeBlockStream - } - @Override public void start() { // Create the client handler and wait for it to start and reach a ready state. - clientHandler = new SubscribeBlockStreamHandler(context, this); + clientHandler = new SubscribeBlockStreamHandler(context); } @Override @@ -90,45 +73,12 @@ public void stop() { */ @Override @NonNull - public String serviceName() { - return SERVICE_NAME; - } - - /** - * {@inheritDoc} - */ - @Override - @NonNull - public String fullName() { - return BlockStreamSubscribeServiceGrpc.SERVICE_NAME; - } - - /** - * Minimal method to parse the gRPC service name from the full name. - *
This is called once and stored statically. - */ - private static String parseGrpcName() { - final String[] parts = BlockStreamSubscribeServiceGrpc.SERVICE_NAME.split("\\."); - return parts[parts.length - 1]; - } - - /** - * {@inheritDoc} - */ - @Override - @NonNull - public List methods() { - return Arrays.asList(SubscriberServiceMethod.values()); - } - - /** - * {@inheritDoc} - */ - @Override public List> configDataTypes() { return List.of(SubscriberConfig.class); } + /*==================== BlockStreamSubscribeServiceInterface Methods ====================*/ + /** * {@inheritDoc} * @@ -140,20 +90,27 @@ public Pipeline open( @NonNull Method method, @NonNull RequestOptions opts, @NonNull Pipeline responses) throws GrpcException { LOGGER.log(Level.DEBUG, "Real Plugin Open called"); - final SubscriberServiceMethod subscriberServiceMethod = (SubscriberServiceMethod) method; + final BlockStreamSubscribeServiceMethod subscriberServiceMethod = (BlockStreamSubscribeServiceMethod) method; return switch (subscriberServiceMethod) { - case subscribeBlockStream: - // subscribeBlockStream is server streaming end point so the client sends a single request and the - // server sends many responses - yield Pipelines.serverStreaming() - .mapRequest(SubscribeStreamRequest.PROTOBUF::parse) - .method(clientHandler) - .mapResponse(SubscribeStreamResponseUnparsed.PROTOBUF::toBytes) - .respondTo(responses) - .build(); + case subscribeBlockStream -> + // subscribeBlockStream is server streaming end point, so the client sends a single request, and the + // server sends many responses + Pipelines.serverStreaming() + .mapRequest(SubscribeStreamRequest.PROTOBUF::parse) + .method(clientHandler) + .mapResponse(SubscribeStreamResponseUnparsed.PROTOBUF::toBytes) + .respondTo(responses) + .build(); }; } + /** + * Does nothing but is required by the interface. We override the open method directly to handle requests. + */ + @Override + public void subscribeBlockStream( + SubscribeStreamRequest request, Pipeline replies) {} + // Visible for Testing Map getOpenSessions() { return clientHandler.getOpenSessions(); @@ -171,8 +128,6 @@ static class SubscribeBlockStreamHandler private final AtomicLong nextClientId = new AtomicLong(0); /** A context that applies to the pipeline this handler supports. */ private final BlockNodeContext context; - /** A plugin instance that created and "owns" this Handler. */ - private final SubscriberServicePlugin plugin; /** Set of open client sessions */ private final Map openSessions; @@ -180,12 +135,10 @@ static class SubscribeBlockStreamHandler private final ExecutorService virtualThreadExecutor; private final ExecutorCompletionService streamSessions; - private SubscribeBlockStreamHandler( - @NonNull final BlockNodeContext context, @NonNull final SubscriberServicePlugin plugin) { + private SubscribeBlockStreamHandler(@NonNull final BlockNodeContext context) { this.context = requireNonNull(context); - this.plugin = requireNonNull(plugin); this.openSessions = new ConcurrentSkipListMap<>(); - virtualThreadExecutor = context.threadPoolManager().getVirtualThreadExecutor(null); + virtualThreadExecutor = context.threadPoolManager().getVirtualThreadExecutor("SubscribeBlockStreamHandler"); streamSessions = new ExecutorCompletionService<>(virtualThreadExecutor); // create the metrics numberOfSubscribers = context.metrics() diff --git a/block-node/stream-subscriber/src/test/java/org/hiero/block/node/stream/subscriber/SubscriberTest.java b/block-node/stream-subscriber/src/test/java/org/hiero/block/node/stream/subscriber/SubscriberTest.java index 1d018e471..69a027623 100644 --- a/block-node/stream-subscriber/src/test/java/org/hiero/block/node/stream/subscriber/SubscriberTest.java +++ b/block-node/stream-subscriber/src/test/java/org/hiero/block/node/stream/subscriber/SubscriberTest.java @@ -3,11 +3,11 @@ import static java.util.concurrent.locks.LockSupport.parkNanos; import static org.assertj.core.api.Assertions.assertThat; +import static org.hiero.block.api.BlockStreamSubscribeServiceInterface.BlockStreamSubscribeServiceMethod.subscribeBlockStream; import static org.hiero.block.node.app.fixtures.TestUtils.enableDebugLogging; import static org.hiero.block.node.app.fixtures.blocks.BlockItemUtils.toBlockItem; import static org.hiero.block.node.app.fixtures.blocks.SimpleTestBlockItemBuilder.createNumberOfSimpleBlockBatches; import static org.hiero.block.node.spi.BlockNodePlugin.UNKNOWN_BLOCK_NUMBER; -import static org.hiero.block.node.stream.subscriber.SubscriberServicePlugin.SubscriberServiceMethod.subscribeBlockStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; diff --git a/protobuf/src/main/java/module-info.java b/protobuf/src/main/java/module-info.java index 21685ebdf..230653305 100644 --- a/protobuf/src/main/java/module-info.java +++ b/protobuf/src/main/java/module-info.java @@ -1,9 +1,5 @@ // SPDX-License-Identifier: Apache-2.0 module org.hiero.block.protobuf { - exports com.hedera.hapi.block.stream.protoc; - exports com.hedera.hapi.block.stream.input.protoc; - exports com.hedera.hapi.block.stream.output.protoc; - exports com.hedera.hapi.platform.event.legacy; exports com.hedera.hapi.block.stream; exports com.hedera.hapi.block.stream.input; exports com.hedera.hapi.block.stream.output; @@ -60,18 +56,37 @@ exports com.hedera.hapi.node.state.primitives; exports com.hedera.hapi.node.state.throttles; exports com.hedera.hapi.node.state.congestion; - exports com.hedera.services.stream.proto; - exports com.hederahashgraph.api.proto.java; - exports com.hederahashgraph.service.proto.java; exports com.hedera.hapi.platform.state; exports com.hedera.hapi.node.state.roster; exports com.hedera.hapi.block.stream.schema; - exports com.hedera.hapi.platform.state.legacy to - com.google.protobuf; exports org.hiero.block.api; - exports org.hiero.block.api.protoc; exports org.hiero.block.internal; - exports org.hiero.block.internal.protoc; + + // only export protoc to simulator & suites till they are ported to PBJ + exports com.hedera.hapi.platform.state.legacy to + com.google.protobuf; + exports org.hiero.block.api.protoc to + org.hiero.block.simulator, + org.hiero.block.node.suites; + exports org.hiero.block.internal.protoc to + org.hiero.block.simulator; + exports com.hedera.services.stream.proto to + org.hiero.block.simulator; + exports com.hederahashgraph.api.proto.java to + org.hiero.block.simulator; + exports com.hederahashgraph.service.proto.java to + org.hiero.block.simulator; + exports com.hedera.hapi.block.stream.protoc to + org.hiero.block.simulator, + org.hiero.block.node.suites; + exports com.hedera.hapi.block.stream.input.protoc to + org.hiero.block.simulator, + org.hiero.block.node.suites; + exports com.hedera.hapi.block.stream.output.protoc to + org.hiero.block.simulator, + org.hiero.block.node.suites; + exports com.hedera.hapi.platform.event.legacy to + org.hiero.block.simulator; requires transitive com.hedera.pbj.runtime; requires transitive com.google.common;