Skip to content
This repository was archived by the owner on May 14, 2018. It is now read-only.

Commit 95f6deb

Browse files
author
Andrey Kurilov
committed
v2.0.2: upgraded the async runnable implementation
1 parent 8b375df commit 95f6deb

File tree

105 files changed

+128
-30878
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

105 files changed

+128
-30878
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ allprojects {
1111
apply plugin: "maven"
1212
apply plugin: "signing"
1313
group = "com.github.akurilov"
14-
version = "2.0.1"
14+
version = "2.0.2"
1515
}
1616

1717
ext.moduleName = "${group}.concurrent"
@@ -21,7 +21,7 @@ repositories {
2121
}
2222

2323
dependencies {
24-
compile("com.github.akurilov:java-commons:[2.0.1,)")
24+
compile("com.github.akurilov:java-commons:[2.0.3,)")
2525
testCompile("junit:junit:4.12")
2626
}
2727

src/main/java/com/github/akurilov/concurrent/AsyncRunnableBase.java

Lines changed: 98 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package com.github.akurilov.concurrent;
22

33
import java.io.IOException;
4-
import java.rmi.RemoteException;
54
import java.util.concurrent.TimeUnit;
6-
import java.util.concurrent.atomic.AtomicReference;
5+
import java.util.concurrent.locks.Condition;
6+
import java.util.concurrent.locks.Lock;
7+
import java.util.concurrent.locks.ReentrantLock;
78

89
import static com.github.akurilov.concurrent.AsyncRunnable.State.FINISHED;
910
import static com.github.akurilov.concurrent.AsyncRunnable.State.INITIAL;
@@ -14,117 +15,156 @@
1415
public abstract class AsyncRunnableBase
1516
implements AsyncRunnable {
1617

17-
private final AtomicReference<State> stateRef = new AtomicReference<>(INITIAL);
18-
protected final Object state = new Object();
18+
private volatile State state = State.INITIAL;
19+
private final Lock stateLock = new ReentrantLock();
20+
protected final Condition stateChanged = stateLock.newCondition();
1921

2022
@Override
2123
public final State state() {
22-
return stateRef.get();
24+
return state;
2325
}
2426

2527
@Override
2628
public boolean isInitial() {
27-
return INITIAL.equals(stateRef.get());
29+
return INITIAL == state;
2830
}
2931

3032
@Override
3133
public boolean isStarted() {
32-
return STARTED.equals(stateRef.get());
34+
return STARTED == state;
3335
}
3436

3537
@Override
3638
public boolean isShutdown() {
37-
return SHUTDOWN.equals(stateRef.get());
39+
return SHUTDOWN == state;
3840
}
3941

4042
@Override
4143
public boolean isStopped() {
42-
return STOPPED.equals(stateRef.get());
44+
return STOPPED == state;
4345
}
4446

4547
@Override
4648
public boolean isFinished() {
47-
return FINISHED.equals(stateRef.get());
49+
return FINISHED == state;
4850
}
4951

5052
@Override
5153
public boolean isClosed() {
52-
return null == stateRef.get();
54+
return null == state;
5355
}
5456

5557
@Override
5658
public final AsyncRunnableBase start()
5759
throws IllegalStateException {
58-
if(stateRef.compareAndSet(INITIAL, STARTED) || stateRef.compareAndSet(STOPPED, STARTED)) {
59-
synchronized(state) {
60-
doStart();
61-
state.notifyAll();
60+
if(stateLock.tryLock()) {
61+
try {
62+
if(state == INITIAL || state == STOPPED) {
63+
doStart();
64+
state = STARTED;
65+
stateChanged.signalAll();
66+
} else {
67+
throw new IllegalStateException(
68+
"Not allowed to start while state is \"" + state + "\""
69+
);
70+
}
71+
} finally {
72+
stateLock.unlock();
6273
}
6374
} else {
64-
throw new IllegalStateException(
65-
"Not allowed to start while state is \"" + stateRef.get() + "\""
66-
);
75+
throw new IllegalStateException("Start: failed to acquire the state lock");
6776
}
6877
return this;
6978
}
7079

7180
@Override
7281
public final AsyncRunnableBase shutdown()
7382
throws IllegalStateException {
74-
if(stateRef.compareAndSet(STARTED, SHUTDOWN)) {
75-
synchronized(state) {
76-
doShutdown();
77-
state.notifyAll();
83+
if(stateLock.tryLock()) {
84+
try {
85+
if(state == STARTED) {
86+
doShutdown();
87+
state = SHUTDOWN;
88+
stateChanged.signalAll();
89+
} else {
90+
throw new IllegalStateException(
91+
"Not allowed to shutdown while state is \"" + state + "\""
92+
);
93+
}
94+
} finally {
95+
stateLock.unlock();
7896
}
7997
} else {
80-
throw new IllegalStateException(
81-
"Not allowed to shutdown while state is \"" + stateRef.get() + "\""
82-
);
98+
throw new IllegalStateException("Shutdown: failed to acquire the state lock");
8399
}
84100
return this;
85101
}
86102

87103
@Override
88104
public final AsyncRunnableBase stop()
89-
throws IllegalStateException, RemoteException {
90-
try {
91-
shutdown();
92-
} catch(final IllegalStateException ignored) {
93-
}
94-
if(stateRef.compareAndSet(STARTED, STOPPED) || stateRef.compareAndSet(SHUTDOWN, STOPPED)) {
95-
synchronized(state) {
96-
doStop();
97-
state.notifyAll();
105+
throws IllegalStateException {
106+
if(stateLock.tryLock()) {
107+
try {
108+
if(state == STARTED || state == SHUTDOWN) {
109+
doStop();
110+
state = STOPPED;
111+
stateChanged.signalAll();
112+
} else {
113+
throw new IllegalStateException(
114+
"Not allowed to stop while state is \"" + state + "\""
115+
);
116+
}
117+
} finally {
118+
stateLock.unlock();
98119
}
99120
} else {
100-
throw new IllegalStateException(
101-
"Not allowed to stop while state is \"" + stateRef.get() + "\""
102-
);
121+
throw new IllegalStateException("Stop: failed to acquire the state lock");
103122
}
104123
return this;
105124
}
106125

107126
@Override
108127
public final AsyncRunnableBase await()
109128
throws IllegalStateException, InterruptedException {
110-
await(Long.MAX_VALUE, TimeUnit.DAYS);
129+
await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
111130
return this;
112131
}
113132

114133
@Override
115134
public boolean await(final long timeout, final TimeUnit timeUnit)
116135
throws IllegalStateException, InterruptedException {
117-
final long timeOutMilliSec = timeUnit.toMillis(timeout);
118-
final long t = System.currentTimeMillis();
119-
while(isStarted() || isShutdown()) {
120-
if(System.currentTimeMillis() - t >= timeOutMilliSec) {
121-
return false;
122-
}
123-
synchronized(state) {
124-
state.wait(100);
136+
final long invokeTimeMillis = System.currentTimeMillis();
137+
final long timeOutMillis = timeUnit.toMillis(timeout);
138+
long elapsedTimeMillis;
139+
while(timeOutMillis > (elapsedTimeMillis = System.currentTimeMillis() - invokeTimeMillis)) {
140+
if(state != STARTED && state != SHUTDOWN) {
141+
return true; // condition is reached
142+
} else {
143+
if(stateLock.tryLock(timeOutMillis - elapsedTimeMillis, TimeUnit.MILLISECONDS)) {
144+
try {
145+
// spent a time to wait for the state lock, need to update the elapsed time
146+
elapsedTimeMillis = System.currentTimeMillis() - invokeTimeMillis;
147+
// recheck for the timeout condition
148+
if(timeOutMillis > elapsedTimeMillis) {
149+
if(
150+
stateChanged.await(
151+
timeOutMillis - elapsedTimeMillis, TimeUnit.MILLISECONDS
152+
)
153+
) { // the state is changed, recheck the condition
154+
if(state != STARTED && state != SHUTDOWN) {
155+
return true;
156+
} // continue otherwise (no timeout yet, condition is not reached)
157+
}
158+
} else { // timeout, exit the loop
159+
break;
160+
}
161+
} finally {
162+
stateLock.unlock();
163+
}
164+
}
125165
}
126166
}
127-
return true;
167+
return state != STARTED && state != SHUTDOWN;
128168
}
129169

130170
@Override
@@ -136,12 +176,18 @@ public void close()
136176
} catch(final IllegalStateException ignored) {
137177
}
138178
// then close actually
139-
synchronized(state) {
140-
if(null != stateRef.get()) {
141-
doClose();
142-
stateRef.set(null);
143-
state.notifyAll();
179+
if(stateLock.tryLock()) {
180+
try {
181+
if(null != state) {
182+
doClose();
183+
state = null;
184+
stateChanged.signalAll();
185+
}
186+
} finally {
187+
stateLock.unlock();
144188
}
189+
} else {
190+
throw new IllegalStateException("Close: failed to acquire the state lock");
145191
}
146192
}
147193

src/main/java/com/github/akurilov/concurrent/coroutines/Coroutine.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/Coroutine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent.coroutines;
1+
package com.github.akurilov.concurrent.coroutine;
22

33
import com.github.akurilov.concurrent.AsyncRunnable;
44

src/main/java/com/github/akurilov/concurrent/coroutines/CoroutineBase.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/CoroutineBase.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
1-
package com.github.akurilov.concurrent.coroutines;
1+
package com.github.akurilov.concurrent.coroutine;
22

33
import com.github.akurilov.concurrent.AsyncRunnableBase;
44

5-
import java.util.logging.Logger;
6-
75
/**
86
* The base class for all coroutines.
97
*/
108
public abstract class CoroutineBase
119
extends AsyncRunnableBase
1210
implements Coroutine {
1311

14-
private static final Logger LOG = Logger.getLogger(CoroutineBase.class.getName());
15-
1612
private final CoroutinesExecutor executor;
1713

1814
protected CoroutineBase(final CoroutinesExecutor executor) {

src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutor.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/CoroutinesExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent.coroutines;
1+
package com.github.akurilov.concurrent.coroutine;
22

33
import com.github.akurilov.concurrent.ContextAwareThreadFactory;
44

src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutorTask.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/CoroutinesExecutorTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent.coroutines;
1+
package com.github.akurilov.concurrent.coroutine;
22

33
import com.github.akurilov.concurrent.AsyncRunnableBase;
44

src/main/java/com/github/akurilov/concurrent/coroutines/ExclusiveCoroutineBase.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/ExclusiveCoroutineBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent.coroutines;
1+
package com.github.akurilov.concurrent.coroutine;
22

33
import java.util.concurrent.locks.Lock;
44
import java.util.concurrent.locks.ReentrantLock;

src/main/java/com/github/akurilov/concurrent/coroutines/OutputCoroutine.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/OutputCoroutine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent.coroutines;
1+
package com.github.akurilov.concurrent.coroutine;
22

33
import com.github.akurilov.commons.io.Output;
44

src/main/java/com/github/akurilov/concurrent/coroutines/RoundRobinOutputCoroutine.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/RoundRobinOutputCoroutine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent.coroutines;
1+
package com.github.akurilov.concurrent.coroutine;
22

33
import com.github.akurilov.commons.collection.OptLockArrayBuffer;
44
import com.github.akurilov.commons.collection.OptLockBuffer;

src/main/java/com/github/akurilov/concurrent/coroutines/TransferCoroutine.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/TransferCoroutine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent.coroutines;
1+
package com.github.akurilov.concurrent.coroutine;
22

33
import com.github.akurilov.commons.collection.OptLockArrayBuffer;
44
import com.github.akurilov.commons.collection.OptLockBuffer;

src/main/java/com/github/akurilov/concurrent/coroutines/example/HelloWorldCoroutine.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/example/HelloWorldCoroutine.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
package com.github.akurilov.concurrent.coroutines.example;
1+
package com.github.akurilov.concurrent.coroutine.example;
22

3-
import com.github.akurilov.concurrent.coroutines.CoroutineBase;
4-
import com.github.akurilov.concurrent.coroutines.CoroutinesExecutor;
3+
import com.github.akurilov.concurrent.coroutine.CoroutineBase;
4+
import com.github.akurilov.concurrent.coroutine.CoroutinesExecutor;
55

66
import java.io.IOException;
77

src/main/java/com/github/akurilov/concurrent/coroutines/example/HelloWorldExclusiveCoroutine.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/example/HelloWorldExclusiveCoroutine.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
package com.github.akurilov.concurrent.coroutines.example;
1+
package com.github.akurilov.concurrent.coroutine.example;
22

3-
import com.github.akurilov.concurrent.coroutines.CoroutinesExecutor;
4-
import com.github.akurilov.concurrent.coroutines.ExclusiveCoroutineBase;
3+
import com.github.akurilov.concurrent.coroutine.CoroutinesExecutor;
4+
import com.github.akurilov.concurrent.coroutine.ExclusiveCoroutineBase;
55

66
import java.io.IOException;
77

src/main/java/com/github/akurilov/concurrent/coroutines/example/Main.java renamed to src/main/java/com/github/akurilov/concurrent/coroutine/example/Main.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
package com.github.akurilov.concurrent.coroutines.example;
1+
package com.github.akurilov.concurrent.coroutine.example;
22

3-
import com.github.akurilov.concurrent.coroutines.Coroutine;
4-
import com.github.akurilov.concurrent.coroutines.CoroutinesExecutor;
3+
import com.github.akurilov.concurrent.coroutine.Coroutine;
4+
import com.github.akurilov.concurrent.coroutine.CoroutinesExecutor;
55

66
import java.io.IOException;
77
import java.util.concurrent.TimeUnit;

src/main/java/com/github/akurilov/concurrent/IndexThrottle.java renamed to src/main/java/com/github/akurilov/concurrent/throttle/IndexThrottle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent;
1+
package com.github.akurilov.concurrent.throttle;
22

33
public interface IndexThrottle {
44

src/main/java/com/github/akurilov/concurrent/RateThrottle.java renamed to src/main/java/com/github/akurilov/concurrent/throttle/RateThrottle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent;
1+
package com.github.akurilov.concurrent.throttle;
22

33
import java.util.concurrent.TimeUnit;
44

src/main/java/com/github/akurilov/concurrent/SequentialWeightsThrottle.java renamed to src/main/java/com/github/akurilov/concurrent/throttle/SequentialWeightsThrottle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent;
1+
package com.github.akurilov.concurrent.throttle;
22

33
import java.util.Arrays;
44

src/main/java/com/github/akurilov/concurrent/Throttle.java renamed to src/main/java/com/github/akurilov/concurrent/throttle/Throttle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.github.akurilov.concurrent;
1+
package com.github.akurilov.concurrent.throttle;
22

33
/**
44
Throttle can make a decision about the specified thing to pass or to wait.

0 commit comments

Comments
 (0)