-
Notifications
You must be signed in to change notification settings - Fork 315
Livy:337 Binding RPCServer to user provided port and not random port #334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
bc8f2ac
968f584
8e3c0ef
a8c53b8
94d23e5
994ac16
238d238
951739c
dbaf50a
4a7e219
5ac6512
02c51b3
a2938d5
e6524d4
a6ea902
789ae88
edbf6c9
2c4189c
2b91398
a8e29d1
e59e9a7
59683d3
57b6c51
a22222e
3f58233
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,8 +51,10 @@ public static enum Entry implements ConfEntry { | |
|
||
// Address for the RSC driver to connect back with it's connection info. | ||
LAUNCHER_ADDRESS("launcher.address", null), | ||
LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10010"), | ||
// Setting up of this propety by user has no benefit. It is currently being used | ||
// to pass port information from ContextLauncher to RSCDriver | ||
LAUNCHER_PORT("launcher.port", -1), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zjffdu And RSCDriver takes that information So this property still needed. Please let me if its ok . Other comments are indentation related , I'll fix them and push . |
||
|
||
// How long will the RSC wait for a connection for a Livy server before shutting itself down. | ||
SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"), | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,10 @@ | |
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.net.BindException; | ||
import java.net.InetSocketAddress; | ||
import java.net.ServerSocket; | ||
import java.net.SocketException; | ||
import java.security.SecureRandom; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
|
@@ -61,18 +64,78 @@ public class RpcServer implements Closeable { | |
private static final SecureRandom RND = new SecureRandom(); | ||
|
||
private final String address; | ||
private final Channel channel; | ||
private Channel channel; | ||
private final EventLoopGroup group; | ||
private final int port; | ||
private final ConcurrentMap<String, ClientInfo> pendingClients; | ||
private final RSCConf config; | ||
|
||
private final String portRange; | ||
private static enum PortRangeSchema{START_PORT, END_PORT, MAX}; | ||
private final String PORT_DELIMITER = "~"; | ||
/** | ||
* Creating RPC Server | ||
* @param lconf | ||
* @throws IOException | ||
* @throws InterruptedException | ||
*/ | ||
public RpcServer(RSCConf lconf) throws IOException, InterruptedException { | ||
this.config = lconf; | ||
this.portRange = config.get(LAUNCHER_PORT_RANGE); | ||
this.group = new NioEventLoopGroup( | ||
this.config.getInt(RPC_MAX_THREADS), | ||
Utils.newDaemonThreadFactory("RPC-Handler-%d")); | ||
this.channel = new ServerBootstrap() | ||
this.config.getInt(RPC_MAX_THREADS), | ||
Utils.newDaemonThreadFactory("RPC-Handler-%d")); | ||
int [] portData = getPortNumberAndRange(); | ||
int startingPortNumber = portData[PortRangeSchema.START_PORT.ordinal()]; | ||
int endPort = portData[PortRangeSchema.END_PORT.ordinal()]; | ||
boolean isContected = false; | ||
for(int tries = startingPortNumber ; tries<=endPort ; tries++){ | ||
try { | ||
this.channel = getChannel(tries); | ||
isContected = true; | ||
break; | ||
} catch(SocketException e){ | ||
LOG.debug("RPC not able to connect port " + tries + " " + e.getMessage()); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would happen when there is no available port in this range There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have Changed the code , to handle the case if no available port . It will throws exception and fails gracefully. Please let me know , if that ok . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zjffdu Working on this . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zjffdu done. Graceful failure if not port available (as would have happened if the random port is not available) |
||
if(!isContected) { | ||
throw new IOException("Unable to connect to provided ports " + this.portRange); | ||
} | ||
this.port = ((InetSocketAddress) channel.localAddress()).getPort(); | ||
this.pendingClients = new ConcurrentHashMap<>(); | ||
LOG.info("Connected to the port " + this.port); | ||
String address = config.get(RPC_SERVER_ADDRESS); | ||
if (address == null) { | ||
address = config.findLocalAddress(); | ||
} | ||
this.address = address; | ||
} | ||
|
||
/** | ||
* Get Port Numbers | ||
*/ | ||
private int[] getPortNumberAndRange() throws ArrayIndexOutOfBoundsException, | ||
NumberFormatException { | ||
String[] split = this.portRange.split(PORT_DELIMITER); | ||
int [] portRange = new int [PortRangeSchema.MAX.ordinal()]; | ||
try { | ||
portRange[PortRangeSchema.START_PORT.ordinal()] = | ||
Integer.parseInt(split[PortRangeSchema.START_PORT.ordinal()]); | ||
portRange[PortRangeSchema.END_PORT.ordinal()] = | ||
Integer.parseInt(split[PortRangeSchema.END_PORT.ordinal()]); | ||
} catch(ArrayIndexOutOfBoundsException e) { | ||
LOG.error("Port Range format is not correct " + this.portRange); | ||
throw e; | ||
} catch(NumberFormatException e) { | ||
LOG.error("Port are not in numeric format " + this.portRange); | ||
throw e; | ||
} | ||
return portRange; | ||
} | ||
/** | ||
* @throws InterruptedException | ||
**/ | ||
private Channel getChannel(int portNumber) throws BindException, InterruptedException { | ||
Channel channel = new ServerBootstrap() | ||
.group(group) | ||
.channel(NioServerSocketChannel.class) | ||
.childHandler(new ChannelInitializer<SocketChannel>() { | ||
|
@@ -97,19 +160,11 @@ public void run() { | |
.option(ChannelOption.SO_BACKLOG, 1) | ||
.option(ChannelOption.SO_REUSEADDR, true) | ||
.childOption(ChannelOption.SO_KEEPALIVE, true) | ||
.bind(0) | ||
.bind(portNumber) | ||
.sync() | ||
.channel(); | ||
this.port = ((InetSocketAddress) channel.localAddress()).getPort(); | ||
this.pendingClients = new ConcurrentHashMap<>(); | ||
|
||
String address = config.get(RPC_SERVER_ADDRESS); | ||
if (address == null) { | ||
address = config.findLocalAddress(); | ||
} | ||
this.address = address; | ||
return channel; | ||
} | ||
|
||
/** | ||
* Tells the RPC server to expect connections from clients. | ||
* | ||
|
@@ -310,3 +365,4 @@ private ClientInfo(String id, String secret, ClientCallback callback) { | |
} | ||
|
||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ | |
package com.cloudera.livy.rsc.rpc; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.net.SocketException; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.concurrent.CountDownLatch; | ||
|
@@ -186,6 +188,25 @@ public void testEncryption() throws Exception { | |
assertEquals(outbound.message, reply.message); | ||
} | ||
|
||
@Test | ||
public void testPortRange() throws Exception { | ||
String portRangeData = emptyConfig.get(LAUNCHER_PORT_RANGE); | ||
String portRange[] = portRangeData.split("~"); | ||
int startPort = Integer.parseInt(portRange[0]); | ||
int endPort = Integer.parseInt(portRange[1]); | ||
for (int index = startPort; index <= endPort; index++) { | ||
RpcServer server = autoClose(new RpcServer(emptyConfig)); | ||
assertTrue(startPort <= server.getPort() && server.getPort() <= endPort); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems you are using the default value of LAUNCHER_PORT_RANGE, don't understand why using a for loop here ? Basically I think we need to add 2 test cases. One is specifying a wrong value of LAUNCHER_PORT_RANGE and verify the correct Exception is caught, another is specifying a correct value of LAUNCHER_PORT_RANGE, and the port of server is in this range. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zjffdu Thanks for your suggestion . Working on it , will push in an hour. :) |
||
} | ||
try { | ||
autoClose(new RpcServer(emptyConfig)); | ||
} catch (IOException ee) { | ||
assertTrue(ee instanceof IOException); | ||
assertEquals(ee.getMessage(), "Unable to connect to provided ports " | ||
+ portRangeData); | ||
} | ||
} | ||
|
||
private void transfer(Rpc serverRpc, Rpc clientRpc) { | ||
EmbeddedChannel client = (EmbeddedChannel) clientRpc.getChannel(); | ||
EmbeddedChannel server = (EmbeddedChannel) serverRpc.getChannel(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add one comment for this configuration, especially about whether start/end port is inclusive and exclusive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zjffdu
done