Skip to content

Commit

Permalink
[improve][broker] Support showing client ip address in client stats w…
Browse files Browse the repository at this point in the history
…hile using reverse proxy (#23974)
  • Loading branch information
rdhabalia authored Mar 5, 2025
1 parent d6c09af commit 0cc266d
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -64,6 +65,7 @@
import org.apache.pulsar.common.api.proto.KeyLongValue;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.TopicOperation;
Expand Down Expand Up @@ -226,7 +228,8 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.metadata = metadata != null ? metadata : Collections.emptyMap();

stats = new ConsumerStatsImpl();
stats.setAddress(cnx.clientSourceAddressAndPort());
String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : null;
stats.setAddress(StringUtils.isNotBlank(address) ? address : cnx.clientSourceAddressAndPort());
stats.consumerName = consumerName;
stats.appId = appId;
stats.setConnectedSince(DateFormatter.format(connectedSince));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
Expand All @@ -55,6 +56,7 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.TopicOperation;
Expand Down Expand Up @@ -131,7 +133,8 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.metadata = metadata != null ? metadata : Collections.emptyMap();

this.stats = isNonPersistentTopic ? new NonPersistentPublisherStatsImpl() : new PublisherStatsImpl();
stats.setAddress(cnx.clientSourceAddressAndPort());
String address = metadata != null ? metadata.get(Metadata.CLIENT_IP) : null;
stats.setAddress(StringUtils.isNotBlank(address) ? address : cnx.clientSourceAddressAndPort());
stats.setConnectedSince(DateFormatter.now());
stats.setClientVersion(cnx.getClientVersion());
stats.setProducerName(producerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertNotNull;
import java.net.InetAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.testng.annotations.Test;

import lombok.Cleanup;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.common.naming.Metadata;

@Test(groups = "broker-api")
public class TlsSniTest extends TlsProducerConsumerBase {
Expand All @@ -51,6 +54,7 @@ public void testIpAddressInBrokerServiceUrl() throws Exception {

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl)
.tlsTrustCertsFilePath(CA_CERT_FILE_PATH).allowTlsInsecureConnection(false)
.proxyServiceUrl(brokerServiceIpAddressUrl, ProxyProtocol.SNI)
.enableTlsHostnameVerification(false)
.operationTimeout(1000, TimeUnit.MILLISECONDS);
Map<String, String> authParams = new HashMap<>();
Expand All @@ -62,6 +66,12 @@ public void testIpAddressInBrokerServiceUrl() throws Exception {
PulsarClient pulsarClient = clientBuilder.build();
// should be able to create producer successfully
pulsarClient.newProducer().topic(topicName).create();
pulsarClient.newConsumer().topic(topicName).subscriptionName("test").subscribe();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get();
Producer producer = topic.getProducers().values().iterator().next();
assertNotNull(producer.getMetadata().get(Metadata.CLIENT_IP));
Consumer consumer = topic.getSubscription("test").getDispatcher().getConsumers().iterator().next();
assertNotNull(consumer.getMetadata().get(Metadata.CLIENT_IP));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
@SuppressWarnings("unchecked")
public class ClientCnx extends PulsarHandler {

private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
protected final Authentication authentication;
protected State state;

Expand Down Expand Up @@ -1438,7 +1439,13 @@ private void checkRequestTimeout() {
}
}

private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);
public boolean isProxy() {
return proxyToTargetBrokerAddress != null;
}

public SocketAddress getLocalAddress() {
return this.localAddress;
}

/**
* Check client connection is now free. This method will not change the state to idle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -187,7 +188,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final MessageCrypto msgCrypto;

private final Map<String, String> metadata;
private Map<String, String> metadata;

private final boolean readCompacted;
private final boolean resetIncludeHead;
Expand Down Expand Up @@ -361,7 +362,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
if (conf.getProperties().isEmpty()) {
metadata = Collections.emptyMap();
} else {
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
metadata = new HashMap<>(conf.getProperties());
}

this.connectionHandler = new ConnectionHandler(this,
Expand Down Expand Up @@ -910,6 +911,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
// synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them
final CompletableFuture<Void> future = new CompletableFuture<>();
synchronized (this) {
updateProxyMetadataIfNeeded(cnx);
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.getReplicateSubscriptionState(),
Expand Down Expand Up @@ -3134,6 +3136,14 @@ private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, Conc
return cmd;
}

private void updateProxyMetadataIfNeeded(ClientCnx cnx) {
boolean isProxy = cnx.isProxy() || client.getConfiguration().getProxyServiceUrl() != null;
if (isProxy && cnx.getLocalAddress() != null) {
metadata = metadata.isEmpty() ? new HashMap<>() : metadata;
metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString());
}
}

private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
Map<String, Long> properties, TxnID txnID) {
long requestId = client.newRequestId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -156,7 +157,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne

private ScheduledFuture<?> keyGeneratorTask = null;

private final Map<String, String> metadata;
private Map<String, String> metadata;

private Optional<byte[]> schemaVersion = Optional.empty();

Expand Down Expand Up @@ -280,7 +281,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
if (conf.getProperties().isEmpty()) {
metadata = Collections.emptyMap();
} else {
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
metadata = new HashMap<>(conf.getProperties());
}

InstrumentProvider ip = client.instrumentProvider();
Expand Down Expand Up @@ -1856,6 +1857,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
}

final CompletableFuture<Void> future = new CompletableFuture<>();
updateProxyMetadataIfNeeded(cnx);
cnx.sendRequestWithId(
Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata,
schemaInfo, epoch, userProvidedProducerName,
Expand Down Expand Up @@ -2028,6 +2030,14 @@ public void connectionFailed(PulsarClientException exception) {
}
}

private void updateProxyMetadataIfNeeded(ClientCnx cnx) {
boolean isProxy = cnx.isProxy() || client.getConfiguration().getProxyServiceUrl() != null;
if (isProxy && cnx.getLocalAddress() != null) {
metadata = metadata.isEmpty() ? new HashMap<>() : metadata;
metadata.put(Metadata.CLIENT_IP, cnx.getLocalAddress().toString());
}
}

private void closeProducerTasks() {
Timeout timeout = sendTimeout;
if (timeout != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/
public class Metadata {

public static final String CLIENT_IP = "X-Pulsar-Client-IP";
private Metadata() {}

public static void validateMetadata(Map<String, String> metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ public void testSimpleProduceAndConsume() throws PulsarClientException, PulsarAd
SubscriptionStats subscriptionStats = topicStats.getSubscriptions().get(subName);
Assert.assertEquals(subscriptionStats.getConsumers().size(), 1);
Assert.assertEquals(subscriptionStats.getConsumers().get(0).getAddress(),
((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
((ConsumerImpl) consumer).getClientCnx().ctx().channel().localAddress().toString());

topicStats = admin.topics().getStats(topicName);
Assert.assertEquals(topicStats.getPublishers().size(), 1);
Assert.assertEquals(topicStats.getPublishers().get(0).getAddress(),
((ProducerImpl) producer).getClientCnx().ctx().channel().localAddress().toString().replaceFirst("/", ""));
((ProducerImpl) producer).getClientCnx().ctx().channel().localAddress().toString());
}
}

0 comments on commit 0cc266d

Please sign in to comment.