-
Notifications
You must be signed in to change notification settings - Fork 315
Livy:337 Binding RPCServer to user provided port and not random port #334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
bc8f2ac
968f584
8e3c0ef
a8c53b8
94d23e5
994ac16
238d238
951739c
dbaf50a
4a7e219
5ac6512
02c51b3
a2938d5
e6524d4
a6ea902
789ae88
edbf6c9
2c4189c
2b91398
a8e29d1
e59e9a7
59683d3
57b6c51
a22222e
3f58233
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,10 @@ | |
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.net.BindException; | ||
import java.net.InetSocketAddress; | ||
import java.net.ServerSocket; | ||
import java.net.SocketException; | ||
import java.security.SecureRandom; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
|
@@ -61,18 +64,78 @@ public class RpcServer implements Closeable { | |
private static final SecureRandom RND = new SecureRandom(); | ||
|
||
private final String address; | ||
private final Channel channel; | ||
private Channel channel; | ||
private final EventLoopGroup group; | ||
private final int port; | ||
private final ConcurrentMap<String, ClientInfo> pendingClients; | ||
private final RSCConf config; | ||
|
||
private final String portRange; | ||
private static enum PortRangeSchema{START_PORT, END_PORT, max}; | ||
private final String PORT_DELIMITER = "~"; | ||
/** | ||
* Creating RPC Server | ||
* @param lconf | ||
* @throws IOException | ||
* @throws InterruptedException | ||
*/ | ||
public RpcServer(RSCConf lconf) throws IOException, InterruptedException { | ||
this.config = lconf; | ||
this.portRange = config.get(LAUNCHER_PORT_RANGE); | ||
this.group = new NioEventLoopGroup( | ||
this.config.getInt(RPC_MAX_THREADS), | ||
Utils.newDaemonThreadFactory("RPC-Handler-%d")); | ||
this.channel = new ServerBootstrap() | ||
this.config.getInt(RPC_MAX_THREADS), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. code style issue, should have indention here. Seems travis still didn't detect it. @jerryshao Do you have any clues ? |
||
Utils.newDaemonThreadFactory("RPC-Handler-%d")); | ||
int [] portData = getPortNumberAndRange(); | ||
int startingPortNumber = portData[PortRangeSchema.START_PORT.ordinal()]; | ||
int endPort = portData[PortRangeSchema.END_PORT.ordinal()]; | ||
boolean isContected = false; | ||
for(int tries = startingPortNumber ; tries<=endPort ; tries++){ | ||
try { | ||
this.channel = getChannel(tries); | ||
isContected = true; | ||
break; | ||
}catch(SocketException e){ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. while space before |
||
LOG.warn("RPC not able to connect port " + tries + " " + e.getMessage()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change to debug level |
||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would happen when there is no available port in this range There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have Changed the code , to handle the case if no available port . It will throws exception and fails gracefully. Please let me know , if that ok . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zjffdu Working on this . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zjffdu done. Graceful failure if not port available (as would have happened if the random port is not available) |
||
if(!isContected) { | ||
throw new IOException("Unable to connect to provided ports " + this.portRange); | ||
} | ||
this.port = ((InetSocketAddress) channel.localAddress()).getPort(); | ||
this.pendingClients = new ConcurrentHashMap<>(); | ||
LOG.warn("Connected to the port " + this.port); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change to info level |
||
String address = config.get(RPC_SERVER_ADDRESS); | ||
if (address == null) { | ||
address = config.findLocalAddress(); | ||
} | ||
this.address = address; | ||
} | ||
|
||
/** | ||
* Get Port Numbers | ||
*/ | ||
public int[] getPortNumberAndRange() throws ArrayIndexOutOfBoundsException, NumberFormatException{ | ||
String[] split = this.portRange.split(PORT_DELIMITER); | ||
int [] portRange=new int [PortRangeSchema.max.ordinal()]; | ||
try { | ||
portRange[PortRangeSchema.START_PORT.ordinal()] = | ||
Integer.parseInt(split[PortRangeSchema.START_PORT.ordinal()]); | ||
portRange[PortRangeSchema.END_PORT.ordinal()] = | ||
Integer.parseInt(split[PortRangeSchema.END_PORT.ordinal()]); | ||
}catch(ArrayIndexOutOfBoundsException e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. code style issue |
||
LOG.error("Port Range format is not correct " + this.portRange); | ||
throw e; | ||
} | ||
catch(NumberFormatException e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. code style issue |
||
LOG.error("Port are not in numeric format " + this.portRange); | ||
throw e; | ||
} | ||
return portRange; | ||
} | ||
/** | ||
* @throws InterruptedException | ||
**/ | ||
public Channel getChannel(int portNumber) throws BindException, InterruptedException{ | ||
Channel channel = new ServerBootstrap() | ||
.group(group) | ||
.channel(NioServerSocketChannel.class) | ||
.childHandler(new ChannelInitializer<SocketChannel>() { | ||
|
@@ -97,19 +160,11 @@ public void run() { | |
.option(ChannelOption.SO_BACKLOG, 1) | ||
.option(ChannelOption.SO_REUSEADDR, true) | ||
.childOption(ChannelOption.SO_KEEPALIVE, true) | ||
.bind(0) | ||
.bind(portNumber) | ||
.sync() | ||
.channel(); | ||
this.port = ((InetSocketAddress) channel.localAddress()).getPort(); | ||
this.pendingClients = new ConcurrentHashMap<>(); | ||
|
||
String address = config.get(RPC_SERVER_ADDRESS); | ||
if (address == null) { | ||
address = config.findLocalAddress(); | ||
} | ||
this.address = address; | ||
return channel; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indention |
||
} | ||
|
||
/** | ||
* Tells the RPC server to expect connections from clients. | ||
* | ||
|
@@ -310,3 +365,4 @@ private ClientInfo(String id, String secret, ClientCallback callback) { | |
} | ||
|
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove
LAUNCHER_PORT
, seems it is not used any more.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zjffdu
As mentioned in the Comment for launcher port . This property is still being used to pass port information from contextlauncher to RSCDriver.
ContextLauncher.java ,set this property
conf.set(LAUNCHER_PORT, factory.getServer().getPort());
And RSCDriver takes that information
int launcherPort = livyConf.getInt(LAUNCHER_PORT);
So this property still needed.
But user setting it up will have no benefit. So we should not mention this property in livy-client.conf.template.
Please let me if its ok .
Other comments are indentation related , I'll fix them and push .