1
+ package com .flowingcode .vaadin .addons .gridexporter .test ;
2
+
3
+ import static org .apache .commons .io .output .NullOutputStream .NULL_OUTPUT_STREAM ;
4
+ import static org .hamcrest .MatcherAssert .assertThat ;
5
+ import static org .hamcrest .Matchers .equalTo ;
6
+ import static org .hamcrest .Matchers .nullValue ;
7
+ import com .flowingcode .vaadin .addons .gridexporter .ConfigurableConcurrentStreamResourceWriter ;
8
+ import com .flowingcode .vaadin .addons .gridexporter .GridExporter ;
9
+ import com .vaadin .flow .server .StreamResourceWriter ;
10
+ import com .vaadin .flow .server .VaadinService ;
11
+ import com .vaadin .flow .server .VaadinServletService ;
12
+ import com .vaadin .flow .server .VaadinSession ;
13
+ import java .io .IOException ;
14
+ import java .nio .channels .InterruptedByTimeoutException ;
15
+ import java .util .concurrent .CountDownLatch ;
16
+ import java .util .concurrent .CyclicBarrier ;
17
+ import java .util .concurrent .Exchanger ;
18
+ import java .util .concurrent .TimeUnit ;
19
+ import java .util .concurrent .locks .Lock ;
20
+ import java .util .concurrent .locks .ReentrantLock ;
21
+ import org .hamcrest .Matcher ;
22
+ import org .hamcrest .Matchers ;
23
+ import org .junit .Assert ;
24
+ import org .junit .Before ;
25
+ import org .junit .Test ;
26
+
27
+ @ SuppressWarnings ("serial" )
28
+ public class ConcurrentExportTests {
29
+
30
+ private static final int TEST_TIMEOUT = 5000 ;
31
+
32
+ private static Matcher <Throwable > interruptedByTimeout () {
33
+ return Matchers .instanceOf (InterruptedByTimeoutException .class );
34
+ }
35
+
36
+ private class ConcurrentStreamResourceWriter
37
+ extends ConfigurableConcurrentStreamResourceWriter {
38
+
39
+ public ConcurrentStreamResourceWriter (StreamResourceWriter delegate ) {
40
+ super (delegate );
41
+ }
42
+
43
+ }
44
+
45
+ private CyclicBarrier barrier ;
46
+
47
+ private void initializeCyclicBarrier (int parties ) {
48
+ barrier = new CyclicBarrier (parties );
49
+ }
50
+
51
+ @ Before
52
+ public void before () {
53
+ barrier = null ;
54
+ }
55
+
56
+ @ SuppressWarnings ("unchecked" )
57
+ private static <E extends Throwable > void sneakyThrow (Throwable e ) throws E {
58
+ throw (E ) e ;
59
+ }
60
+
61
+ private interface MockDownload {
62
+ MockDownload withTimeout (long timeout );
63
+
64
+ MockDownload withCost (float cost );
65
+
66
+ Throwable get () throws InterruptedException ;
67
+
68
+ MockDownload await () throws InterruptedException ;
69
+
70
+ MockDownload start ();
71
+ }
72
+
73
+ private MockDownload newDownload () {
74
+
75
+ CountDownLatch latch = new CountDownLatch (1 );
76
+
77
+ ConcurrentStreamResourceWriter writer =
78
+ new ConcurrentStreamResourceWriter ((stream , session ) -> {
79
+ latch .countDown ();
80
+ await (barrier );
81
+ });
82
+
83
+ Exchanger <Throwable > exchanger = new Exchanger <>();
84
+
85
+ Thread thread = new Thread (() -> {
86
+
87
+ Throwable throwable = null ;
88
+ try {
89
+ writer .accept (NULL_OUTPUT_STREAM , createSession ());
90
+ } catch (Throwable t ) {
91
+ throwable = t ;
92
+ }
93
+
94
+ latch .countDown ();
95
+ try {
96
+ exchanger .exchange (throwable );
97
+ } catch (InterruptedException e ) {
98
+ e .printStackTrace ();
99
+ }
100
+ });
101
+
102
+ return new MockDownload () {
103
+ @ Override
104
+ public Throwable get () throws InterruptedException {
105
+ if (thread .getState () == Thread .State .NEW ) {
106
+ throw new IllegalStateException ("Download has not started" );
107
+ }
108
+ return exchanger .exchange (null );
109
+ }
110
+
111
+ @ Override
112
+ public MockDownload start () {
113
+ if (thread .getState () == Thread .State .NEW ) {
114
+ thread .start ();
115
+ }
116
+
117
+ try {
118
+ latch .await (1 , TimeUnit .MILLISECONDS );
119
+ } catch (InterruptedException e ) {
120
+ sneakyThrow (e );
121
+ }
122
+
123
+ return this ;
124
+ }
125
+
126
+ @ Override
127
+ public MockDownload await () throws InterruptedException {
128
+ if (thread .getState () == Thread .State .NEW ) {
129
+ thread .start ();
130
+ }
131
+ latch .await ();
132
+ return this ;
133
+ }
134
+
135
+ @ Override
136
+ public MockDownload withTimeout (long timeout ) {
137
+ writer .setTimeout (TimeUnit .MILLISECONDS .toNanos (timeout ));
138
+ return this ;
139
+ }
140
+
141
+ @ Override
142
+ public MockDownload withCost (float cost ) {
143
+ writer .setCost (cost );
144
+ return this ;
145
+ }
146
+ };
147
+ }
148
+
149
+ private static void await (CyclicBarrier barrier ) {
150
+ try {
151
+ barrier .await ();
152
+ } catch (Exception e ) {
153
+ sneakyThrow (e );
154
+ }
155
+ }
156
+
157
+ private VaadinSession createSession () {
158
+ Lock lock = new ReentrantLock ();
159
+ VaadinService service = new VaadinServletService (null , null );
160
+ return new VaadinSession (service ) {
161
+ @ Override
162
+ public Lock getLockInstance () {
163
+ return lock ;
164
+ }
165
+ };
166
+ }
167
+
168
+ @ Test
169
+ public void testSetLimit () throws IOException {
170
+
171
+ float [] costs = new float [] {0.5f , 1 , 2 , ConcurrentStreamResourceWriter .MIN_COST ,
172
+ 2 * ConcurrentStreamResourceWriter .MIN_COST , ConcurrentStreamResourceWriter .MAX_COST ,
173
+ Float .POSITIVE_INFINITY };
174
+
175
+ // increment permits
176
+ for (float cost : costs ) {
177
+ ConcurrentStreamResourceWriter .setLimit (cost );
178
+ Assert .assertEquals (cost , ConcurrentStreamResourceWriter .getLimit (), 0 );
179
+ }
180
+
181
+ // shrink permits
182
+ for (int i = costs .length ; i -- > 0 ;) {
183
+ ConcurrentStreamResourceWriter .setLimit (costs [i ]);
184
+ Assert .assertEquals (costs [i ], ConcurrentStreamResourceWriter .getLimit (), 0 );
185
+ }
186
+
187
+ //finite costs are capped to MAX_COST
188
+ ConcurrentStreamResourceWriter .setLimit (0x10000 );
189
+ Assert .assertEquals (GridExporter .MAX_COST , ConcurrentStreamResourceWriter .getLimit (), 0 );
190
+
191
+ //Any positive cost, no matter how small, will require at least one permit.
192
+ ConcurrentStreamResourceWriter .setLimit (Float .MIN_NORMAL );
193
+ Assert .assertEquals (GridExporter .MIN_COST , ConcurrentStreamResourceWriter .getLimit (), 0 );
194
+
195
+ ConcurrentStreamResourceWriter .setLimit (0.99f / 0x10000 );
196
+ Assert .assertEquals (GridExporter .MIN_COST , ConcurrentStreamResourceWriter .getLimit (), 0 );
197
+ }
198
+
199
+ @ Test (expected = IllegalArgumentException .class )
200
+ public void testSetLimitWithZero () {
201
+ ConcurrentStreamResourceWriter .setLimit (0 );
202
+ }
203
+
204
+ @ Test (expected = IllegalArgumentException .class )
205
+ public void testSetLimitWithNegative () {
206
+ ConcurrentStreamResourceWriter .setLimit (-1 );
207
+ }
208
+
209
+ @ Test (timeout = TEST_TIMEOUT )
210
+ public void testUnlimitedDownloads ()
211
+ throws InterruptedException {
212
+ ConcurrentStreamResourceWriter .setLimit (Float .POSITIVE_INFINITY );
213
+ initializeCyclicBarrier (2 );
214
+
215
+ var q1 = newDownload ().await ();
216
+ var q2 = newDownload ().await ();
217
+
218
+ assertThat (q1 .get (), nullValue ());
219
+ assertThat (q2 .get (), nullValue ());
220
+ }
221
+
222
+ @ Test (timeout = TEST_TIMEOUT )
223
+ public void testConcurrentSuccess ()
224
+ throws InterruptedException {
225
+ ConcurrentStreamResourceWriter .setLimit (2 );
226
+ initializeCyclicBarrier (2 );
227
+
228
+ var q1 = newDownload ().await ();
229
+ var q2 = newDownload ().await ();
230
+
231
+ assertThat (q1 .get (), nullValue ());
232
+ assertThat (q2 .get (), nullValue ());
233
+ }
234
+
235
+ @ Test (timeout = TEST_TIMEOUT )
236
+ public void testInterruptedByTimeout1 ()
237
+ throws InterruptedException {
238
+ ConcurrentStreamResourceWriter .setLimit (1 );
239
+ initializeCyclicBarrier (2 );
240
+
241
+ var q1 = newDownload ().await ();
242
+ var q2 = newDownload ().start ();
243
+ assertThat (barrier .getNumberWaiting (), equalTo (1 ));
244
+ await (barrier );
245
+
246
+ assertThat (q1 .get (), nullValue ());
247
+ assertThat (q2 .get (), interruptedByTimeout ());
248
+ }
249
+
250
+
251
+ @ Test (timeout = TEST_TIMEOUT )
252
+ public void testInterruptedByTimeout2 ()
253
+ throws InterruptedException {
254
+ ConcurrentStreamResourceWriter .setLimit (2 );
255
+ initializeCyclicBarrier (3 );
256
+
257
+ var q1 = newDownload ().await ();
258
+ var q2 = newDownload ().await ();
259
+ var q3 = newDownload ().withCost (2 ).start ();
260
+ assertThat (barrier .getNumberWaiting (), equalTo (2 ));
261
+ await (barrier );
262
+
263
+ assertThat (q1 .get (), nullValue ());
264
+ assertThat (q2 .get (), nullValue ());
265
+ assertThat (q3 .get (), interruptedByTimeout ());
266
+ }
267
+
268
+ @ Test (timeout = TEST_TIMEOUT )
269
+ public void testInterruptedByTimeout3 ()
270
+ throws InterruptedException {
271
+ ConcurrentStreamResourceWriter .setLimit (2 );
272
+ initializeCyclicBarrier (2 );
273
+
274
+ var q1 = newDownload ().withCost (2 ).await ();
275
+ var q2 = newDownload ().start ();
276
+ var q3 = newDownload ().start ();
277
+ assertThat (barrier .getNumberWaiting (), equalTo (1 ));
278
+ await (barrier );
279
+
280
+ assertThat (q1 .get (), nullValue ());
281
+ assertThat (q2 .get (), interruptedByTimeout ());
282
+ assertThat (q3 .get (), interruptedByTimeout ());
283
+ }
284
+
285
+ }
0 commit comments