Skip to content
This repository was archived by the owner on May 14, 2018. It is now read-only.

Commit 7ebf06a

Browse files
author
Andrey Kurilov
committed
2 parents 95f6deb + 82c4f64 commit 7ebf06a

File tree

7 files changed

+118
-141
lines changed

7 files changed

+118
-141
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ executed concurrently also.
2525
## Gradle
2626

2727
```groovy
28-
compile group: 'com.github.akurilov', name: 'java-concurrent', version: '1.1.4'
28+
compile group: 'com.github.akurilov', name: 'java-concurrent', version: '2.0.3'
2929
```
3030

3131
## Implementing Basic Coroutine

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ allprojects {
1111
apply plugin: "maven"
1212
apply plugin: "signing"
1313
group = "com.github.akurilov"
14-
version = "2.0.2"
14+
version = "2.0.3"
1515
}
1616

1717
ext.moduleName = "${group}.concurrent"

src/main/java/com/github/akurilov/concurrent/throttle/SequentialWeightsThrottle.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
The throttle determines the weight for each I/O task and makes the decision.
99
The weight is used to pass the I/O task with specific ratio for the different keys.
1010
*/
11-
public final class SequentialWeightsThrottle {
11+
public final class SequentialWeightsThrottle
12+
implements IndexThrottle {
1213

1314
// initial weight map (constant)
1415
private final int[] weights;

src/test/java/com/github/akurilov/concurrent/test/coroutine/RoundRobinOutputCoroutineTest.java renamed to src/test/java/com/github/akurilov/concurrent/coroutine/RoundRobinOutputCoroutineTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
1-
package com.github.akurilov.concurrent.test.coroutine;
1+
package com.github.akurilov.concurrent.coroutine;
22

33
import com.github.akurilov.commons.io.Input;
44
import com.github.akurilov.commons.io.Output;
55

6-
import com.github.akurilov.concurrent.coroutine.CoroutinesExecutor;
7-
import com.github.akurilov.concurrent.coroutine.RoundRobinOutputCoroutine;
86
import org.junit.Test;
97
import org.junit.runner.RunWith;
108
import org.junit.runners.Parameterized;
11-
129
import static org.junit.Assert.assertTrue;
1310
import static org.junit.Assert.fail;
1411

src/test/java/com/github/akurilov/concurrent/test/throttle/SequentialWeightsThrottleTest.java

Lines changed: 0 additions & 110 deletions
This file was deleted.

src/test/java/com/github/akurilov/concurrent/test/throttle/RateThrottleTest.java renamed to src/test/java/com/github/akurilov/concurrent/throttle/RateThrottleTest.java

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package com.github.akurilov.concurrent.test.throttle;
1+
package com.github.akurilov.concurrent.throttle;
22

3-
import com.github.akurilov.concurrent.throttle.RateThrottle;
43
import org.junit.Test;
54
import static org.junit.Assert.assertEquals;
65

@@ -20,13 +19,12 @@ public void testRate100mHzNonBatch()
2019
throws Exception {
2120
final double rateLimit = 0.1;
2221
final int timeLimitSec = 50;
23-
final RateThrottle throttle = new RateThrottle(rateLimit);
24-
final Object subj = new Object();
22+
final Throttle throttle = new RateThrottle(rateLimit);
2523
final LongAdder counter = new LongAdder();
2624
final Thread submThread = new Thread(
2725
() -> {
2826
while(true) {
29-
if(throttle.tryAcquire(subj)) {
27+
if(throttle.tryAcquire()) {
3028
counter.increment();
3129
} else {
3230
LockSupport.parkNanos(1);
@@ -46,13 +44,12 @@ public void testRate10HzNonBatch()
4644
throws Exception {
4745
final int rateLimit = 10;
4846
final int timeLimitSec = 10;
49-
final RateThrottle throttle = new RateThrottle(rateLimit);
50-
final Object subj = new Object();
47+
final Throttle throttle = new RateThrottle(rateLimit);
5148
final LongAdder counter = new LongAdder();
5249
final Thread submThread = new Thread(
5350
() -> {
5451
while(true) {
55-
if(throttle.tryAcquire(subj)) {
52+
if(throttle.tryAcquire()) {
5653
counter.increment();
5754
} else {
5855
LockSupport.parkNanos(1);
@@ -72,13 +69,12 @@ public void testRate100kHzNonBatch()
7269
throws Exception {
7370
final int rateLimit = 100_000;
7471
final int timeLimitSec = 20;
75-
final RateThrottle throttle = new RateThrottle(rateLimit);
76-
final Object subj = new Object();
72+
final Throttle throttle = new RateThrottle(rateLimit);
7773
final LongAdder counter = new LongAdder();
7874
final Thread submThread = new Thread(
7975
() -> {
8076
while(true) {
81-
if(throttle.tryAcquire(subj)) {
77+
if(throttle.tryAcquire()) {
8278
counter.increment();
8379
} else {
8480
LockSupport.parkNanos(1);
@@ -98,14 +94,13 @@ public void testRate1HzBatch()
9894
throws Exception {
9995
final int rateLimit = 1;
10096
final int timeLimitSec = 50;
101-
final RateThrottle throttle = new RateThrottle(rateLimit);
102-
final Object subj = new Object();
97+
final Throttle throttle = new RateThrottle(rateLimit);
10398
final LongAdder counter = new LongAdder();
10499
final Thread submThread = new Thread(
105100
() -> {
106101
int n;
107102
while(true) {
108-
n = throttle.tryAcquire(subj, 10);
103+
n = throttle.tryAcquire(10);
109104
if(n > 0) {
110105
counter.add(n);
111106
} else {
@@ -126,14 +121,13 @@ public void testRate100HzBatch()
126121
throws Exception {
127122
final int rateLimit = 100;
128123
final int timeLimitSec = 50;
129-
final RateThrottle throttle = new RateThrottle(rateLimit);
130-
final Object subj = new Object();
124+
final Throttle throttle = new RateThrottle(rateLimit);
131125
final LongAdder counter = new LongAdder();
132126
final Thread submThread = new Thread(
133127
() -> {
134128
int n;
135129
while(true) {
136-
n = throttle.tryAcquire(subj, 100);
130+
n = throttle.tryAcquire(100);
137131
if(n > 0) {
138132
counter.add(n);
139133
} else {
@@ -154,14 +148,13 @@ public void testRate1MHzBatch()
154148
throws Exception {
155149
final int rateLimit = 1_000_000;
156150
final int timeLimitSec = 10;
157-
final RateThrottle throttle = new RateThrottle(rateLimit);
158-
final Object subj = new Object();
151+
final Throttle throttle = new RateThrottle(rateLimit);
159152
final LongAdder counter = new LongAdder();
160153
final Thread submThread = new Thread(
161154
() -> {
162155
int n;
163156
while(true) {
164-
n = throttle.tryAcquire(subj, 100);
157+
n = throttle.tryAcquire(100);
165158
if(n > 0) {
166159
counter.add(n);
167160
} else {
@@ -181,8 +174,7 @@ public void testRate1MHzBatch()
181174
public void testRate1kHzBatchConcurrent() {
182175
final int rateLimit = 1_000;
183176
final int timeLimitSec = 20;
184-
final Object subj = new Object();
185-
final RateThrottle throttle = new RateThrottle(rateLimit);
177+
final Throttle throttle = new RateThrottle(rateLimit);
186178
final LongAdder counter = new LongAdder();
187179
final ExecutorService execSvc = Executors.newFixedThreadPool(4);
188180
for(int i = 0; i < 4; i ++) {
@@ -192,13 +184,13 @@ public void testRate1kHzBatchConcurrent() {
192184
int n;
193185
while(true) {
194186
if(j == 0) {
195-
if(throttle.tryAcquire(subj)) {
187+
if(throttle.tryAcquire()) {
196188
counter.increment();
197189
} else {
198190
LockSupport.parkNanos(1);
199191
}
200192
} else {
201-
n = throttle.tryAcquire(subj, 1 + j);
193+
n = throttle.tryAcquire(1 + j);
202194
if(n > 0) {
203195
counter.add(n);
204196
} else {
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package com.github.akurilov.concurrent.throttle;
2+
3+
import org.junit.Test;
4+
import static org.junit.Assert.assertEquals;
5+
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.atomic.LongAdder;
10+
import java.util.concurrent.locks.LockSupport;
11+
12+
/**
13+
Created by andrey on 06.11.16.
14+
*/
15+
16+
public class SequentialWeightsThrottleTest {
17+
18+
private static final int WRITE = 0;
19+
private static final int READ = 1;
20+
21+
private final int[] weights = new int[] {
22+
80,
23+
20
24+
};
25+
private final LongAdder[] resultCounters = new LongAdder[] {
26+
new LongAdder(),
27+
new LongAdder()
28+
};
29+
30+
private final SequentialWeightsThrottle wt = new SequentialWeightsThrottle(weights);
31+
32+
private final class SubmTask
33+
implements Runnable {
34+
private final int origin;
35+
public SubmTask(final int origin) {
36+
this.origin = origin;
37+
}
38+
@Override
39+
public final void run() {
40+
while(true) {
41+
if(wt.tryAcquire(origin)) {
42+
resultCounters[origin].increment();
43+
} else {
44+
LockSupport.parkNanos(1);
45+
}
46+
}
47+
}
48+
}
49+
50+
@Test
51+
public void testRequestApprovalFor()
52+
throws Exception {
53+
final ExecutorService es = Executors.newFixedThreadPool(2);
54+
es.submit(new SubmTask(WRITE));
55+
es.submit(new SubmTask(READ));
56+
es.awaitTermination(10, TimeUnit.SECONDS);
57+
es.shutdownNow();
58+
final double writes = resultCounters[WRITE].sum();
59+
final long reads = resultCounters[READ].sum();
60+
assertEquals(80/20, writes / reads, 0.01);
61+
System.out.println("Write rate: " + writes / 10 + " Hz, read rate: " + reads / 10 + " Hz");
62+
}
63+
64+
private final class BatchSubmTask
65+
implements Runnable {
66+
private final int origin;
67+
public BatchSubmTask(final int origin) {
68+
this.origin = origin;
69+
}
70+
@Override
71+
public final void run() {
72+
int n;
73+
while(true) {
74+
n = wt.tryAcquire(origin, 128);
75+
if(n > 0) {
76+
resultCounters[origin].add(n);
77+
} else {
78+
LockSupport.parkNanos(1);
79+
}
80+
}
81+
}
82+
}
83+
84+
@Test
85+
public void testRequestBatchApprovalFor()
86+
throws Exception {
87+
final ExecutorService es = Executors.newFixedThreadPool(2);
88+
es.submit(new BatchSubmTask(WRITE));
89+
es.submit(new BatchSubmTask(READ));
90+
es.awaitTermination(10, TimeUnit.SECONDS);
91+
es.shutdownNow();
92+
final double writes = resultCounters[WRITE].sum();
93+
final long reads = resultCounters[READ].sum();
94+
assertEquals(80/20, writes / reads, 0.01);
95+
System.out.println("Write rate: " + writes / 10 + " Hz, read rate: " + reads / 10 + " Hz");
96+
}
97+
}

0 commit comments

Comments
 (0)