Skip to content

Commit bc8f2ac

Browse files
committed
Code changes in RPCserver for user provided port
1 parent 2abb8a3 commit bc8f2ac

File tree

1 file changed

+73
-33
lines changed

1 file changed

+73
-33
lines changed

Diff for: rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java

+73-33
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
import java.io.Closeable;
2121
import java.io.IOException;
22+
import java.net.BindException;
2223
import java.net.InetSocketAddress;
24+
import java.net.ServerSocket;
2325
import java.security.SecureRandom;
2426
import java.util.concurrent.ConcurrentHashMap;
2527
import java.util.concurrent.ConcurrentMap;
@@ -61,54 +63,91 @@ public class RpcServer implements Closeable {
6163
private static final SecureRandom RND = new SecureRandom();
6264

6365
private final String address;
64-
private final Channel channel;
66+
private Channel channel;
6567
private final EventLoopGroup group;
6668
private final int port;
6769
private final ConcurrentMap<String, ClientInfo> pendingClients;
6870
private final RSCConf config;
69-
71+
private final int DEFAULT_RETRY=10;
72+
73+
7074
public RpcServer(RSCConf lconf) throws IOException, InterruptedException {
7175
this.config = lconf;
7276
this.group = new NioEventLoopGroup(
73-
this.config.getInt(RPC_MAX_THREADS),
74-
Utils.newDaemonThreadFactory("RPC-Handler-%d"));
75-
this.channel = new ServerBootstrap()
76-
.group(group)
77-
.channel(NioServerSocketChannel.class)
78-
.childHandler(new ChannelInitializer<SocketChannel>() {
79-
@Override
80-
public void initChannel(SocketChannel ch) throws Exception {
81-
SaslServerHandler saslHandler = new SaslServerHandler(config);
82-
final Rpc newRpc = Rpc.createServer(saslHandler, config, ch, group);
83-
saslHandler.rpc = newRpc;
84-
85-
Runnable cancelTask = new Runnable() {
86-
@Override
87-
public void run() {
88-
LOG.warn("Timed out waiting for hello from client.");
89-
newRpc.close();
90-
}
91-
};
92-
saslHandler.cancelTask = group.schedule(cancelTask,
93-
config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT),
94-
TimeUnit.MILLISECONDS);
95-
}
96-
})
97-
.option(ChannelOption.SO_BACKLOG, 1)
98-
.option(ChannelOption.SO_REUSEADDR, true)
99-
.childOption(ChannelOption.SO_KEEPALIVE, true)
100-
.bind(0)
101-
.sync()
102-
.channel();
77+
this.config.getInt(RPC_MAX_THREADS),
78+
Utils.newDaemonThreadFactory("RPC-Handler-%d"));
79+
int portNumber=config.getInt(LAUNCHER_PORT);
80+
for(int tries = 0 ; tries<DEFAULT_RETRY ; tries++){
81+
try {
82+
this.channel=createChannel(portNumber, tries);
83+
break;
84+
}catch(BindException e){
85+
LOG.warn("RPC not able to connect port "+ portNumber);
86+
portNumber = portNumber +1;
87+
}
88+
}
89+
10390
this.port = ((InetSocketAddress) channel.localAddress()).getPort();
10491
this.pendingClients = new ConcurrentHashMap<>();
105-
92+
LOG.warn("Connected to the port " + this.port);
10693
String address = config.get(RPC_SERVER_ADDRESS);
10794
if (address == null) {
10895
address = config.findLocalAddress();
10996
}
11097
this.address = address;
11198
}
99+
100+
/**
101+
* If user set the port number by livy.rsc.launcher.port then use that
102+
* @param portNumber : Provided by the user
103+
* @return
104+
* @throws IOException
105+
* @throws InterruptedException
106+
*/
107+
public Channel createChannel(int portNumber,int tries) throws IOException, InterruptedException{
108+
if(portNumber==-1) {
109+
return getChannel(0);
110+
}
111+
else {
112+
return getChannel(config.getInt(LAUNCHER_PORT) + tries );
113+
}
114+
}
115+
116+
/**
117+
* @throws InterruptedException
118+
*
119+
*/
120+
public Channel getChannel(int portNumber) throws BindException, InterruptedException{
121+
Channel channel = new ServerBootstrap()
122+
.group(group)
123+
.channel(NioServerSocketChannel.class)
124+
.childHandler(new ChannelInitializer<SocketChannel>() {
125+
@Override
126+
public void initChannel(SocketChannel ch) throws Exception {
127+
SaslServerHandler saslHandler = new SaslServerHandler(config);
128+
final Rpc newRpc = Rpc.createServer(saslHandler, config, ch, group);
129+
saslHandler.rpc = newRpc;
130+
131+
Runnable cancelTask = new Runnable() {
132+
@Override
133+
public void run() {
134+
LOG.warn("Timed out waiting for hello from client.");
135+
newRpc.close();
136+
}
137+
};
138+
saslHandler.cancelTask = group.schedule(cancelTask,
139+
config.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT),
140+
TimeUnit.MILLISECONDS);
141+
}
142+
})
143+
.option(ChannelOption.SO_BACKLOG, 1)
144+
.option(ChannelOption.SO_REUSEADDR, true)
145+
.childOption(ChannelOption.SO_KEEPALIVE, true)
146+
.bind(portNumber)
147+
.sync()
148+
.channel();
149+
return channel;
150+
}
112151

113152
/**
114153
* Tells the RPC server to expect connections from clients.
@@ -310,3 +349,4 @@ private ClientInfo(String id, String secret, ClientCallback callback) {
310349
}
311350

312351
}
352+

0 commit comments

Comments
 (0)