
Commit 5a590ad

Revert "Add logic to fail if specified partitions do not exist in the topic. …" (#34631)
This reverts commit 7dc3e38.
1 parent 18373bf commit 5a590ad

File tree: 4 files changed (+4 −148 lines)

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 1 addition & 9 deletions
@@ -1908,15 +1908,7 @@ public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
         }
       } else {
         for (String topic : topics) {
-          List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
-          checkState(
-              partitionInfoList != null && !partitionInfoList.isEmpty(),
-              "Could not find any partitions info for topic "
-                  + topic
-                  + ". Please check Kafka configuration and make sure "
-                  + "that provided topics exist.");
-
-          for (PartitionInfo p : partitionInfoList) {
+          for (PartitionInfo p : consumer.partitionsFor(topic)) {
             partitions.add(new TopicPartition(p.topic(), p.partition()));
           }
         }
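
For context on the reverted check: consumer.partitionsFor(topic) can return null (and, depending on client version and broker auto-topic-creation settings, an empty list) when a topic does not exist, and the deleted checkState turned that into an immediate failure. The same pattern is also reverted in WatchForKafkaTopicPartitions.java below. A minimal standalone sketch of that probe, with a hypothetical broker address and topic name — not Beam's actual code path:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionsForProbe {
  public static void main(String[] args) {
    // Hypothetical broker address, for illustration only.
    Map<String, Object> config =
        Map.of(
            "bootstrap.servers", "localhost:9092",
            "key.deserializer", ByteArrayDeserializer.class,
            "value.deserializer", ByteArrayDeserializer.class);
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config)) {
      // For an unknown topic, partitionsFor may yield null or an empty list;
      // the reverted checkState guarded against both before iterating.
      List<PartitionInfo> infos = consumer.partitionsFor("maybe-missing-topic");
      if (infos == null || infos.isEmpty()) {
        throw new IllegalStateException(
            "Could not find any partitions info for topic maybe-missing-topic.");
      }
      infos.forEach(p -> System.out.println(p.topic() + "-" + p.partition()));
    }
  }
}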

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java

Lines changed: 2 additions & 32 deletions
@@ -23,12 +23,9 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -40,7 +37,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -93,7 +89,7 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
         for (String topic : topics) {
           List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
           checkState(
-              partitionInfoList != null && !partitionInfoList.isEmpty(),
+              partitionInfoList != null,
               "Could not find any partitions info. Please check Kafka configuration and make sure "
                   + "that provided topics exist.");
           for (PartitionInfo p : partitionInfoList) {
@@ -104,34 +100,8 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
           }
         }
       } else {
-        final Map<String, List<Integer>> topicsAndPartitions = new HashMap<>();
         for (TopicPartition p : partitions) {
-          topicsAndPartitions.computeIfAbsent(p.topic(), k -> new ArrayList<>()).add(p.partition());
-        }
-        try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
-          for (Map.Entry<String, List<Integer>> e : topicsAndPartitions.entrySet()) {
-            final String providedTopic = e.getKey();
-            final List<Integer> providedPartitions = e.getValue();
-            final Set<Integer> partitionsForTopic;
-            try {
-              partitionsForTopic =
-                  consumer.partitionsFor(providedTopic).stream()
-                      .map(PartitionInfo::partition)
-                      .collect(Collectors.toSet());
-              for (Integer p : providedPartitions) {
-                checkState(
-                    partitionsForTopic.contains(p),
-                    "Partition "
-                        + p
-                        + " does not exist for topic "
-                        + providedTopic
-                        + ". Please check Kafka configuration.");
-              }
-            } catch (KafkaException exception) {
-              LOG.warn("Unable to access cluster. Skipping fail fast checks.");
-            }
-            Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, providedTopic));
-          }
+          Lineage.getSources().add("kafka", ImmutableList.of(bootStrapServers, p.topic()));
         }
       }
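
The block removed from split() above grouped the user-supplied TopicPartitions by topic, verified each requested partition number against the broker's metadata, and skipped the check with a warning when the cluster was unreachable. A minimal standalone sketch of that validation against the same Kafka client APIs (the validate helper and its error message are illustrative, not Beam's exact code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

final class PartitionExistenceCheck {
  // Throws IllegalStateException if a requested partition is missing;
  // silently skips the check if the cluster cannot be reached.
  static void validate(Consumer<?, ?> consumer, List<TopicPartition> requested) {
    Map<String, List<Integer>> byTopic = new HashMap<>();
    for (TopicPartition tp : requested) {
      byTopic.computeIfAbsent(tp.topic(), k -> new ArrayList<>()).add(tp.partition());
    }
    for (Map.Entry<String, List<Integer>> e : byTopic.entrySet()) {
      try {
        Set<Integer> existing =
            consumer.partitionsFor(e.getKey()).stream()
                .map(PartitionInfo::partition)
                .collect(Collectors.toSet());
        for (Integer p : e.getValue()) {
          if (!existing.contains(p)) {
            throw new IllegalStateException(
                "Partition " + p + " does not exist for topic " + e.getKey());
          }
        }
      } catch (KafkaException exception) {
        // Mirrors the reverted behavior: an unreachable cluster skips the
        // fail-fast check instead of failing pipeline construction.
      }
    }
  }
}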

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java

Lines changed: 1 addition & 9 deletions
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.kafka;

 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

 import java.util.ArrayList;
 import java.util.List;
@@ -190,14 +189,7 @@ static List<TopicPartition> getAllTopicPartitions(
         kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
       if (topics != null && !topics.isEmpty()) {
         for (String topic : topics) {
-          List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
-          checkState(
-              partitionInfoList != null && !partitionInfoList.isEmpty(),
-              "Could not find any partitions info for topic "
-                  + topic
-                  + ". Please check Kafka configuration and make sure "
-                  + "that provided topics exist.");
-          for (PartitionInfo partition : partitionInfoList) {
+          for (PartitionInfo partition : kafkaConsumer.partitionsFor(topic)) {
             current.add(new TopicPartition(topic, partition.partition()));
           }
         }

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java

Lines changed: 0 additions & 98 deletions
@@ -37,7 +37,6 @@
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
@@ -89,7 +88,6 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -112,7 +110,6 @@
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
@@ -170,8 +167,6 @@ public class KafkaIOIT {

   @Rule public TestPipeline readPipeline = TestPipeline.create();

-  @Rule public ExpectedException thrown = ExpectedException.none();
-
   private static ExperimentalOptions sdfPipelineOptions;

   static {
@@ -216,99 +211,6 @@ public static void afterClass() {
     }
   }

-  @Test
-  public void testKafkaIOFailsFastWithInvalidPartitions() throws IOException {
-    thrown.expect(Pipeline.PipelineExecutionException.class);
-    thrown.expectMessage(
-        "Partition 1000 does not exist for topic "
-            + options.getKafkaTopic()
-            + ". Please check Kafka configuration.");
-
-    // Use streaming pipeline to read Kafka records.
-    readPipeline.getOptions().as(Options.class).setStreaming(true);
-    TopicPartition invalidPartition = new TopicPartition(options.getKafkaTopic(), 1000);
-    readPipeline.apply(
-        "Read from unbounded Kafka",
-        readFromKafka().withTopicPartitions(ImmutableList.of(invalidPartition)));
-
-    PipelineResult readResult = readPipeline.run();
-    PipelineResult.State readState =
-        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
-
-    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
-    tearDownTopic(options.getKafkaTopic());
-    cancelIfTimeouted(readResult, readState);
-  }
-
-  @Test
-  public void testKafkaIOFailsFastWithInvalidTopics() throws IOException {
-    // This test will fail on versions before 2.3.0 due to the non-existence of the
-    // allow.auto.create.topics
-    // flag. This can be removed when/if support for this older version is dropped.
-    String actualVer = AppInfoParser.getVersion();
-    assumeFalse(actualVer.compareTo("2.0.0") >= 0 && actualVer.compareTo("2.3.0") < 0);
-
-    thrown.expect(Pipeline.PipelineExecutionException.class);
-    thrown.expectMessage(
-        "Could not find any partitions info for topic invalid_topic. Please check Kafka configuration"
-            + " and make sure that provided topics exist.");
-
-    // Use streaming pipeline to read Kafka records.
-    sdfReadPipeline.getOptions().as(Options.class).setStreaming(true);
-    String invalidTopic = "invalid_topic";
-    sdfReadPipeline.apply(
-        "Read from unbounded Kafka",
-        KafkaIO.<byte[], byte[]>read()
-            .withConsumerConfigUpdates(ImmutableMap.of("allow.auto.create.topics", "false"))
-            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
-            .withTopics(ImmutableList.of(invalidTopic))
-            .withKeyDeserializer(ByteArrayDeserializer.class)
-            .withValueDeserializer(ByteArrayDeserializer.class));
-
-    PipelineResult readResult = sdfReadPipeline.run();
-    PipelineResult.State readState =
-        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
-
-    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
-    tearDownTopic(options.getKafkaTopic());
-    cancelIfTimeouted(readResult, readState);
-  }
-
-  @Test
-  public void testKafkaIOFailsFastWithInvalidTopicsAndDynamicRead() throws IOException {
-    // This test will fail on versions before 2.3.0 due to the non-existence of the
-    // allow.auto.create.topics
-    // flag. This can be removed when/if support for this older version is dropped.
-    String actualVer = AppInfoParser.getVersion();
-    assumeFalse(actualVer.compareTo("2.0.0") >= 0 && actualVer.compareTo("2.3.0") < 0);
-
-    thrown.expect(Pipeline.PipelineExecutionException.class);
-    thrown.expectMessage(
-        "Could not find any partitions info for topic invalid_topic. Please check Kafka configuration"
-            + " and make sure that provided topics exist.");
-
-    // Use streaming pipeline to read Kafka records.
-    sdfReadPipeline.getOptions().as(Options.class).setStreaming(true);
-    String invalidTopic = "invalid_topic";
-    sdfReadPipeline.apply(
-        "Read from unbounded Kafka",
-        KafkaIO.<byte[], byte[]>read()
-            .withConsumerConfigUpdates(ImmutableMap.of("allow.auto.create.topics", "false"))
-            .withBootstrapServers(options.getKafkaBootstrapServerAddresses())
-            .withTopics(ImmutableList.of(invalidTopic))
-            .withDynamicRead(Duration.standardSeconds(5))
-            .withKeyDeserializer(ByteArrayDeserializer.class)
-            .withValueDeserializer(ByteArrayDeserializer.class));
-
-    PipelineResult readResult = sdfReadPipeline.run();
-    PipelineResult.State readState =
-        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
-
-    // call asynchronous deleteTopics first since cancelIfTimeouted is blocking.
-    tearDownTopic(options.getKafkaTopic());
-    cancelIfTimeouted(readResult, readState);
-  }
-
   @Test
   public void testKafkaIOReadsAndWritesCorrectlyInStreaming() throws IOException {
     // Use batch pipeline to write records.
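
The three deleted tests exercised the fail-fast behavior end to end: an out-of-range partition, or a nonexistent topic with allow.auto.create.topics=false, was expected to surface as a Pipeline.PipelineExecutionException. After this revert, KafkaIO performs no such up-front validation, so a pipeline author who wants an equivalent preflight would have to run one outside Beam. A sketch of one way to do that with Kafka's AdminClient — the broker address, topic name, and partition number are hypothetical, and this is our suggestion rather than anything the commit prescribes:

import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicPreflight {
  public static void main(String[] args) throws Exception {
    try (AdminClient admin =
        AdminClient.create(Map.<String, Object>of("bootstrap.servers", "localhost:9092"))) {
      // describeTopics fails (UnknownTopicOrPartitionException inside the
      // ExecutionException) if the topic does not exist.
      Map<String, TopicDescription> descriptions =
          admin.describeTopics(Set.of("my-topic")).all().get();
      int partitionCount = descriptions.get("my-topic").partitions().size();
      // Partition numbers are zero-based, so partition 1000 requires at
      // least 1001 partitions.
      if (partitionCount <= 1000) {
        throw new IllegalStateException(
            "Partition 1000 does not exist for topic my-topic");
      }
    }
  }
}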
