Skip to content
This repository was archived by the owner on May 14, 2018. It is now read-only.

Commit 8b375df

Browse files
author
Andrey Kurilov
committed
v2.0.1: not finished, save
1 parent cdba0d5 commit 8b375df

14 files changed

+130
-166
lines changed

build.gradle

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ allprojects {
1111
apply plugin: "maven"
1212
apply plugin: "signing"
1313
group = "com.github.akurilov"
14-
version = "2.0.0"
14+
version = "2.0.1"
1515
}
1616

1717
ext.moduleName = "${group}.concurrent"
@@ -21,52 +21,29 @@ repositories {
2121
}
2222

2323
dependencies {
24-
compile("com.github.akurilov:java-commons:[2.0.0,)")
24+
compile("com.github.akurilov:java-commons:[2.0.1,)")
2525
testCompile("junit:junit:4.12")
2626
}
2727

2828
compileJava {
29-
sourceCompatibility = JavaVersion.VERSION_1_10
30-
targetCompatibility = JavaVersion.VERSION_1_10
31-
inputs.property("moduleName", moduleName)
32-
doFirst {
33-
options.compilerArgs = [
34-
"--module-path", classpath.asPath,
35-
]
36-
classpath = files()
37-
}
29+
sourceCompatibility = JavaVersion.VERSION_1_8
30+
targetCompatibility = JavaVersion.VERSION_1_8
3831
}
3932

4033
compileTestJava {
41-
sourceCompatibility = JavaVersion.VERSION_1_10
42-
targetCompatibility = JavaVersion.VERSION_1_10
43-
inputs.property("moduleName", moduleName)
44-
doFirst {
45-
options.compilerArgs = [
46-
"--module-path", classpath.asPath,
47-
"--add-modules", "ALL-MODULE-PATH",
48-
"--add-reads", "${moduleName}.test=junit",
49-
"--patch-module", "$moduleName=" + files(sourceSets.test.java.outputDir).asPath,
50-
]
51-
classpath = files()
52-
}
34+
sourceCompatibility = JavaVersion.VERSION_1_8
35+
targetCompatibility = JavaVersion.VERSION_1_8
5336
}
5437

5538
jar {
56-
inputs.property("moduleName", moduleName)
5739
manifest {
5840
attributes (
59-
"Automatic-Module-Name": moduleName,
6041
"Implementation-Version": version,
6142
"Implementation-Title": rootProject.name,
6243
)
6344
}
6445
}
6546

66-
javadoc {
67-
options.addStringOption("-module-path", classpath.asPath)
68-
}
69-
7047
task sourcesJar(type: Jar, dependsOn: classes) {
7148
classifier = "sources"
7249
from sourceSets.main.allSource

src/main/java/com/github/akurilov/concurrent/AsyncRunnableBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ public final AsyncRunnableBase await()
114114
@Override
115115
public boolean await(final long timeout, final TimeUnit timeUnit)
116116
throws IllegalStateException, InterruptedException {
117-
final var timeOutMilliSec = timeUnit.toMillis(timeout);
118-
final var t = System.currentTimeMillis();
117+
final long timeOutMilliSec = timeUnit.toMillis(timeout);
118+
final long t = System.currentTimeMillis();
119119
while(isStarted() || isShutdown()) {
120120
if(System.currentTimeMillis() - t >= timeOutMilliSec) {
121121
return false;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.github.akurilov.concurrent;
2+
3+
public interface IndexThrottle {
4+
5+
boolean tryAcquire(final int index);
6+
7+
int tryAcquire(final int index, final int times);
8+
}

src/main/java/com/github/akurilov/concurrent/RateThrottle.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
/**
88
* A semaphore-like non-blocking throttle which permits at the given rate.
99
*/
10-
public final class RateThrottle<X>
11-
implements Throttle<X> {
10+
public final class RateThrottle
11+
implements Throttle {
1212

1313
private final long periodNanos;
1414
private volatile long startTime = -1;
@@ -27,10 +27,10 @@ public RateThrottle(final double rateLimit) {
2727
}
2828

2929
@Override
30-
public final boolean tryAcquire(final X item) {
30+
public final boolean tryAcquire() {
3131
synchronized(this) {
3232
if(startTime > 0) {
33-
final var periodCount = (nanoTime() - startTime) / periodNanos;
33+
final long periodCount = (nanoTime() - startTime) / periodNanos;
3434
if(periodCount > acquiredCount) {
3535
acquiredCount ++;
3636
return true;
@@ -46,10 +46,10 @@ public final boolean tryAcquire(final X item) {
4646
}
4747

4848
@Override
49-
public final int tryAcquire(final X item, final int requiredCount) {
49+
public final int tryAcquire(final int requiredCount) {
5050
synchronized(this) {
5151
if(startTime > 0) {
52-
final var availableCount = (int) (
52+
final int availableCount = (int) (
5353
(nanoTime() - startTime) / periodNanos - acquiredCount
5454
);
5555
if(availableCount > requiredCount) {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.github.akurilov.concurrent;
2+
3+
import java.util.Arrays;
4+
5+
/**
6+
Created by kurila on 29.03.16.
7+
An throttle which uses the map of weights.
8+
The throttle determines the weight for each I/O task and makes the decision.
9+
The weight is used to pass the I/O task with specific ratio for the different keys.
10+
*/
11+
public final class SequentialWeightsThrottle {
12+
13+
// initial weight map (constant)
14+
private final int[] weights;
15+
private final int[] remainingWeights;
16+
17+
public SequentialWeightsThrottle(final int[] weights)
18+
throws IllegalArgumentException {
19+
this.weights = Arrays.copyOf(weights, weights.length);
20+
remainingWeights = new int[weights.length];
21+
resetRemainingWeights();
22+
}
23+
24+
private void resetRemainingWeights()
25+
throws IllegalArgumentException {
26+
for(int i = 0; i < weights.length; i ++) {
27+
remainingWeights[i] = weights[i];
28+
}
29+
}
30+
31+
private void ensureRemainingWeights() {
32+
for(int i = 0; i < weights.length; i ++) {
33+
if(remainingWeights[i] > 0) {
34+
return;
35+
}
36+
}
37+
resetRemainingWeights();
38+
}
39+
40+
public final boolean tryAcquire(final int index) {
41+
synchronized(remainingWeights) {
42+
ensureRemainingWeights();
43+
final int remainingWeight = remainingWeights[index];
44+
if(remainingWeight > 0) {
45+
remainingWeights[index] = remainingWeight - 1;
46+
return true;
47+
} else {
48+
return false;
49+
}
50+
}
51+
}
52+
53+
public final int tryAcquire(final int index, final int times) {
54+
if(times == 0) {
55+
return 0;
56+
}
57+
synchronized(remainingWeights) {
58+
ensureRemainingWeights();
59+
final int remainingWeight = remainingWeights[index];
60+
if(times > remainingWeight) {
61+
remainingWeights[index] = 0;
62+
return remainingWeight;
63+
} else {
64+
remainingWeights[index] = remainingWeight - times;
65+
return times;
66+
}
67+
}
68+
}
69+
}

src/main/java/com/github/akurilov/concurrent/Throttle.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,18 @@
44
Throttle can make a decision about the specified thing to pass or to wait.
55
The throttle calls are not blocking so the caller should block if the throttle tells so.
66
*/
7-
public interface Throttle<X> {
7+
public interface Throttle {
88

99
/**
1010
Request a permit about a thing
11-
@param thing the subject of the permit
1211
@return true if the thing should be passed, false otherwise
1312
*/
14-
boolean tryAcquire(final X thing);
13+
boolean tryAcquire();
1514

1615
/**
1716
Request permits about a set of things
18-
@param thing the subject of the permits
1917
@param times how many permits is requested
2018
@return how many permits are got
2119
*/
22-
int tryAcquire(final X thing, int times);
20+
int tryAcquire(int times);
2321
}

src/main/java/com/github/akurilov/concurrent/WeightThrottle.java

Lines changed: 0 additions & 74 deletions
This file was deleted.

src/main/java/com/github/akurilov/concurrent/coroutines/CoroutineBase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ protected void doStart() {
2929
*/
3030
@Override
3131
public final void invoke() {
32-
var t = System.nanoTime();
33-
invokeTimed(t);
32+
invokeTimed(System.nanoTime());
3433
}
3534

3635
/**

src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutor.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,16 @@ public CoroutinesExecutor() {
3131
}
3232

3333
public CoroutinesExecutor(final boolean backgroundFlag) {
34-
final var svcThreadCount = Runtime.getRuntime().availableProcessors();
34+
final int svcThreadCount = Runtime.getRuntime().availableProcessors();
3535
executor = new ThreadPoolExecutor(
3636
svcThreadCount, svcThreadCount, 0, TimeUnit.DAYS, new ArrayBlockingQueue<>(1),
3737
new ContextAwareThreadFactory("coroutine-processor-", true, null)
3838
);
3939
this.backgroundFlag = backgroundFlag;
40-
for(var i = 0; i < svcThreadCount; i ++) {
41-
final var svcWorkerTask = new CoroutinesExecutorTask(coroutines, backgroundFlag);
40+
for(int i = 0; i < svcThreadCount; i ++) {
41+
final CoroutinesExecutorTask svcWorkerTask = new CoroutinesExecutorTask(
42+
coroutines, backgroundFlag
43+
);
4244
executor.submit(svcWorkerTask);
4345
workers.add(svcWorkerTask);
4446
svcWorkerTask.start();
@@ -54,22 +56,24 @@ public void stop(final Coroutine coroutine) {
5456
}
5557

5658
public void setThreadCount(final int threadCount) {
57-
final var newThreadCount = threadCount > 0 ?
59+
final int newThreadCount = threadCount > 0 ?
5860
threadCount : Runtime.getRuntime().availableProcessors();
59-
final var oldThreadCount = executor.getCorePoolSize();
61+
final int oldThreadCount = executor.getCorePoolSize();
6062
if(newThreadCount != oldThreadCount) {
6163
executor.setCorePoolSize(newThreadCount);
6264
executor.setMaximumPoolSize(newThreadCount);
6365
if(newThreadCount > oldThreadCount) {
64-
for(var i = oldThreadCount; i < newThreadCount; i ++) {
65-
final var execTask = new CoroutinesExecutorTask(coroutines, backgroundFlag);
66+
for(int i = oldThreadCount; i < newThreadCount; i ++) {
67+
final CoroutinesExecutorTask execTask = new CoroutinesExecutorTask(
68+
coroutines, backgroundFlag
69+
);
6670
executor.submit(execTask);
6771
workers.add(execTask);
6872
execTask.start();
6973
}
7074
} else { // less, remove some active service worker tasks
7175
try {
72-
for(var i = oldThreadCount - 1; i >= newThreadCount; i --) {
76+
for(int i = oldThreadCount - 1; i >= newThreadCount; i --) {
7377
workers.remove(i).close();
7478
}
7579
} catch (final Exception e) {

src/main/java/com/github/akurilov/concurrent/coroutines/CoroutinesExecutorTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public final void run() {
3333
break;
3434
}
3535
} else {
36-
for(final var nextCoroutine : coroutines) {
36+
for(final Coroutine nextCoroutine : coroutines) {
3737
try {
3838
if(nextCoroutine.isStarted()) {
3939
nextCoroutine.invoke();

0 commit comments

Comments
 (0)