Skip to content

Commit 5e8474e

Browse files
authored
LIVY-212. Implemented session recovery for interactive sessions. Only works with YARN.
livy-server can now recover interactive sessions after a restart. There are some special cases: - If livy-server crashes while an interactive session is starting, there's a chance the session is unrecoverable, depending on timing (whether livy-repl has already registered with livy-server). - If livy-server is down longer than the value of server.idle_timeout (default: 10 min), livy-repl will time out and quit. Note: All previous statements are lost after recovery. This will be fixed in a different commit. Closes #208
1 parent e5796de commit 5e8474e

File tree

20 files changed

+548
-311
lines changed

20 files changed

+548
-311
lines changed

client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala

+6-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ import com.cloudera.livy.client.common.{BufferUtils, Serializer}
4040
import com.cloudera.livy.client.common.HttpMessages._
4141
import com.cloudera.livy.server.WebServer
4242
import com.cloudera.livy.server.interactive.{InteractiveSession, InteractiveSessionServlet}
43-
import com.cloudera.livy.sessions.{SessionState, Spark}
43+
import com.cloudera.livy.server.recovery.SessionStore
44+
import com.cloudera.livy.sessions.{InteractiveSessionManager, SessionState, Spark}
4445
import com.cloudera.livy.test.jobs.Echo
4546
import com.cloudera.livy.utils.AppInfo
4647

@@ -264,7 +265,10 @@ private class HttpClientTestBootstrap extends LifeCycle {
264265
private implicit def executor: ExecutionContext = ExecutionContext.global
265266

266267
override def init(context: ServletContext): Unit = {
267-
val servlet = new InteractiveSessionServlet(new LivyConf()) {
268+
val conf = new LivyConf()
269+
val stateStore = mock(classOf[SessionStore])
270+
val sessionManager = new InteractiveSessionManager(conf, stateStore, Some(Seq.empty))
271+
val servlet = new InteractiveSessionServlet(sessionManager, stateStore, conf) {
268272
override protected def createSession(req: HttpServletRequest): InteractiveSession = {
269273
val session = mock(classOf[InteractiveSession])
270274
val id = sessionManager.nextId()

integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,11 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers with Befo
9191
} catch { case NonFatal(_) => }
9292
throw e
9393
} finally {
94-
try { s.stop() } catch { case NonFatal(_) => }
94+
try {
95+
s.stop()
96+
} catch {
97+
case NonFatal(e) => alert(s"Failed to stop session: $e")
98+
}
9599
}
96100
}
97101

integration-test/src/main/scala/com/cloudera/livy/test/framework/LivyRestClient.scala

+7-5
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
9999
// Keeping the original timeout to avoid slowing down local development.
100100
eventually(timeout(t), interval(1 second)) {
101101
val s = snapshot().state
102-
assert(strStates.contains(s), s"Session state $s doesn't equal one of $strStates")
102+
assert(strStates.contains(s), s"Session $id state $s doesn't equal one of $strStates")
103103
}
104104
}
105105

@@ -145,8 +145,10 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
145145
val data = output("data").asInstanceOf[Map[String, Any]]
146146
Left(data("text/plain").asInstanceOf[String])
147147
case Some("error") => Right(mapper.convertValue(output, classOf[StatementError]))
148-
case Some(status) => throw new Exception(s"Unknown statement status: $status")
149-
case None => throw new Exception(s"Unknown statement output: $newStmt")
148+
case Some(status) =>
149+
throw new IllegalStateException(s"Unknown statement $stmtId status: $status")
150+
case None =>
151+
throw new IllegalStateException(s"Unknown statement $stmtId output: $newStmt")
150152
}
151153
}
152154
}
@@ -158,15 +160,15 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
158160
matchStrings(result, expectedRegex)
159161
}
160162
case Right(error) =>
161-
assert(false, s"Got error from statement $code: ${error.evalue}")
163+
assert(false, s"Got error from statement $stmtId $code: ${error.evalue}")
162164
}
163165
}
164166

165167
def verifyError(
166168
ename: String = null, evalue: String = null, stackTrace: String = null): Unit = {
167169
result() match {
168170
case Left(result) =>
169-
assert(false, s"Statement `$code` expected to fail, but succeeded.")
171+
assert(false, s"Statement $stmtId `$code` expected to fail, but succeeded.")
170172
case Right(error) =>
171173
val remoteStack = Option(error.stackTrace).getOrElse(Nil).mkString("\n")
172174
Seq(error.ename -> ename, error.evalue -> evalue, remoteStack -> stackTrace).foreach {

integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala

+33-2
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,42 @@ class InteractiveIT extends BaseIntegrationTestSuite {
126126
}
127127
}
128128

129+
test("recover interactive session") {
130+
withNewSession(Spark()) { s =>
131+
s.run("1").verifyResult("res0: Int = 1")
132+
133+
// Restart Livy.
134+
cluster.stopLivy()
135+
cluster.runLivy()
136+
137+
// Verify session still exists.
138+
s.verifySessionIdle()
139+
s.run("2").verifyResult("res1: Int = 2")
140+
// TODO, verify previous statement results still exist.
141+
142+
s.stop()
143+
144+
// Restart Livy.
145+
cluster.stopLivy()
146+
cluster.runLivy()
147+
148+
// Verify deleted session doesn't show up after recovery.
149+
s.verifySessionDoesNotExist()
150+
151+
// Verify new session doesn't reuse old session id.
152+
withNewSession(Spark(), Map.empty, false) { s1 =>
153+
s1.id should be > s.id
154+
}
155+
}
156+
}
157+
129158
private def withNewSession[R]
130-
(kind: Kind, sparkConf: Map[String, String] = Map.empty)
159+
(kind: Kind, sparkConf: Map[String, String] = Map.empty, waitForIdle: Boolean = true)
131160
(f: (LivyRestClient#InteractiveSession) => R): R = {
132161
withSession(livyClient.startSession(kind, sparkConf)) { s =>
133-
s.verifySessionIdle()
162+
if (waitForIdle) {
163+
s.verifySessionIdle()
164+
}
134165
f(s)
135166
}
136167
}

rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java

+10
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class RSCClient implements LivyClient {
5757
private final Promise<Rpc> driverRpc;
5858
private final int executorGroupId;
5959
private final EventLoopGroup eventLoopGroup;
60+
private final Promise<URI> serverUriPromise;
6061

6162
private ContextInfo contextInfo;
6263
private volatile boolean isAlive;
@@ -71,16 +72,21 @@ public class RSCClient implements LivyClient {
7172
this.eventLoopGroup = new NioEventLoopGroup(
7273
conf.getInt(RPC_MAX_THREADS),
7374
Utils.newDaemonThreadFactory("RSCClient-" + executorGroupId + "-%d"));
75+
this.serverUriPromise = ImmediateEventExecutor.INSTANCE.newPromise();
7476

7577
Utils.addListener(this.contextInfoPromise, new FutureListener<ContextInfo>() {
7678
@Override
7779
public void onSuccess(ContextInfo info) throws Exception {
7880
connectToContext(info);
81+
String url = String.format("rsc://%s:%s@%s:%d",
82+
info.clientId, info.secret, info.remoteAddress, info.remotePort);
83+
serverUriPromise.setSuccess(URI.create(url));
7984
}
8085

8186
@Override
8287
public void onFailure(Throwable error) {
8388
connectionError(error);
89+
serverUriPromise.setFailure(error);
8490
}
8591
});
8692

@@ -174,6 +180,10 @@ public void onFailure(Throwable error) throws Exception {
174180
return promise;
175181
}
176182

183+
public Future<URI> getServerUri() {
184+
return serverUriPromise;
185+
}
186+
177187
@Override
178188
public <T> JobHandle<T> submit(Job<T> job) {
179189
return protocol.submit(job);

server/src/main/scala/com/cloudera/livy/server/LivyServer.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import com.cloudera.livy._
3636
import com.cloudera.livy.server.batch.BatchSessionServlet
3737
import com.cloudera.livy.server.interactive.InteractiveSessionServlet
3838
import com.cloudera.livy.server.recovery.{SessionStore, StateStore}
39-
import com.cloudera.livy.sessions.BatchSessionManager
39+
import com.cloudera.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
4040
import com.cloudera.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
4141
import com.cloudera.livy.utils.LivySparkUtils._
4242
import com.cloudera.livy.utils.SparkYarnApp
@@ -104,6 +104,7 @@ class LivyServer extends Logging {
104104
StateStore.init(livyConf)
105105
val sessionStore = new SessionStore(livyConf)
106106
val batchSessionManager = new BatchSessionManager(livyConf, sessionStore)
107+
val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore)
107108

108109
server = new WebServer(livyConf, host, port)
109110
server.context.setResourceBase("src/main/com/cloudera/livy/server")
@@ -125,7 +126,9 @@ class LivyServer extends Logging {
125126
val context = sce.getServletContext()
126127
context.initParameters(org.scalatra.EnvironmentKey) = livyConf.get(ENVIRONMENT)
127128

128-
mount(context, new InteractiveSessionServlet(livyConf), "/sessions/*")
129+
val interactiveServlet =
130+
new InteractiveSessionServlet(interactiveSessionManager, sessionStore, livyConf)
131+
mount(context, interactiveServlet, "/sessions/*")
129132

130133
val batchServlet = new BatchSessionServlet(batchSessionManager, sessionStore, livyConf)
131134
mount(context, batchServlet, "/batches/*")

server/src/main/scala/com/cloudera/livy/server/SessionServlet.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.scalatra._
2727

2828
import com.cloudera.livy.{LivyConf, Logging}
2929
import com.cloudera.livy.sessions.{Session, SessionManager}
30+
import com.cloudera.livy.sessions.Session.RecoveryMetadata
3031

3132
object SessionServlet extends Logging
3233

@@ -37,8 +38,8 @@ object SessionServlet extends Logging
3738
* Type parameters:
3839
* S: the session type
3940
*/
40-
abstract class SessionServlet[S <: Session](
41-
private[livy] val sessionManager: SessionManager[S],
41+
abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
42+
private[livy] val sessionManager: SessionManager[S, R],
4243
livyConf: LivyConf)
4344
extends JsonServlet
4445
with ApiVersioningSupport

server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletRequest
2323
import com.cloudera.livy.LivyConf
2424
import com.cloudera.livy.server.SessionServlet
2525
import com.cloudera.livy.server.recovery.SessionStore
26-
import com.cloudera.livy.sessions.SessionManager
26+
import com.cloudera.livy.sessions.BatchSessionManager
2727
import com.cloudera.livy.utils.AppInfo
2828

2929
case class BatchSessionView(
@@ -34,10 +34,10 @@ case class BatchSessionView(
3434
log: Seq[String])
3535

3636
class BatchSessionServlet(
37-
sessionManager: SessionManager[BatchSession],
37+
sessionManager: BatchSessionManager,
3838
sessionStore: SessionStore,
3939
livyConf: LivyConf)
40-
extends SessionServlet[BatchSession](sessionManager, livyConf)
40+
extends SessionServlet(sessionManager, livyConf)
4141
{
4242

4343
override protected def createSession(req: HttpServletRequest): BatchSession = {

0 commit comments

Comments
 (0)