-
Notifications
You must be signed in to change notification settings - Fork 315
Livy:337 Binding RPCServer to user provided port and not random port #334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 18 commits
bc8f2ac
968f584
8e3c0ef
a8c53b8
94d23e5
994ac16
238d238
951739c
dbaf50a
4a7e219
5ac6512
02c51b3
a2938d5
e6524d4
a6ea902
789ae88
edbf6c9
2c4189c
2b91398
a8e29d1
e59e9a7
59683d3
57b6c51
a22222e
3f58233
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,8 +51,10 @@ public static enum Entry implements ConfEntry { | |
|
||
// Address for the RSC driver to connect back with it's connection info. | ||
LAUNCHER_ADDRESS("launcher.address", null), | ||
LAUNCHER_PORT_RANGE("launcher.port.range", "10000~10110"), | ||
// Setting up of this propety by user has no benefit. It is currently being used | ||
// to pass port information from ContextLauncher to RSCDriver | ||
LAUNCHER_PORT("launcher.port", -1), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zjffdu And RSCDriver takes that information So this property still needed. Please let me if its ok . Other comments are indentation related , I'll fix them and push . |
||
|
||
// How long will the RSC wait for a connection for a Livy server before shutting itself down. | ||
SERVER_IDLE_TIMEOUT("server.idle-timeout", "10m"), | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,10 @@ | |
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.net.BindException; | ||
import java.net.InetSocketAddress; | ||
import java.net.ServerSocket; | ||
import java.net.SocketException; | ||
import java.security.SecureRandom; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
|
@@ -61,55 +64,107 @@ public class RpcServer implements Closeable { | |
private static final SecureRandom RND = new SecureRandom(); | ||
|
||
private final String address; | ||
private final Channel channel; | ||
private Channel channel; | ||
private final EventLoopGroup group; | ||
private final int port; | ||
private final ConcurrentMap<String, ClientInfo> pendingClients; | ||
private final RSCConf config; | ||
|
||
private final String portRange; | ||
private static enum PortRangeSchema{START_PORT, END_PORT, MAX}; | ||
private final String PORT_DELIMITER = "~"; | ||
/** | ||
* Creating RPC Server | ||
* @param lconf | ||
* @throws IOException | ||
* @throws InterruptedException | ||
*/ | ||
public RpcServer(RSCConf lconf) throws IOException, InterruptedException { | ||
this.config = lconf; | ||
this.portRange = config.get(LAUNCHER_PORT_RANGE); | ||
this.group = new NioEventLoopGroup( | ||
this.config.getInt(RPC_MAX_THREADS), | ||
Utils.newDaemonThreadFactory("RPC-Handler-%d")); | ||
this.channel = new ServerBootstrap() | ||
.group(group) | ||
.channel(NioServerSocketChannel.class) | ||
.childHandler(new ChannelInitializer<SocketChannel>() { | ||
@Override | ||
public void initChannel(SocketChannel ch) throws Exception { | ||
SaslServerHandler saslHandler = new SaslServerHandler(config); | ||
final Rpc newRpc = Rpc.createServer(saslHandler, config, ch, group); | ||
saslHandler.rpc = newRpc; | ||
|
||
Runnable cancelTask = new Runnable() { | ||
@Override | ||
public void run() { | ||
LOG.warn("Timed out waiting for hello from client."); | ||
newRpc.close(); | ||
} | ||
}; | ||
saslHandler.cancelTask = group.schedule(cancelTask, | ||
config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), | ||
TimeUnit.MILLISECONDS); | ||
} | ||
}) | ||
.option(ChannelOption.SO_BACKLOG, 1) | ||
.option(ChannelOption.SO_REUSEADDR, true) | ||
.childOption(ChannelOption.SO_KEEPALIVE, true) | ||
.bind(0) | ||
.sync() | ||
.channel(); | ||
this.config.getInt(RPC_MAX_THREADS), | ||
Utils.newDaemonThreadFactory("RPC-Handler-%d")); | ||
int [] portData = getPortNumberAndRange(); | ||
int startingPortNumber = portData[PortRangeSchema.START_PORT.ordinal()]; | ||
int endPort = portData[PortRangeSchema.END_PORT.ordinal()]; | ||
boolean isContected = false; | ||
for(int tries = startingPortNumber ; tries<=endPort ; tries++){ | ||
try { | ||
this.channel = getChannel(tries); | ||
isContected = true; | ||
break; | ||
} catch(SocketException e){ | ||
LOG.debug("RPC not able to connect port " + tries + " " + e.getMessage()); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would happen when there is no available port in this range There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have Changed the code , to handle the case if no available port . It will throws exception and fails gracefully. Please let me know , if that ok . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zjffdu Working on this . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zjffdu done. Graceful failure if not port available (as would have happened if the random port is not available) |
||
if(!isContected) { | ||
throw new IOException("Unable to connect to provided ports " + this.portRange); | ||
} | ||
this.port = ((InetSocketAddress) channel.localAddress()).getPort(); | ||
this.pendingClients = new ConcurrentHashMap<>(); | ||
|
||
LOG.info("Connected to the port " + this.port); | ||
String address = config.get(RPC_SERVER_ADDRESS); | ||
if (address == null) { | ||
address = config.findLocalAddress(); | ||
} | ||
this.address = address; | ||
} | ||
|
||
/** | ||
* Get Port Numbers | ||
*/ | ||
public int[] getPortNumberAndRange() throws ArrayIndexOutOfBoundsException, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change to private |
||
NumberFormatException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation |
||
String[] split = this.portRange.split(PORT_DELIMITER); | ||
int [] portRange=new int [PortRangeSchema.MAX.ordinal()]; | ||
try { | ||
portRange[PortRangeSchema.START_PORT.ordinal()] = | ||
Integer.parseInt(split[PortRangeSchema.START_PORT.ordinal()]); | ||
portRange[PortRangeSchema.END_PORT.ordinal()] = | ||
Integer.parseInt(split[PortRangeSchema.END_PORT.ordinal()]); | ||
} catch(ArrayIndexOutOfBoundsException e) { | ||
LOG.error("Port Range format is not correct " + this.portRange); | ||
throw e; | ||
} catch(NumberFormatException e) { | ||
LOG.error("Port are not in numeric format " + this.portRange); | ||
throw e; | ||
} | ||
return portRange; | ||
} | ||
/** | ||
* @throws InterruptedException | ||
**/ | ||
public Channel getChannel(int portNumber) throws BindException, InterruptedException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. private |
||
Channel channel = new ServerBootstrap() | ||
.group(group) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation |
||
.channel(NioServerSocketChannel.class) | ||
.childHandler(new ChannelInitializer<SocketChannel>() { | ||
@Override | ||
public void initChannel(SocketChannel ch) throws Exception { | ||
SaslServerHandler saslHandler = new SaslServerHandler(config); | ||
final Rpc newRpc = Rpc.createServer(saslHandler, config, ch, group); | ||
saslHandler.rpc = newRpc; | ||
|
||
Runnable cancelTask = new Runnable() { | ||
@Override | ||
public void run() { | ||
LOG.warn("Timed out waiting for hello from client."); | ||
newRpc.close(); | ||
} | ||
}; | ||
saslHandler.cancelTask = group.schedule(cancelTask, | ||
config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), | ||
TimeUnit.MILLISECONDS); | ||
} | ||
}) | ||
.option(ChannelOption.SO_BACKLOG, 1) | ||
.option(ChannelOption.SO_REUSEADDR, true) | ||
.childOption(ChannelOption.SO_KEEPALIVE, true) | ||
.bind(portNumber) | ||
.sync() | ||
.channel(); | ||
return channel; | ||
} | ||
/** | ||
* Tells the RPC server to expect connections from clients. | ||
* | ||
|
@@ -310,3 +365,4 @@ private ClientInfo(String id, String secret, ClientCallback callback) { | |
} | ||
|
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add one comment for this configuration, especially about whether start/end port is inclusive and exclusive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zjffdu
done