Skip to content

Commit 9d62691

Browse files
javier-godoymlopezFC
authored andcommitted
feat: add ConcurrentDownloadTimeoutEvent
1 parent 014338e commit 9d62691

File tree

7 files changed

+169
-6
lines changed

7 files changed

+169
-6
lines changed

pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,9 @@
301301
<supportedPackagings>
302302
<supportedPackaging>jar</supportedPackaging>
303303
</supportedPackagings>
304+
<systemProperties>
305+
<com.vaadin.flow.server.pushMode>AUTOMATIC</com.vaadin.flow.server.pushMode>
306+
</systemProperties>
304307
</configuration>
305308
</plugin>
306309
</plugins>
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.flowingcode.vaadin.addons.gridexporter;
2+
3+
import java.util.EventObject;
4+
import java.util.Objects;
5+
6+
/**
7+
* An event that is fired when a concurrent download timeout occurs in the {@link GridExporter}.
8+
* <p>
9+
* This event allows the handler to determine whether the event propagation should be stopped,
10+
* preventing other listeners from processing the event.
11+
* </p>
12+
*
13+
* @param <T> the type of the GridExporter source
14+
* @see GridExporter#setConcurrentDownloadTimeout(long, java.util.concurrent.TimeUnit)
15+
*/
16+
@SuppressWarnings("serial")
17+
public class ConcurrentDownloadTimeoutEvent extends EventObject {
18+
19+
private boolean propagationStopped;
20+
21+
/**
22+
* Constructs a new ConcurrentDownloadTimeoutEvent.
23+
*
24+
* @param source the {@link GridExporter} that is the source of this event
25+
* @throws IllegalArgumentException if source is null
26+
*/
27+
public ConcurrentDownloadTimeoutEvent(GridExporter<?> source) {
28+
super(Objects.requireNonNull(source));
29+
}
30+
31+
/**
32+
* Returns the source of this event.
33+
*
34+
* @return the {@code GridExporter} that is the source of this event
35+
*/
36+
@Override
37+
public GridExporter<?> getSource() {
38+
return (GridExporter<?>) super.getSource();
39+
}
40+
41+
/**
42+
* Stops the propagation of this event. When propagation is stopped, other listeners will not be
43+
* notified of this event.
44+
*/
45+
public void stopPropagation() {
46+
propagationStopped = true;
47+
}
48+
49+
/**
50+
* Checks if the propagation of this event has been stopped.
51+
*
52+
* @return {@code true} if the propagation has been stopped, {@code false} otherwise
53+
* @see #stopPropagation()
54+
*/
55+
public boolean isPropagationStopped() {
56+
return propagationStopped;
57+
}
58+
}

src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,16 @@ public float getCost(VaadinSession session) {
147147
return DEFAULT_COST;
148148
}
149149

150+
/**
151+
* Callback method that is invoked when a timeout occurs while trying to acquire a permit for
152+
* starting a download.
153+
* <p>
154+
* Implementations can use this method to perform any necessary actions in response to the
155+
* timeout, such as logging a warning or notifying the user.
156+
* </p>
157+
*/
158+
protected abstract void onTimeout();
159+
150160
/**
151161
* Handles {@code stream} (writes data to it) using {@code session} as a context.
152162
* <p>
@@ -185,6 +195,7 @@ public final void accept(OutputStream stream, VaadinSession session) throws IOEx
185195
semaphore.release(permits);
186196
}
187197
} else {
198+
onTimeout();
188199
throw new InterruptedByTimeoutException();
189200
}
190201
} catch (InterruptedException e) {

src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
import com.vaadin.flow.data.renderer.BasicRenderer;
3333
import com.vaadin.flow.data.renderer.LitRenderer;
3434
import com.vaadin.flow.data.renderer.Renderer;
35+
import com.vaadin.flow.function.SerializableConsumer;
3536
import com.vaadin.flow.function.SerializableSupplier;
3637
import com.vaadin.flow.function.ValueProvider;
3738
import com.vaadin.flow.server.StreamResource;
3839
import com.vaadin.flow.server.StreamResourceWriter;
3940
import com.vaadin.flow.server.VaadinSession;
41+
import com.vaadin.flow.shared.Registration;
4042
import java.io.Serializable;
4143
import java.lang.reflect.Field;
4244
import java.lang.reflect.InvocationTargetException;
@@ -49,8 +51,10 @@
4951
import java.util.List;
5052
import java.util.Map;
5153
import java.util.Optional;
54+
import java.util.concurrent.CopyOnWriteArrayList;
5255
import java.util.concurrent.TimeUnit;
5356
import java.util.stream.Collectors;
57+
import java.util.stream.Stream;
5458
import org.slf4j.Logger;
5559
import org.slf4j.LoggerFactory;
5660

@@ -76,6 +80,10 @@ public class GridExporter<T> implements Serializable {
7680

7781
private static long concurrentDownloadTimeoutNanos = 0L;
7882
private float concurrentDownloadCost = DEFAULT_COST;
83+
private final List<SerializableConsumer<ConcurrentDownloadTimeoutEvent>> instanceDownloadTimeoutListeners =
84+
new CopyOnWriteArrayList<>();
85+
private static final List<SerializableConsumer<ConcurrentDownloadTimeoutEvent>> globalDownloadTimeoutListeners =
86+
new CopyOnWriteArrayList<>();
7987

8088
static final String COLUMN_VALUE_PROVIDER_DATA = "column-value-provider-data";
8189
static final String COLUMN_EXPORTED_PROVIDER_DATA = "column-value-exported-data";
@@ -328,9 +336,69 @@ public long getTimeout() {
328336
return concurrentDownloadTimeoutNanos;
329337
}
330338

339+
@Override
340+
protected void onTimeout() {
341+
fireConcurrentDownloadTimeout();
342+
}
343+
344+
331345
};
332346
}
333347

348+
/**
349+
* Handles the timeout event by notifying all registered listeners.
350+
* <p>
351+
* This method is called when a timeout occurs during a concurrent download. It creates a
352+
* {@link ConcurrentDownloadTimeoutEvent} and notifies all instance and global listeners. If any
353+
* listener stops the event propagation, subsequent listeners will not be notified.
354+
*/
355+
private void fireConcurrentDownloadTimeout() {
356+
if (!instanceDownloadTimeoutListeners.isEmpty() || !globalDownloadTimeoutListeners.isEmpty()) {
357+
grid.getUI().ifPresent(ui -> ui.access(() -> {
358+
ConcurrentDownloadTimeoutEvent ev = new ConcurrentDownloadTimeoutEvent(GridExporter.this);
359+
Stream.concat(instanceDownloadTimeoutListeners.stream(),
360+
globalDownloadTimeoutListeners.stream()).forEach(listener -> {
361+
if (!ev.isPropagationStopped()) {
362+
listener.accept(ev);
363+
}
364+
});
365+
}));
366+
}
367+
}
368+
369+
/**
370+
* Adds a listener for concurrent download timeout events specific to this instance.
371+
* <p>
372+
* The listener will be called whenever a concurrent download timeout event occurs.
373+
*
374+
* @param listener the listener to be added
375+
* @return a {@link Registration} object that can be used to remove the listener
376+
*/
377+
public Registration addConcurrentDownloadTimeoutEvent(
378+
SerializableConsumer<ConcurrentDownloadTimeoutEvent> listener) {
379+
instanceDownloadTimeoutListeners.add(0, listener);
380+
return () -> instanceDownloadTimeoutListeners.remove(listener);
381+
}
382+
383+
/**
384+
* Adds a global listener for concurrent download timeout events.
385+
* <p>
386+
* The listener will be called whenever a concurrent download timeout event occurs.
387+
* <p>
388+
* Note that instance-specific listeners take precedence over global listeners. If an instance
389+
* listener stops the event propagation by calling
390+
* {@link ConcurrentDownloadTimeoutEvent#stopPropagation() stopPropagation()}, the global
391+
* listeners will not be notified.
392+
*
393+
* @param listener the listener to be added
394+
* @return a {@link Registration} object that can be used to remove the listener
395+
*/
396+
public static Registration addGlobalConcurrentDownloadTimeoutEvent(
397+
SerializableConsumer<ConcurrentDownloadTimeoutEvent> listener) {
398+
globalDownloadTimeoutListeners.add(0, listener);
399+
return () -> globalDownloadTimeoutListeners.remove(listener);
400+
}
401+
334402
/**
335403
* Sets the limit for the {@linkplain #setConcurrentDownloadCost(float) cost of concurrent
336404
* downloads}. If all the downloads have a cost of {@link #DEFAULT_COST}, the limit represents the

src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ public GridExporterBigDatasetDemo() throws EncryptedDocumentException, IOExcepti
9494
9595
Additionally, <code>setConcurrentDownloadTimeout</code> enforces a timeout for acquiring the necessary permits
9696
during a download operation. If the permits are not obtained within the specified timeframe, the download
97-
request will be aborted, preventing prolonged waiting periods, especially during peak system loads.
97+
request will be aborted and the <code>DownloadTimeoutEvent</code> listener will execute, preventing prolonged
98+
waiting periods, especially during peak system loads.
9899
</div>""");
99100
add(concurrent);
100101
// #endif

src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.flowingcode.vaadin.addons.gridexporter;
22

3+
import com.vaadin.flow.component.notification.Notification;
4+
import com.vaadin.flow.component.notification.NotificationVariant;
35
import com.vaadin.flow.server.ServiceInitEvent;
46
import com.vaadin.flow.server.VaadinServiceInitListener;
57
import java.util.concurrent.TimeUnit;
@@ -10,6 +12,10 @@ public class VaadinServiceInitListenerImpl implements VaadinServiceInitListener
1012
public void serviceInit(ServiceInitEvent event) {
1113
GridExporter.setConcurrentDownloadLimit(10);
1214
GridExporter.setConcurrentDownloadTimeout(5, TimeUnit.SECONDS);
15+
GridExporter.addGlobalConcurrentDownloadTimeoutEvent(ev -> {
16+
Notification.show("System is busy. Please try again later.")
17+
.addThemeVariants(NotificationVariant.LUMO_ERROR);
18+
});
1319
}
1420

1521
}

src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static org.hamcrest.MatcherAssert.assertThat;
55
import static org.hamcrest.Matchers.equalTo;
66
import static org.hamcrest.Matchers.nullValue;
7+
import static org.junit.Assert.assertTrue;
78
import com.flowingcode.vaadin.addons.gridexporter.ConfigurableConcurrentStreamResourceWriter;
89
import com.flowingcode.vaadin.addons.gridexporter.GridExporter;
910
import com.vaadin.flow.server.StreamResourceWriter;
@@ -29,17 +30,24 @@ public class ConcurrentExportTests {
2930

3031
private static final int TEST_TIMEOUT = 5000;
3132

32-
private static Matcher<Throwable> interruptedByTimeout() {
33+
private static Matcher<Throwable> throwsInterruptedByTimeout() {
3334
return Matchers.instanceOf(InterruptedByTimeoutException.class);
3435
}
3536

3637
private class ConcurrentStreamResourceWriter
3738
extends ConfigurableConcurrentStreamResourceWriter {
3839

40+
private boolean interruptedByTimeout;
41+
3942
public ConcurrentStreamResourceWriter(StreamResourceWriter delegate) {
4043
super(delegate);
4144
}
4245

46+
@Override
47+
protected void onTimeout() {
48+
interruptedByTimeout = true;
49+
}
50+
4351
}
4452

4553
private CyclicBarrier barrier;
@@ -68,6 +76,8 @@ private interface MockDownload {
6876
MockDownload await() throws InterruptedException;
6977

7078
MockDownload start();
79+
80+
boolean wasInterruptedByTimeout();
7181
}
7282

7383
private MockDownload newDownload() {
@@ -143,6 +153,11 @@ public MockDownload withCost(float cost) {
143153
writer.setCost(cost);
144154
return this;
145155
}
156+
157+
@Override
158+
public boolean wasInterruptedByTimeout() {
159+
return writer.interruptedByTimeout;
160+
}
146161
};
147162
}
148163

@@ -244,7 +259,8 @@ public void testInterruptedByTimeout1()
244259
await(barrier);
245260

246261
assertThat(q1.get(), nullValue());
247-
assertThat(q2.get(), interruptedByTimeout());
262+
assertThat(q2.get(), throwsInterruptedByTimeout());
263+
assertTrue(q2.wasInterruptedByTimeout());
248264
}
249265

250266

@@ -262,7 +278,7 @@ public void testInterruptedByTimeout2()
262278

263279
assertThat(q1.get(), nullValue());
264280
assertThat(q2.get(), nullValue());
265-
assertThat(q3.get(), interruptedByTimeout());
281+
assertThat(q3.get(), throwsInterruptedByTimeout());
266282
}
267283

268284
@Test(timeout = TEST_TIMEOUT)
@@ -278,8 +294,8 @@ public void testInterruptedByTimeout3()
278294
await(barrier);
279295

280296
assertThat(q1.get(), nullValue());
281-
assertThat(q2.get(), interruptedByTimeout());
282-
assertThat(q3.get(), interruptedByTimeout());
297+
assertThat(q2.get(), throwsInterruptedByTimeout());
298+
assertThat(q3.get(), throwsInterruptedByTimeout());
283299
}
284300

285301
}

0 commit comments

Comments
 (0)