Skip to content

Commit 1482804

Browse files
authored
[apache#1668] fix: Replace assertions with exception-throwing codes (apache#1804)
### What changes were proposed in this pull request? Replace assertions with exception-throwing codes to make them work in production. ### Why are the changes needed? For: apache#1668. This will make the exception checks work and make the code more reliable. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs.
1 parent ceae615 commit 1482804

File tree

20 files changed

+97
-28
lines changed

20 files changed

+97
-28
lines changed

client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ public void acceptMapCompletionEvents() throws IOException {
163163
LOG.debug("Got " + events.length + " map completion events from " + fromEventIdx);
164164
}
165165

166-
assert !update.shouldReset() : "Unexpected legacy state";
166+
if (update.shouldReset()) {
167+
throw new RssException("Unexpected legacy state");
168+
}
167169

168170
// Update the last seen event ID
169171
fromEventIdx += events.length;

client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,10 @@ private void checkSentBlockCount() {
321321
long expected = blockIds.size();
322322
long bufferManagerTracked = bufferManager.getBlockCount();
323323

324-
assert serverToPartitionToBlockIds != null;
324+
if (serverToPartitionToBlockIds == null) {
325+
throw new RssException("serverToPartitionToBlockIds should not be null");
326+
}
327+
325328
// to filter the multiple replica's duplicate blockIds
326329
Set<Long> blockIds = new HashSet<>();
327330
for (Map<Integer, Set<Long>> partitionBlockIds : serverToPartitionToBlockIds.values()) {

client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,10 @@ private void checkSentBlockCount() {
368368
long expected = blockIds.size();
369369
long bufferManagerTracked = bufferManager.getBlockCount();
370370

371-
assert serverToPartitionToBlockIds != null;
371+
if (serverToPartitionToBlockIds == null) {
372+
throw new RssException("serverToPartitionToBlockIds should not be null");
373+
}
374+
372375
// to filter the multiple replica's duplicate blockIds
373376
Set<Long> blockIds = new HashSet<>();
374377
for (Map<Integer, Set<Long>> partitionBlockIds : serverToPartitionToBlockIds.values()) {

client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,10 @@ private static void parseRssWorkerFromHostInfo(
362362
new ShuffleServerInfo(info[0].split(":")[0], Integer.parseInt(info[0].split(":")[1]));
363363

364364
String[] partitions = info[1].split("_");
365-
assert (partitions.length > 0);
365+
366+
if (partitions.length <= 0) {
367+
throw new RssException("The length of partitions should not be less than 0");
368+
}
366369
for (String partitionId : partitions) {
367370
rssWorker.computeIfAbsent(Integer.parseInt(partitionId), k -> new HashSet<>());
368371
rssWorker.get(Integer.parseInt(partitionId)).add(serverInfo);

client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.slf4j.LoggerFactory;
100100

101101
import org.apache.uniffle.common.ShuffleServerInfo;
102+
import org.apache.uniffle.common.exception.RssException;
102103
import org.apache.uniffle.common.util.JavaUtils;
103104

104105
// This only knows how to deal with a single srcIndex for a given targetIndex.
@@ -1109,7 +1110,9 @@ private void registerCompletedInputForPipelinedShuffle(
11091110
shuffleInfoEventsMap.put(inputIdentifier, eventInfo);
11101111
}
11111112

1112-
assert (eventInfo != null);
1113+
if (eventInfo == null) {
1114+
throw new RssException("eventInfo should not be null");
1115+
}
11131116
eventInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId());
11141117
numFetchedSpills.getAndIncrement();
11151118

client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Objects;
2525
import java.util.Set;
2626

27+
import org.apache.commons.collections4.CollectionUtils;
2728
import org.apache.hadoop.conf.Configuration;
2829
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
2930
import org.apache.tez.common.CallableWithNdc;
@@ -44,6 +45,7 @@
4445
import org.apache.uniffle.client.factory.ShuffleClientFactory;
4546
import org.apache.uniffle.common.RemoteStorageInfo;
4647
import org.apache.uniffle.common.ShuffleServerInfo;
48+
import org.apache.uniffle.common.exception.RssException;
4749
import org.apache.uniffle.common.util.UnitConverter;
4850

4951
public class RssTezFetcherTask extends CallableWithNdc<FetchResult> {
@@ -89,7 +91,9 @@ public RssTezFetcherTask(
8991
Map<Integer, Roaring64NavigableMap> rssSuccessBlockIdBitmapMap,
9092
int numPhysicalInputs,
9193
int partitionNum) {
92-
assert (inputs != null && inputs.size() > 0);
94+
if (CollectionUtils.isEmpty(inputs)) {
95+
throw new RssException("inputs should not be empty");
96+
}
9397
this.fetcherCallback = fetcherCallback;
9498
this.inputContext = inputContext;
9599
this.conf = conf;

client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,9 @@ public synchronized void copySucceeded(
804804
pipelinedShuffleInfoEventsMap.put(inputIdentifier, eventInfo);
805805
}
806806

807-
assert (eventInfo != null);
807+
if (eventInfo == null) {
808+
throw new RssException("eventInfo should not be null");
809+
}
808810
eventInfo.spillProcessed(srcAttemptIdentifier.getSpillEventId());
809811
numFetchedSpills++;
810812

client-tez/src/main/java/org/apache/tez/runtime/library/input/RssOrderedGroupedKVInput.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,12 @@ public synchronized List<Event> initialize() throws IOException {
136136
TezDAGID tezDAGID = tezVertexID.getDAGId();
137137
int sourceVertexId = this.conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1);
138138
int destinationVertexId = this.conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1);
139-
assert sourceVertexId != -1;
140-
assert destinationVertexId != -1;
139+
if (sourceVertexId == -1) {
140+
throw new RssException("sourceVertexId should not be -1");
141+
}
142+
if (destinationVertexId == -1) {
143+
throw new RssException("destinationVertexId should not be -1");
144+
}
141145
this.shuffleId =
142146
RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
143147
this.applicationAttemptId =

client-tez/src/main/java/org/apache/tez/runtime/library/input/RssUnorderedKVInput.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,12 @@ public synchronized List<Event> initialize() throws Exception {
130130
TezDAGID tezDAGID = tezVertexID.getDAGId();
131131
int sourceVertexId = this.conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1);
132132
int destinationVertexId = this.conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1);
133-
assert sourceVertexId != -1;
134-
assert destinationVertexId != -1;
133+
if (sourceVertexId == -1) {
134+
throw new RssException("sourceVertexId should not be -1");
135+
}
136+
if (destinationVertexId == -1) {
137+
throw new RssException("destinationVertexId should not be -1");
138+
}
135139
this.shuffleId =
136140
RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
137141
this.applicationAttemptId =

client-tez/src/main/java/org/apache/tez/runtime/library/output/RssOrderedPartitionedKVOutput.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.slf4j.LoggerFactory;
6969

7070
import org.apache.uniffle.common.ShuffleServerInfo;
71+
import org.apache.uniffle.common.exception.RssException;
7172

7273
import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
7374
import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
@@ -166,8 +167,12 @@ public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
166167
TezDAGID tezDAGID = tezVertexID.getDAGId();
167168
int sourceVertexId = this.conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1);
168169
int destinationVertexId = this.conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1);
169-
assert sourceVertexId != -1;
170-
assert destinationVertexId != -1;
170+
if (sourceVertexId == -1) {
171+
throw new RssException("sourceVertexId should not be -1");
172+
}
173+
if (destinationVertexId == -1) {
174+
throw new RssException("destinationVertexId should not be -1");
175+
}
171176
this.shuffleId =
172177
RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
173178
GetShuffleServerRequest request =

client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedKVOutput.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.slf4j.LoggerFactory;
6969

7070
import org.apache.uniffle.common.ShuffleServerInfo;
71+
import org.apache.uniffle.common.exception.RssException;
7172

7273
import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
7374
import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
@@ -166,8 +167,12 @@ public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
166167
TezDAGID tezDAGID = tezVertexID.getDAGId();
167168
int sourceVertexId = this.conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1);
168169
int destinationVertexId = this.conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1);
169-
assert sourceVertexId != -1;
170-
assert destinationVertexId != -1;
170+
if (sourceVertexId == -1) {
171+
throw new RssException("sourceVertexId should not be -1");
172+
}
173+
if (destinationVertexId == -1) {
174+
throw new RssException("destinationVertexId should not be -1");
175+
}
171176
this.shuffleId =
172177
RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
173178
this.applicationAttemptId =

client-tez/src/main/java/org/apache/tez/runtime/library/output/RssUnorderedPartitionedKVOutput.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.slf4j.LoggerFactory;
6969

7070
import org.apache.uniffle.common.ShuffleServerInfo;
71+
import org.apache.uniffle.common.exception.RssException;
7172

7273
import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
7374
import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
@@ -164,8 +165,12 @@ public TezRemoteShuffleUmbilicalProtocol run() throws Exception {
164165
TezDAGID tezDAGID = tezVertexID.getDAGId();
165166
int sourceVertexId = this.conf.getInt(RSS_SHUFFLE_SOURCE_VERTEX_ID, -1);
166167
int destinationVertexId = this.conf.getInt(RSS_SHUFFLE_DESTINATION_VERTEX_ID, -1);
167-
assert sourceVertexId != -1;
168-
assert destinationVertexId != -1;
168+
if (sourceVertexId == -1) {
169+
throw new RssException("sourceVertexId should not be -1");
170+
}
171+
if (destinationVertexId == -1) {
172+
throw new RssException("destinationVertexId should not be -1");
173+
}
169174
this.shuffleId =
170175
RssTezUtils.computeShuffleId(tezDAGID.getId(), sourceVertexId, destinationVertexId);
171176
this.applicationAttemptId =

common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29+
import org.apache.uniffle.common.exception.RssException;
2930
import org.apache.uniffle.common.netty.protocol.Message;
3031
import org.apache.uniffle.common.netty.protocol.MessageWithHeader;
3132
import org.apache.uniffle.common.netty.protocol.RpcResponse;
@@ -84,7 +85,9 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
8485
msgType.encode(header);
8586
header.writeInt(bodyLength);
8687
in.encode(header);
87-
assert header.writableBytes() == 0;
88+
if (header.writableBytes() != 0) {
89+
throw new RssException("header's writable bytes should be 0");
90+
}
8891

8992
if (body != null) {
9093
// We transfer ownership of the reference on in.body() to MessageWithHeader.

common/src/main/java/org/apache/uniffle/common/netty/TransportFrameDecoder.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.netty.channel.ChannelHandlerContext;
2929
import io.netty.channel.ChannelInboundHandlerAdapter;
3030

31+
import org.apache.uniffle.common.exception.RssException;
3132
import org.apache.uniffle.common.netty.protocol.Message;
3233

3334
/**
@@ -160,7 +161,9 @@ private ByteBuf decodeNext() {
160161
remaining -= next.readableBytes();
161162
frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
162163
}
163-
assert remaining == 0;
164+
if (remaining != 0) {
165+
throw new RssException("The remaining should be 0");
166+
}
164167
return frame;
165168
}
166169

common/src/main/java/org/apache/uniffle/common/netty/client/TransportClientFactory.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.slf4j.Logger;
3939
import org.slf4j.LoggerFactory;
4040

41+
import org.apache.uniffle.common.exception.RssException;
4142
import org.apache.uniffle.common.netty.IOMode;
4243
import org.apache.uniffle.common.netty.TransportFrameDecoder;
4344
import org.apache.uniffle.common.netty.handle.TransportChannelHandler;
@@ -233,7 +234,9 @@ public void initChannel(SocketChannel ch) {
233234
}
234235

235236
TransportClient client = clientRef.get();
236-
assert client != null : "Channel future completed successfully with null client";
237+
if (client == null) {
238+
throw new RssException("Channel future completed unsuccessfully with null client");
239+
}
237240

238241
if (logger.isDebugEnabled()) {
239242
logger.debug("Connection to {} successful", address);

common/src/main/java/org/apache/uniffle/common/netty/protocol/Message.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.netty.buffer.ByteBuf;
2121

22+
import org.apache.uniffle.common.exception.RssException;
2223
import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
2324

2425
public abstract class Message implements Encodable {
@@ -66,7 +67,9 @@ public enum Type implements Encodable {
6667
private final byte id;
6768

6869
Type(int id) {
69-
assert id < 128 : "Cannot have more than 128 message types";
70+
if (id >= 128) {
71+
throw new RssException("Cannot have more than 128 message types");
72+
}
7073
this.id = (byte) id;
7174
}
7275

common/src/main/java/org/apache/uniffle/common/storage/StorageMedia.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.uniffle.common.storage;
1919

20+
import org.apache.uniffle.common.exception.RssException;
2021
import org.apache.uniffle.proto.RssProtos.StorageInfo;
2122

2223
public enum StorageMedia {
@@ -29,7 +30,9 @@ public enum StorageMedia {
2930
private final byte val;
3031

3132
StorageMedia(int code) {
32-
assert (code >= -1 && code < 256);
33+
if (code < -1 || code >= 256) {
34+
throw new RssException("The code should be within [-1, 256)");
35+
}
3336
this.val = (byte) code;
3437
}
3538

common/src/main/java/org/apache/uniffle/common/storage/StorageStatus.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.uniffle.common.storage;
1919

20+
import org.apache.uniffle.common.exception.RssException;
2021
import org.apache.uniffle.proto.RssProtos.StorageInfo;
2122

2223
public enum StorageStatus {
@@ -28,7 +29,9 @@ public enum StorageStatus {
2829
private final byte val;
2930

3031
StorageStatus(int code) {
31-
assert (code >= -1 && code < 256);
32+
if (code < -1 || code >= 256) {
33+
throw new RssException("The code should be within [-1, 256)");
34+
}
3235
this.val = (byte) code;
3336
}
3437

coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import org.apache.uniffle.common.Application;
4141
import org.apache.uniffle.common.ServerStatus;
42+
import org.apache.uniffle.common.exception.RssException;
4243
import org.apache.uniffle.common.web.resource.BaseResource;
4344
import org.apache.uniffle.common.web.resource.Response;
4445
import org.apache.uniffle.coordinator.ApplicationManager;
@@ -100,8 +101,9 @@ public Response<List<ServerNode>> nodes(@QueryParam("status") String status) {
100101
public Response<Object> cancelDecommission(CancelDecommissionRequest params) {
101102
return execute(
102103
() -> {
103-
assert CollectionUtils.isNotEmpty(params.getServerIds())
104-
: "Parameter[serverIds] should not be null!";
104+
if (CollectionUtils.isEmpty(params.getServerIds())) {
105+
throw new RssException("Parameter[serverIds] should not be empty!");
106+
}
105107
params.getServerIds().forEach(getClusterManager()::cancelDecommission);
106108
return null;
107109
});
@@ -122,8 +124,9 @@ public Response<Object> cancelDecommission(@PathParam("id") String serverId) {
122124
public Response<Object> decommission(DecommissionRequest params) {
123125
return execute(
124126
() -> {
125-
assert CollectionUtils.isNotEmpty(params.getServerIds())
126-
: "Parameter[serverIds] should not be null!";
127+
if (CollectionUtils.isEmpty(params.getServerIds())) {
128+
throw new RssException("Parameter[serverIds] should not be empty!");
129+
}
127130
params.getServerIds().forEach(getClusterManager()::decommission);
128131
return null;
129132
});

server/src/main/java/org/apache/uniffle/server/ShuffleServer.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,12 @@ private void initialization() throws Exception {
284284
nettyServerEnabled =
285285
shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE) == ServerType.GRPC_NETTY;
286286
if (nettyServerEnabled) {
287-
assert nettyPort >= 0;
287+
if (nettyPort < 0) {
288+
throw new RssException(
289+
String.format(
290+
"%s must be set during startup when using GRPC_NETTY",
291+
ShuffleServerConf.NETTY_SERVER_PORT.key()));
292+
}
288293
streamServer = new StreamServer(this);
289294
}
290295

0 commit comments

Comments
 (0)