Skip to content

Commit c5fb9ea

Browse files
committed
LIVY-239: Moving session ID genaration to StateStore and its subclasses
This commit also - updates the test cases around SessionStore - Removes the SessionManagerState class (Not neede anymore). We are storing a Long value that can be incremented. Task-url: https://issues.cloudera.org/browse/LIVY-239
1 parent cdf80f7 commit c5fb9ea

File tree

6 files changed

+56
-25
lines changed

6 files changed

+56
-25
lines changed

server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package com.cloudera.livy.server.recovery
2020

21+
import java.util.concurrent.atomic.AtomicLong
22+
2123
import scala.reflect.ClassTag
2224

2325
import com.cloudera.livy.LivyConf
@@ -27,11 +29,16 @@ import com.cloudera.livy.LivyConf
2729
* Livy will use this when session recovery is disabled.
2830
*/
2931
class BlackholeStateStore(livyConf: LivyConf) extends StateStore(livyConf) {
32+
33+
val atomicLong: AtomicLong = new AtomicLong(-1L)
34+
3035
def set(key: String, value: Object): Unit = {}
3136

3237
def get[T: ClassTag](key: String): Option[T] = None
3338

3439
def getChildren(key: String): Seq[String] = List.empty[String]
3540

3641
def remove(key: String): Unit = {}
42+
43+
override def increment(key: String): Long = atomicLong.incrementAndGet()
3744
}

server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala

+6
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,10 @@ class FileSystemStateStore(
120120
}
121121

122122
private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
123+
124+
override def increment(key: String): Long = synchronized {
125+
val incrementedValue = get[Long](key).getOrElse(-1L) + 1
126+
set(key, incrementedValue.asInstanceOf[Object])
127+
incrementedValue
128+
}
123129
}

server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala

+6-12
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ import scala.util.control.NonFatal
2727
import com.cloudera.livy.{LivyConf, Logging}
2828
import com.cloudera.livy.sessions.Session.RecoveryMetadata
2929

30-
private[recovery] case class SessionManagerState(nextSessionId: Int)
31-
3230
/**
3331
* SessionStore provides high level functions to get/save session state from/to StateStore.
3432
*/
@@ -64,18 +62,14 @@ class SessionStore(
6462
}
6563

6664
/**
67-
* Return the next unused session id with specified session type.
68-
* It checks the SessionManagerState stored and returns the next free session id.
69-
* If no SessionManagerState is stored, it returns 0.
70-
* It saves the new session ID to the session store.
65+
* Return the next unused session ID from state store with the specified session type.
66+
* If no value is stored state store, it returns 0.
67+
* It saves the next unused session ID to the session store before returning the current value.
7168
*
72-
* @throws Exception If SessionManagerState stored is corrupted, it throws an error.
69+
* @throws Exception If session store is corrupted or unreachable, it throws an error.
7370
*/
74-
def getNextSessionId(sessionType: String): Int = synchronized {
75-
val nextSessionId = store.get[SessionManagerState](sessionManagerPath(sessionType))
76-
.map(_.nextSessionId).getOrElse(0)
77-
store.set(sessionManagerPath(sessionType), SessionManagerState(nextSessionId + 1))
78-
nextSessionId
71+
def getNextSessionId(sessionType: String): Int = {
72+
store.increment(sessionManagerPath(sessionType)).toInt
7973
}
8074

8175
/**

server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala

+7
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ abstract class StateStore(livyConf: LivyConf) extends JsonMapper {
7171
* @throws Exception Throw when persisting the state store fails.
7272
*/
7373
def remove(key: String): Unit
74+
75+
/**
76+
* Gets the Long value for the given key, increments the value, and stores the new value before
77+
* returning the value.
78+
* @return incremented value
79+
*/
80+
def increment(key: String): Long
7481
}
7582

7683
/**

server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala

+27-9
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@ package com.cloudera.livy.server.recovery
1919

2020
import scala.collection.JavaConverters._
2121
import scala.reflect.ClassTag
22+
import scala.util.Try
23+
import scala.util.matching.Regex
2224

25+
import org.apache.curator.RetryPolicy
2326
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
2427
import org.apache.curator.framework.api.UnhandledErrorListener
28+
import org.apache.curator.framework.recipes.atomic.{DistributedAtomicLong => DistributedLong}
2529
import org.apache.curator.retry.RetryNTimes
2630
import org.apache.zookeeper.KeeperException.NoNodeException
2731

@@ -46,18 +50,22 @@ class ZooKeeperStateStore(
4650
}
4751

4852
private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
53+
4954
require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
50-
private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF)
51-
private val curatorClient = mockCuratorClient.getOrElse {
52-
val retryValue = livyConf.get(ZK_RETRY_CONF)
55+
56+
private val retryValue = livyConf.get(ZK_RETRY_CONF)
57+
private val retryPolicy = Try {
58+
// a regex to match patterns like "m, n" where m and m both are integer values
5359
val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
54-
val retryPolicy = retryValue match {
55-
case retryPattern(n, sleepMs) => new RetryNTimes(5, 100)
56-
case _ => throw new IllegalArgumentException(
57-
s"$ZK_KEY_PREFIX_CONF contains bad value: $retryValue. " +
58-
"Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
59-
}
60+
val retryPattern(retryTimes, sleepMsBetweenRetries) = retryValue
61+
new RetryNTimes(retryTimes.toInt, sleepMsBetweenRetries.toInt)
62+
}.getOrElse { throw new IllegalArgumentException(
63+
s"$ZK_RETRY_CONF contains bad value: $retryValue. " +
64+
"Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
65+
}
6066

67+
private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF)
68+
private val curatorClient = mockCuratorClient.getOrElse {
6169
CuratorFrameworkFactory.newClient(zkAddress, retryPolicy)
6270
}
6371

@@ -113,5 +121,15 @@ class ZooKeeperStateStore(
113121
}
114122
}
115123

124+
override def increment(key: String): Long = {
125+
val distributedSessionId = new DistributedLong(curatorClient, key, retryPolicy)
126+
distributedSessionId.increment() match {
127+
case atomicValue if atomicValue.succeeded() =>
128+
atomicValue.postValue()
129+
case _ =>
130+
throw new java.io.IOException(s"Failed to atomically increment the value for $key")
131+
}
132+
}
133+
116134
private def prefixKey(key: String) = s"/$zkKeyPrefix/$key"
117135
}

server/src/test/scala/com/cloudera/livy/server/recovery/SessionStoreSpec.scala

+3-4
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,11 @@ class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
8989
val stateStore = mock[StateStore]
9090
val sessionStore = new SessionStore(conf, stateStore)
9191

92-
when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(None)
92+
when(stateStore.increment(sessionManagerPath)).thenReturn(0L)
9393
sessionStore.getNextSessionId(sessionType) shouldBe 0
9494

95-
val sms = SessionManagerState(100)
96-
when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(Some(sms))
97-
sessionStore.getNextSessionId(sessionType) shouldBe sms.nextSessionId
95+
when(stateStore.increment(sessionManagerPath)).thenReturn(100)
96+
sessionStore.getNextSessionId(sessionType) shouldBe 100
9897
}
9998

10099
it("should remove session") {

0 commit comments

Comments
 (0)