+ * This event allows the handler to determine whether the event propagation should be stopped, + * preventing other listeners from processing the event. + *
+ * + * @param+ * Finite limits are capped to {@link #MAX_COST} (32767). If the limit is + * {@link Float#POSITIVE_INFINITY POSITIVE_INFINITY}, the semaphore will not be used for + * controlling concurrent downloads. + * + * @param limit the maximum cost of concurrent downloads allowed + * @throws IllegalArgumentException if the limit is zero or negative. + */ + public static void setLimit(float limit) { + if (limit <= 0) { + throw new IllegalArgumentException(); + } + if (Float.isInfinite(limit)) { + enabled = false; + return; + } + + synchronized (semaphore) { + enabled = true; + semaphore.setPermits(costToPermits(limit, Integer.MAX_VALUE)); + } + } + + /** + * Returns the limit for the number of concurrent downloads. + * + * @return the limit for the number of concurrent downloads, or {@link Float#POSITIVE_INFINITY} if + * the semaphore is disabled. + */ + public static float getLimit() { + if (enabled) { + synchronized (semaphore) { + return (float) semaphore.maxPermits / 0x10000; + } + } else { + return Float.POSITIVE_INFINITY; + } + } + + private static int costToPermits(float cost, int maxPermits) { + // restrict limit to 0x7fff to ensure the cost can be represented + // using fixed-point arithmetic with 16 fractional digits and 15 integral digits + cost = Math.min(cost, MAX_COST); + // Determine the number of permits required based on the cost, capping at maxPermits. + // If the cost is zero or negative, no permits are needed. + // Any positive cost, no matter how small, will require at least one permit. + return cost <= 0 ? 0 : Math.max(Math.min((int) (cost * 0x10000), maxPermits), 1); + } + + /** + * Constructs a {@code ConcurrentStreamResourceWriter} with the specified delegate. The delegate + * is a {@link StreamResourceWriter} that performs the actual writing to the stream. + * + * @param delegate the delegate {@code InputStreamFactory} + */ + ConcurrentStreamResourceWriter(StreamResourceWriter delegate) { + this.delegate = delegate; + } + + /** + * Sets the timeout for acquiring a permit to start a download when there are not enough permits + * available in the semaphore. + * + * @see GridExporter#setConcurrentDownloadTimeout(long, TimeUnit) + * @return the timeout in nanoseconds. + */ + public abstract long getTimeout(); + + /** + * Returns the cost of this download. + * + * Note that the method is not called under the session lock. It means that if implementation + * requires access to the application/session data then the session has to be locked explicitly. + * + * @param session vaadin session + * @see GridExporter#setConcurrentDownloadCost(float) + */ + public float getCost(VaadinSession session) { + return DEFAULT_COST; + } + + /** + * Callback method that is invoked when a timeout occurs while trying to acquire a permit for + * starting a download. + *
+ * Implementations can use this method to perform any necessary actions in response to the + * timeout, such as logging a warning or notifying the user. + *
+ */ + protected abstract void onTimeout(); + + /** + * Handles {@code stream} (writes data to it) using {@code session} as a context. + *+ * Note that the method is not called under the session lock. It means that if implementation + * requires access to the application/session data then the session has to be locked explicitly. + *
+ * If a semaphore has been set, it controls access to this method, enforcing a timeout. A permit
+ * will be acquired from the semaphore, if one becomes available within the given waiting time and
+ * the current thread has not been {@linkplain Thread#interrupt interrupted}.
+ *
+ * @param stream data output stream
+ * @param session vaadin session
+ * @throws IOException if an IO error occurred
+ * @throws InterruptedIOException if the current thread is interrupted
+ * @throws InterruptedByTimeoutException if the waiting time elapsed before a permit was acquired
+ */
+ @Override
+ public final void accept(OutputStream stream, VaadinSession session) throws IOException {
+
+ if (!enabled) {
+ delegate.accept(stream, session);
+ } else {
+
+ try {
+
+ int permits;
+ float cost = getCost(session);
+ synchronized (semaphore) {
+ permits = costToPermits(cost, semaphore.maxPermits);
+ }
+
+ if (semaphore.tryAcquire(permits, getTimeout(), TimeUnit.NANOSECONDS)) {
+ try {
+ delegate.accept(stream, session);
+ } finally {
+ semaphore.release(permits);
+ }
+ } else {
+ onTimeout();
+ throw new InterruptedByTimeoutException();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw (IOException) new InterruptedIOException().initCause(e);
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java
index f62b2dd..0bc6829 100644
--- a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java
+++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java
@@ -32,9 +32,13 @@
import com.vaadin.flow.data.renderer.BasicRenderer;
import com.vaadin.flow.data.renderer.LitRenderer;
import com.vaadin.flow.data.renderer.Renderer;
+import com.vaadin.flow.function.SerializableConsumer;
import com.vaadin.flow.function.SerializableSupplier;
import com.vaadin.flow.function.ValueProvider;
import com.vaadin.flow.server.StreamResource;
+import com.vaadin.flow.server.StreamResourceWriter;
+import com.vaadin.flow.server.VaadinSession;
+import com.vaadin.flow.shared.Registration;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
@@ -47,7 +51,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +69,22 @@ public class GridExporter
+ * This method is called when a timeout occurs during a concurrent download. It creates a
+ * {@link ConcurrentDownloadTimeoutEvent} and notifies all instance and global listeners. If any
+ * listener stops the event propagation, subsequent listeners will not be notified.
+ */
+ private void fireConcurrentDownloadTimeout() {
+ if (!instanceDownloadTimeoutListeners.isEmpty() || !globalDownloadTimeoutListeners.isEmpty()) {
+ grid.getUI().ifPresent(ui -> ui.access(() -> {
+ ConcurrentDownloadTimeoutEvent ev = new ConcurrentDownloadTimeoutEvent(GridExporter.this);
+ Stream.concat(instanceDownloadTimeoutListeners.stream(),
+ globalDownloadTimeoutListeners.stream()).forEach(listener -> {
+ if (!ev.isPropagationStopped()) {
+ listener.accept(ev);
+ }
+ });
+ }));
+ }
+ }
+
+ /**
+ * Adds a listener for concurrent download timeout events specific to this instance.
+ *
+ * The listener will be called whenever a concurrent download timeout event occurs.
+ *
+ * @param listener the listener to be added
+ * @return a {@link Registration} object that can be used to remove the listener
+ */
+ public Registration addConcurrentDownloadTimeoutEvent(
+ SerializableConsumer
+ * The listener will be called whenever a concurrent download timeout event occurs.
+ *
+ * Note that instance-specific listeners take precedence over global listeners. If an instance
+ * listener stops the event propagation by calling
+ * {@link ConcurrentDownloadTimeoutEvent#stopPropagation() stopPropagation()}, the global
+ * listeners will not be notified.
+ *
+ * @param listener the listener to be added
+ * @return a {@link Registration} object that can be used to remove the listener
+ */
+ public static Registration addGlobalConcurrentDownloadTimeoutEvent(
+ SerializableConsumer
+ * Finite limits are capped to {@link #MAX_COST} (32767). If the limit is
+ * {@link Float#POSITIVE_INFINITY POSITIVE_INFINITY}, concurrent downloads will not be limited.
+ *
+ * @param limit the maximum cost of concurrent downloads allowed
+ * @throws IllegalArgumentException if the limit is zero or negative.
+ */
+ public static void setConcurrentDownloadLimit(float limit) {
+ ConcurrentStreamResourceWriter.setLimit(limit);
+ }
+
+ /**
+ * Returns the limit for the number of concurrent downloads.
+ *
+ * @return the limit for the number of concurrent downloads, or {@link Float#POSITIVE_INFINITY} if
+ * concurrent downloads are not limited.
+ */
+ public static float getConcurrentDownloadLimit() {
+ return ConcurrentStreamResourceWriter.getLimit();
+ }
+
+ /**
+ * Sets the timeout for acquiring a permit to start a download when the
+ * {@linkplain #setConcurrentDownloadLimit(int) maximum number of concurrent downloads} is
+ * reached. If the timeout is less than or equal to zero, the downloads will fail immediately if
+ * no enough permits can be acquired.
+ *
+ * This timeout is crucial for preventing the system from hanging indefinitely while waiting for
+ * available resources. If the timeout expires before a permit can be acquired, the download is
+ * cancelled.
+ *
+ * @param timeout the maximum time to wait for a permit
+ * @param unit the time unit of the {@code timeout} argument
+ */
+ public static void setConcurrentDownloadTimeout(long timeout, TimeUnit unit) {
+ GridExporter.concurrentDownloadTimeoutNanos = unit.toNanos(timeout);
+ }
+
+ /**
+ * Sets the cost for concurrent downloads. This cost is used to determine the number of permits
+ * required for downloads to proceed, thereby controlling the concurrency level. At any given
+ * time, the sum of the costs of all concurrent downloads will not exceed the limit set by
+ * {@link #setConcurrentDownloadLimit(float)}.
+ *
+ *
+ * The cost is represented as a float to allow for more granular control over resource usage. By
+ * using a floating-point number, fractional costs can be expressed, providing flexibility in
+ * determining the resource consumption for different downloads.
+ *
+ *
+ * The cost is converted to a number of permits by capping it to stay within the limit. A cost of
+ * 1.0 ({@link #DEFAULT_COST}) represents a standard unit of resource usage, while a cost of 0.5
+ * represents half a unit, and a cost above 1.0 indicates higher than normal resource usage.
+ *
+ *
+ * If the cost is zero or negative, no permits are needed. However, any positive cost, no matter
+ * how small, will require at least one permit to prevent downloads with very low costs from
+ * bypassing the semaphore. {@link #MIN_COST} represents the minimal fractional cost that acquires
+ * only one permit (hence {@code 2*MIN_COST} acquires two permits and so on). A cost of
+ * {@link #MAX_COST} prevents any other downloads from acquiring permits simultaneously.
+ *
+ * @param concurrentDownloadCost the cost associated with concurrent downloads for this instance.
+ */
+ public void setConcurrentDownloadCost(float concurrentDownloadCost) {
+ this.concurrentDownloadCost = concurrentDownloadCost;
}
public String getTitle() {
diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java
new file mode 100644
index 0000000..8a3d685
--- /dev/null
+++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java
@@ -0,0 +1,35 @@
+package com.flowingcode.vaadin.addons.gridexporter;
+
+import com.vaadin.flow.server.StreamResourceWriter;
+import com.vaadin.flow.server.VaadinSession;
+
+@SuppressWarnings("serial")
+public abstract class ConfigurableConcurrentStreamResourceWriter
+ extends ConcurrentStreamResourceWriter {
+
+ public ConfigurableConcurrentStreamResourceWriter(StreamResourceWriter delegate) {
+ super(delegate);
+ }
+
+ private float cost = GridExporter.DEFAULT_COST;
+ private long timeout = 0L;
+
+ @Override
+ public float getCost(VaadinSession session) {
+ return cost;
+ }
+
+ public void setCost(float cost) {
+ this.cost = cost;
+ }
+
+ @Override
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+}
diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java
index 0d79858..65a303d 100644
--- a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java
+++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java
@@ -19,6 +19,15 @@
*/
package com.flowingcode.vaadin.addons.gridexporter;
+import com.flowingcode.vaadin.addons.demo.DemoSource;
+import com.flowingcode.vaadin.addons.demo.SourceCodeViewer;
+import com.github.javafaker.Faker;
+import com.vaadin.flow.component.Html;
+import com.vaadin.flow.component.grid.Grid;
+import com.vaadin.flow.component.grid.Grid.Column;
+import com.vaadin.flow.component.html.Div;
+import com.vaadin.flow.router.PageTitle;
+import com.vaadin.flow.router.Route;
import java.io.IOException;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
@@ -27,15 +36,9 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.poi.EncryptedDocumentException;
-import com.flowingcode.vaadin.addons.demo.DemoSource;
-import com.github.javafaker.Faker;
-import com.vaadin.flow.component.grid.Grid;
-import com.vaadin.flow.component.grid.Grid.Column;
-import com.vaadin.flow.component.html.Div;
-import com.vaadin.flow.router.PageTitle;
-import com.vaadin.flow.router.Route;
@DemoSource
+@DemoSource("/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java")
@PageTitle("Grid Exporter Addon Big Dataset Demo")
@Route(value = "gridexporter/bigdataset", layout = GridExporterDemoView.class)
@SuppressWarnings("serial")
@@ -68,7 +71,7 @@ public GridExporterBigDatasetDemo() throws EncryptedDocumentException, IOExcepti
}).collect(Collectors.toList());
grid.setItems(query->persons.stream().skip(query.getOffset()).limit(query.getLimit()));
grid.setWidthFull();
- this.setSizeFull();
+ setSizeFull();
GridExporter
+
+ Additionally, setConcurrentDownloadTimeout
enforces a timeout for acquiring the necessary permits
+ during a download operation. If the permits are not obtained within the specified timeframe, the download
+ request will be aborted and the DownloadTimeoutEvent
listener will execute, preventing prolonged
+ waiting periods, especially during peak system loads.
+