Skip to content

Commit cdf80f7

Browse files
committed
LIVY-239: Moving the logic to generate session IDs from Session Manager
to SessionStore Task-url: https://issues.cloudera.org/browse/LIVY-239
1 parent 5de6cf2 commit cdf80f7

File tree

2 files changed

+8
-18
lines changed

2 files changed

+8
-18
lines changed

server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala

+6-7
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,6 @@ class SessionStore(
4747
store.set(sessionPath(sessionType, m.id), m)
4848
}
4949

50-
def saveNextSessionId(sessionType: String, id: Int): Unit = {
51-
store.set(sessionManagerPath(sessionType), SessionManagerState(id))
52-
}
53-
5450
/**
5551
* Return all sessions stored in the store with specified session type.
5652
*/
@@ -69,14 +65,17 @@ class SessionStore(
6965

7066
/**
7167
* Return the next unused session id with specified session type.
72-
* If checks the SessionManagerState stored and returns the next free session id.
68+
* It checks the SessionManagerState stored and returns the next free session id.
7369
* If no SessionManagerState is stored, it returns 0.
70+
* It saves the new session ID to the session store.
7471
*
7572
* @throws Exception If SessionManagerState stored is corrupted, it throws an error.
7673
*/
77-
def getNextSessionId(sessionType: String): Int = {
78-
store.get[SessionManagerState](sessionManagerPath(sessionType))
74+
def getNextSessionId(sessionType: String): Int = synchronized {
75+
val nextSessionId = store.get[SessionManagerState](sessionManagerPath(sessionType))
7976
.map(_.nextSessionId).getOrElse(0)
77+
store.set(sessionManagerPath(sessionType), SessionManagerState(nextSessionId + 1))
78+
nextSessionId
8079
}
8180

8281
/**

server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala

+2-11
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
6969

7070
protected implicit def executor: ExecutionContext = ExecutionContext.global
7171

72-
protected[this] final val idCounter = new AtomicInteger(0)
7372
protected[this] final val sessions = mutable.LinkedHashMap[Int, S]()
7473

7574
private[this] final val sessionTimeout =
@@ -78,11 +77,8 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
7877
mockSessions.getOrElse(recover()).foreach(register)
7978
new GarbageCollector().start()
8079

81-
def nextId(): Int = synchronized {
82-
val id = idCounter.getAndIncrement()
83-
sessionStore.saveNextSessionId(sessionType, idCounter.get())
84-
id
85-
}
80+
// sessionStore.getNextSessionId is guaranteed to return atomic and returns unique IDs.
81+
def nextId(): Int = sessionStore.getNextSessionId(sessionType)
8682

8783
def register(session: S): S = {
8884
info(s"Registering new session ${session.id}")
@@ -136,18 +132,13 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
136132
}
137133

138134
private def recover(): Seq[S] = {
139-
// Recover next session id from state store and create SessionManager.
140-
idCounter.set(sessionStore.getNextSessionId(sessionType))
141135

142136
// Retrieve session recovery metadata from state store.
143137
val sessionMetadata = sessionStore.getAllSessions[R](sessionType)
144138

145139
// Recover session from session recovery metadata.
146140
val recoveredSessions = sessionMetadata.flatMap(_.toOption).map(sessionRecovery)
147141

148-
info(s"Recovered ${recoveredSessions.length} $sessionType sessions." +
149-
s" Next session id: $idCounter")
150-
151142
// Print recovery error.
152143
val recoveryFailure = sessionMetadata.filter(_.isFailure).map(_.failed.get)
153144
recoveryFailure.foreach(ex => error(ex.getMessage, ex.getCause))

0 commit comments

Comments
 (0)