@@ -19,9 +19,13 @@ package com.cloudera.livy.server.recovery
19
19
20
20
import scala .collection .JavaConverters ._
21
21
import scala .reflect .ClassTag
22
+ import scala .util .Try
23
+ import scala .util .matching .Regex
22
24
25
+ import org .apache .curator .RetryPolicy
23
26
import org .apache .curator .framework .{CuratorFramework , CuratorFrameworkFactory }
24
27
import org .apache .curator .framework .api .UnhandledErrorListener
28
+ import org .apache .curator .framework .recipes .atomic .{DistributedAtomicLong => DistributedLong }
25
29
import org .apache .curator .retry .RetryNTimes
26
30
import org .apache .zookeeper .KeeperException .NoNodeException
27
31
@@ -46,18 +50,22 @@ class ZooKeeperStateStore(
46
50
}
47
51
48
52
private val zkAddress = livyConf.get(LivyConf .RECOVERY_STATE_STORE_URL )
53
+
49
54
require(! zkAddress.isEmpty, s " Please config ${LivyConf .RECOVERY_STATE_STORE_URL .key}. " )
50
- private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF )
51
- private val curatorClient = mockCuratorClient.getOrElse {
52
- val retryValue = livyConf.get(ZK_RETRY_CONF )
55
+
56
+ private val retryValue = livyConf.get(ZK_RETRY_CONF )
57
+ private val retryPolicy = Try {
58
+ // a regex to match patterns like "m, n" where m and m both are integer values
53
59
val retryPattern = """ \s*(\d+)\s*,\s*(\d+)\s*""" .r
54
- val retryPolicy = retryValue match {
55
- case retryPattern(n, sleepMs) => new RetryNTimes (5 , 100 )
56
- case _ => throw new IllegalArgumentException (
57
- s " $ZK_KEY_PREFIX_CONF contains bad value: $retryValue. " +
58
- " Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100" )
59
- }
60
+ val retryPattern(retryTimes, sleepMsBetweenRetries) = retryValue
61
+ new RetryNTimes (retryTimes.toInt, sleepMsBetweenRetries.toInt )
62
+ }.getOrElse { throw new IllegalArgumentException (
63
+ s " $ZK_RETRY_CONF contains bad value: $retryValue. " +
64
+ " Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100" )
65
+ }
60
66
67
+ private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF )
68
+ private val curatorClient = mockCuratorClient.getOrElse {
61
69
CuratorFrameworkFactory .newClient(zkAddress, retryPolicy)
62
70
}
63
71
@@ -113,5 +121,15 @@ class ZooKeeperStateStore(
113
121
}
114
122
}
115
123
124
+ override def increment (key : String ): Long = {
125
+ val distributedSessionId = new DistributedLong (curatorClient, key, retryPolicy)
126
+ distributedSessionId.increment() match {
127
+ case atomicValue if atomicValue.succeeded() =>
128
+ atomicValue.postValue()
129
+ case _ =>
130
+ throw new java.io.IOException (s " Failed to atomically increment the value for $key" )
131
+ }
132
+ }
133
+
116
134
private def prefixKey (key : String ) = s " / $zkKeyPrefix/ $key"
117
135
}
0 commit comments