Skip to content

Commit 9ae24d0

Browse files
pralabhkumarzjffdu
authored andcommitted
Livy:337 Binding RPCServer to user provided port and not random port (#334)
* Code changes in RPCserver for user provided port * Indentation Changes * Indentation Changes * Indentation Changes * Indentation Changes * Configuring Port Range * Documentation Changed * launcher.port.range will take care of launching RPC * Checkstyle changes * Checkstyle changes * Dummy push * Code changes * Changed BindException Handling to SocketException Handling * Changed Import Order * Code changes to increase port range * Set Port isConntect to true * Indentation Changes & port range in livy-client.conf.template * Indentation changes * Changed visibilty of method private * Indentation Changes * Indenetation Changes * Unit test case to test port range * Checkstyle changes * Unit test case for port range * Added comment for Port Range Configuration and increase port range for unit test case
1 parent 02eef9a commit 9ae24d0

File tree

4 files changed

+104
-18
lines changed

4 files changed

+104
-18
lines changed

Diff for: conf/livy-client.conf.template

+3-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555

5656
# Address for the RSC driver to connect back with it's connection info.
5757
# livy.rsc.launcher.address =
58-
# livy.rsc.launcher.port = -1
58+
# Port Range on which RPC will launch . Port range in inclusive of start and end port .
59+
# livy.rsc.launcher.port.range = 10000~10110
5960

6061
# How long will the RSC wait for a connection for a Livy server before shutting itself down.
6162
# livy.rsc.server.idle-timeout = 10m
@@ -83,4 +84,4 @@
8384
# livy.rsc.job-cancel.timeout = 30s
8485

8586
# Number of statements kept in driver's memory
86-
# livy.rsc.retained-statements = 100
87+
# livy.rsc.retained-statements = 100

Diff for: rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ public static enum Entry implements ConfEntry {
5151

5252
// Address for the RSC driver to connect back with it's connection info.
5353
LAUNCHER_ADDRESS("launcher.address", null),
54+
LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"),
55+
// Setting up of this propety by user has no benefit. It is currently being used
56+
// to pass port information from ContextLauncher to RSCDriver
5457
LAUNCHER_PORT("launcher.port", -1),
55-
5658
// How long will the RSC wait for a connection for a Livy server before shutting itself down.
5759
SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"),
5860

Diff for: rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java

+71-15
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
import java.io.Closeable;
2121
import java.io.IOException;
22+
import java.net.BindException;
2223
import java.net.InetSocketAddress;
24+
import java.net.ServerSocket;
25+
import java.net.SocketException;
2326
import java.security.SecureRandom;
2427
import java.util.concurrent.ConcurrentHashMap;
2528
import java.util.concurrent.ConcurrentMap;
@@ -61,18 +64,78 @@ public class RpcServer implements Closeable {
6164
private static final SecureRandom RND = new SecureRandom();
6265

6366
private final String address;
64-
private final Channel channel;
67+
private Channel channel;
6568
private final EventLoopGroup group;
6669
private final int port;
6770
private final ConcurrentMap<String, ClientInfo> pendingClients;
6871
private final RSCConf config;
69-
72+
private final String portRange;
73+
private static enum PortRangeSchema{START_PORT, END_PORT, MAX};
74+
private final String PORT_DELIMITER = "~";
75+
/**
76+
* Creating RPC Server
77+
* @param lconf
78+
* @throws IOException
79+
* @throws InterruptedException
80+
*/
7081
public RpcServer(RSCConf lconf) throws IOException, InterruptedException {
7182
this.config = lconf;
83+
this.portRange = config.get(LAUNCHER_PORT_RANGE);
7284
this.group = new NioEventLoopGroup(
73-
this.config.getInt(RPC_MAX_THREADS),
74-
Utils.newDaemonThreadFactory("RPC-Handler-%d"));
75-
this.channel = new ServerBootstrap()
85+
this.config.getInt(RPC_MAX_THREADS),
86+
Utils.newDaemonThreadFactory("RPC-Handler-%d"));
87+
int [] portData = getPortNumberAndRange();
88+
int startingPortNumber = portData[PortRangeSchema.START_PORT.ordinal()];
89+
int endPort = portData[PortRangeSchema.END_PORT.ordinal()];
90+
boolean isContected = false;
91+
for(int tries = startingPortNumber ; tries<=endPort ; tries++){
92+
try {
93+
this.channel = getChannel(tries);
94+
isContected = true;
95+
break;
96+
} catch(SocketException e){
97+
LOG.debug("RPC not able to connect port " + tries + " " + e.getMessage());
98+
}
99+
}
100+
if(!isContected) {
101+
throw new IOException("Unable to connect to provided ports " + this.portRange);
102+
}
103+
this.port = ((InetSocketAddress) channel.localAddress()).getPort();
104+
this.pendingClients = new ConcurrentHashMap<>();
105+
LOG.info("Connected to the port " + this.port);
106+
String address = config.get(RPC_SERVER_ADDRESS);
107+
if (address == null) {
108+
address = config.findLocalAddress();
109+
}
110+
this.address = address;
111+
}
112+
113+
/**
114+
* Get Port Numbers
115+
*/
116+
private int[] getPortNumberAndRange() throws ArrayIndexOutOfBoundsException,
117+
NumberFormatException {
118+
String[] split = this.portRange.split(PORT_DELIMITER);
119+
int [] portRange = new int [PortRangeSchema.MAX.ordinal()];
120+
try {
121+
portRange[PortRangeSchema.START_PORT.ordinal()] =
122+
Integer.parseInt(split[PortRangeSchema.START_PORT.ordinal()]);
123+
portRange[PortRangeSchema.END_PORT.ordinal()] =
124+
Integer.parseInt(split[PortRangeSchema.END_PORT.ordinal()]);
125+
} catch(ArrayIndexOutOfBoundsException e) {
126+
LOG.error("Port Range format is not correct " + this.portRange);
127+
throw e;
128+
} catch(NumberFormatException e) {
129+
LOG.error("Port are not in numeric format " + this.portRange);
130+
throw e;
131+
}
132+
return portRange;
133+
}
134+
/**
135+
* @throws InterruptedException
136+
**/
137+
private Channel getChannel(int portNumber) throws BindException, InterruptedException {
138+
Channel channel = new ServerBootstrap()
76139
.group(group)
77140
.channel(NioServerSocketChannel.class)
78141
.childHandler(new ChannelInitializer<SocketChannel>() {
@@ -97,19 +160,11 @@ public void run() {
97160
.option(ChannelOption.SO_BACKLOG, 1)
98161
.option(ChannelOption.SO_REUSEADDR, true)
99162
.childOption(ChannelOption.SO_KEEPALIVE, true)
100-
.bind(0)
163+
.bind(portNumber)
101164
.sync()
102165
.channel();
103-
this.port = ((InetSocketAddress) channel.localAddress()).getPort();
104-
this.pendingClients = new ConcurrentHashMap<>();
105-
106-
String address = config.get(RPC_SERVER_ADDRESS);
107-
if (address == null) {
108-
address = config.findLocalAddress();
109-
}
110-
this.address = address;
166+
return channel;
111167
}
112-
113168
/**
114169
* Tells the RPC server to expect connections from clients.
115170
*
@@ -310,3 +365,4 @@ private ClientInfo(String id, String secret, ClientCallback callback) {
310365
}
311366

312367
}
368+

Diff for: rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java

+27
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package com.cloudera.livy.rsc.rpc;
1919

2020
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.net.SocketException;
2123
import java.util.ArrayList;
2224
import java.util.Collection;
2325
import java.util.concurrent.CountDownLatch;
@@ -186,6 +188,31 @@ public void testEncryption() throws Exception {
186188
assertEquals(outbound.message, reply.message);
187189
}
188190

191+
@Test
192+
public void testPortRange() throws Exception {
193+
String portRange = "a~b";
194+
emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
195+
try {
196+
autoClose(new RpcServer(emptyConfig));
197+
} catch (Exception ee) {
198+
assertTrue(ee instanceof NumberFormatException);
199+
}
200+
portRange = "11000";
201+
emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
202+
try {
203+
autoClose(new RpcServer(emptyConfig));
204+
} catch (Exception ee) {
205+
assertTrue(ee instanceof ArrayIndexOutOfBoundsException);
206+
}
207+
portRange = "11000~11110";
208+
emptyConfig.set(LAUNCHER_PORT_RANGE, portRange);
209+
String [] portRangeData = portRange.split("~");
210+
int startPort = Integer.parseInt(portRangeData[0]);
211+
int endPort = Integer.parseInt(portRangeData[1]);
212+
RpcServer server = autoClose(new RpcServer(emptyConfig));
213+
assertTrue(startPort <= server.getPort() && server.getPort() <= endPort);
214+
}
215+
189216
private void transfer(Rpc serverRpc, Rpc clientRpc) {
190217
EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel();
191218
EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel();

0 commit comments

Comments
 (0)