From 0cc266d54d69205c78dc1bcf03f0b608c20fb62b Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Tue, 4 Mar 2025 22:37:11 -0800 Subject: [PATCH] [improve][broker] Support showing client ip address in client stats while using reverse proxy (#23974) --- .../org/apache/pulsar/broker/service/Consumer.java | 5 ++++- .../org/apache/pulsar/broker/service/Producer.java | 5 ++++- .../org/apache/pulsar/client/api/TlsSniTest.java | 14 ++++++++++++-- .../org/apache/pulsar/client/impl/ClientCnx.java | 9 ++++++++- .../apache/pulsar/client/impl/ConsumerImpl.java | 14 ++++++++++++-- .../apache/pulsar/client/impl/ProducerImpl.java | 14 ++++++++++++-- .../org/apache/pulsar/common/naming/Metadata.java | 1 + .../server/ProxyEnableHAProxyProtocolTest.java | 4 ++-- 8 files changed, 55 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 61f9d5c86b32f..a59b755144ef3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -64,6 +65,7 @@ import org.apache.pulsar.common.api.proto.KeyLongValue; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.MessageIdData; +import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.TopicOperation; @@ -226,7 +228,8 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.metadata = metadata != null ? metadata : Collections.emptyMap(); stats = new ConsumerStatsImpl(); - stats.setAddress(cnx.clientSourceAddressAndPort()); + String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : null; + stats.setAddress(StringUtils.isNotBlank(address) ? address : cnx.clientSourceAddressAndPort()); stats.consumerName = consumerName; stats.appId = appId; stats.setConnectedSince(DateFormatter.format(connectedSince)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 0784f74591ec5..a494627aa4d79 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; @@ -55,6 +56,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.TopicOperation; @@ -131,7 +133,8 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.metadata = metadata != null ? metadata : Collections.emptyMap(); this.stats = isNonPersistentTopic ? new NonPersistentPublisherStatsImpl() : new PublisherStatsImpl(); - stats.setAddress(cnx.clientSourceAddressAndPort()); + String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : null; + stats.setAddress(StringUtils.isNotBlank(address) ? address : cnx.clientSourceAddressAndPort()); stats.setConnectedSince(DateFormatter.now()); stats.setClientVersion(cnx.getClientVersion()); stats.setProducerName(producerName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java index 173fa8acb0fdd..56fd26a032688 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java @@ -18,16 +18,19 @@ */ package org.apache.pulsar.client.api; +import static org.testng.Assert.assertNotNull; import java.net.InetAddress; import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; - import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.testng.annotations.Test; - import lombok.Cleanup; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.common.naming.Metadata; @Test(groups = "broker-api") public class TlsSniTest extends TlsProducerConsumerBase { @@ -51,6 +54,7 @@ public void testIpAddressInBrokerServiceUrl() throws Exception { ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl) .tlsTrustCertsFilePath(CA_CERT_FILE_PATH).allowTlsInsecureConnection(false) + .proxyServiceUrl(brokerServiceIpAddressUrl, ProxyProtocol.SNI) .enableTlsHostnameVerification(false) .operationTimeout(1000, TimeUnit.MILLISECONDS); Map authParams = new HashMap<>(); @@ -62,6 +66,12 @@ public void testIpAddressInBrokerServiceUrl() throws Exception { PulsarClient pulsarClient = clientBuilder.build(); // should be able to create producer successfully pulsarClient.newProducer().topic(topicName).create(); + pulsarClient.newConsumer().topic(topicName).subscriptionName("test").subscribe(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get(); + Producer producer = topic.getProducers().values().iterator().next(); + assertNotNull(producer.getMetadata().get(Metadata.CLIENT_IP)); + Consumer consumer = topic.getSubscription("test").getDispatcher().getConsumers().iterator().next(); + assertNotNull(consumer.getMetadata().get(Metadata.CLIENT_IP)); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index e8b691b2eea17..1659b611096bb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -119,6 +119,7 @@ @SuppressWarnings("unchecked") public class ClientCnx extends PulsarHandler { + private static final Logger log = LoggerFactory.getLogger(ClientCnx.class); protected final Authentication authentication; protected State state; @@ -1438,7 +1439,13 @@ private void checkRequestTimeout() { } } - private static final Logger log = LoggerFactory.getLogger(ClientCnx.class); + public boolean isProxy() { + return proxyToTargetBrokerAddress != null; + } + + public SocketAddress getLocalAddress() { + return this.localAddress; + } /** * Check client connection is now free. This method will not change the state to idle. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6f2ad9152d3f6..4691c402b2fef 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -121,6 +121,7 @@ import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.SchemaInfo; @@ -187,7 +188,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final MessageCrypto msgCrypto; - private final Map metadata; + private Map metadata; private final boolean readCompacted; private final boolean resetIncludeHead; @@ -361,7 +362,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat if (conf.getProperties().isEmpty()) { metadata = Collections.emptyMap(); } else { - metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties())); + metadata = new HashMap<>(conf.getProperties()); } this.connectionHandler = new ConnectionHandler(this, @@ -910,6 +911,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { // synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them final CompletableFuture future = new CompletableFuture<>(); synchronized (this) { + updateProxyMetadataIfNeeded(cnx); ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, conf.getReplicateSubscriptionState(), @@ -3134,6 +3136,14 @@ private static BaseCommand newMultiMessageAckCommon(List() : metadata; + metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString()); + } + } + private CompletableFuture doTransactionAcknowledgeForResponse(List messageIds, AckType ackType, Map properties, TxnID txnID) { long requestId = client.newRequestId(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 039468386edca..0b1f8edf1072c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -92,6 +92,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; @@ -156,7 +157,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private ScheduledFuture keyGeneratorTask = null; - private final Map metadata; + private Map metadata; private Optional schemaVersion = Optional.empty(); @@ -280,7 +281,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration if (conf.getProperties().isEmpty()) { metadata = Collections.emptyMap(); } else { - metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties())); + metadata = new HashMap<>(conf.getProperties()); } InstrumentProvider ip = client.instrumentProvider(); @@ -1856,6 +1857,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { } final CompletableFuture future = new CompletableFuture<>(); + updateProxyMetadataIfNeeded(cnx); cnx.sendRequestWithId( Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata, schemaInfo, epoch, userProvidedProducerName, @@ -2028,6 +2030,14 @@ public void connectionFailed(PulsarClientException exception) { } } + private void updateProxyMetadataIfNeeded(ClientCnx cnx) { + boolean isProxy = cnx.isProxy() || client.getConfiguration().getProxyServiceUrl() != null; + if (isProxy && cnx.getLocalAddress() != null) { + metadata = metadata.isEmpty() ? new HashMap<>() : metadata; + metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString()); + } + } + private void closeProducerTasks() { Timeout timeout = sendTimeout; if (timeout != null) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java index eba492cf6bf5d..635238ef91957 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/Metadata.java @@ -25,6 +25,7 @@ */ public class Metadata { + public static final String CLIENT_IP = "X-Pulsar-Client-IP"; private Metadata() {} public static void validateMetadata(Map metadata, diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java index 40aa8f5040556..f24a80fc2a2fa 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java @@ -122,11 +122,11 @@ public void testSimpleProduceAndConsume() throws PulsarClientException, PulsarAd SubscriptionStats subscriptionStats = topicStats.getSubscriptions().get(subName); Assert.assertEquals(subscriptionStats.getConsumers().size(), 1); Assert.assertEquals(subscriptionStats.getConsumers().get(0).getAddress(), - ((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", "")); + ((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString()); topicStats = admin.topics().getStats(topicName); Assert.assertEquals(topicStats.getPublishers().size(), 1); Assert.assertEquals(topicStats.getPublishers().get(0).getAddress(), - ((ProducerImpl) producer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", "")); + ((ProducerImpl) producer).getClientCnx().ctx().channel().localAddress().toString()); } }