From f770996898c0a1cfa431e5994724fd16348e41df Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Sat, 8 Jun 2024 22:48:50 -0300 Subject: [PATCH 1/7] build: upgrade compiler target to Java 17 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 04702c9..225578b 100644 --- a/pom.xml +++ b/pom.xml @@ -11,8 +11,8 @@ 24.3.9 4.1.2 - 11 - 11 + 17 + 17 UTF-8 UTF-8 ${project.basedir}/drivers From e19e77b3c576a6cb879efd47dd40cd5f05a45d09 Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Sat, 8 Jun 2024 22:49:28 -0300 Subject: [PATCH 2/7] feat: implement concurrent export limit Close #117 --- .../ConcurrentStreamResourceWriter.java | 197 ++++++++++++++++++ .../addons/gridexporter/GridExporter.java | 117 ++++++++++- 2 files changed, 311 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java new file mode 100644 index 0000000..6e00420 --- /dev/null +++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java @@ -0,0 +1,197 @@ +package com.flowingcode.vaadin.addons.gridexporter; + +import com.vaadin.flow.server.StreamResourceWriter; +import com.vaadin.flow.server.VaadinSession; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.nio.channels.InterruptedByTimeoutException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.IntFunction; + +/** + * An implementation of {@link StreamResourceWriter} that controls access to the + * {@link #accept(OutputStream, VaadinSession) accept} method using a semaphore to manage + * concurrency. + * + * @author Javier Godoy + */ +@SuppressWarnings("serial") +abstract class ConcurrentStreamResourceWriter implements StreamResourceWriter { + + public static final float MAX_COST = 0x7FFF; + + public static final float MIN_COST = 1.0f / 0x10000; + + public static final float DEFAULT_COST = 1.0f; + + private static final ConfigurableSemaphore semaphore = new ConfigurableSemaphore(); + + private static volatile boolean enabled; + + private final StreamResourceWriter delegate; + + private static final class ConfigurableSemaphore extends Semaphore { + + private int maxPermits; + + ConfigurableSemaphore() { + super(0); + } + + synchronized void setPermits(int permits) { + if (permits < 0) { + throw new IllegalArgumentException(); + } + int delta = permits - maxPermits; + if (delta > 0) { + super.release(delta); + } else if (delta < 0) { + super.reducePermits(-delta); + } + maxPermits = permits; + } + + @Override + public String toString() { + IntFunction str = permits -> { + float f = permits / (float) 0x10000; + return f == Math.floor(f) ? String.format("%.0f", f) : Float.toString(f); + }; + return "Semaphore[" + str.apply(availablePermits()) + "/" + str.apply(maxPermits) + "]"; + } + + } + + /** + * Sets the limit for the cost of concurrent downloads. + *

+ * Finite limits are capped to {@link #MAX_COST} (32767). If the limit is + * {@link Float#POSITIVE_INFINITY POSITIVE_INFINITY}, the semaphore will not be used for + * controlling concurrent downloads. + * + * @param limit the maximum cost of concurrent downloads allowed + * @throws IllegalArgumentException if the limit is zero or negative. + */ + public static void setLimit(float limit) { + if (limit <= 0) { + throw new IllegalArgumentException(); + } + if (Float.isInfinite(limit)) { + enabled = false; + return; + } + + synchronized (semaphore) { + enabled = true; + semaphore.setPermits(costToPermits(limit, Integer.MAX_VALUE)); + } + } + + /** + * Returns the limit for the number of concurrent downloads. + * + * @return the limit for the number of concurrent downloads, or {@link Float#POSITIVE_INFINITY} if + * the semaphore is disabled. + */ + public static float getLimit() { + if (enabled) { + synchronized (semaphore) { + return (float) semaphore.maxPermits / 0x10000; + } + } else { + return Float.POSITIVE_INFINITY; + } + } + + private static int costToPermits(float cost, int maxPermits) { + // restrict limit to 0x7fff to ensure the cost can be represented + // using fixed-point arithmetic with 16 fractional digits and 15 integral digits + cost = Math.min(cost, MAX_COST); + // Determine the number of permits required based on the cost, capping at maxPermits. + // If the cost is zero or negative, no permits are needed. + // Any positive cost, no matter how small, will require at least one permit. + return cost <= 0 ? 0 : Math.max(Math.min((int) (cost * 0x10000), maxPermits), 1); + } + + /** + * Constructs a {@code ConcurrentStreamResourceWriter} with the specified delegate. The delegate + * is a {@link StreamResourceWriter} that performs the actual writing to the stream. + * + * @param delegate the delegate {@code InputStreamFactory} + */ + ConcurrentStreamResourceWriter(StreamResourceWriter delegate) { + this.delegate = delegate; + } + + /** + * Sets the timeout for acquiring a permit to start a download when there are not enough permits + * available in the semaphore. + * + * @see GridExporter#setConcurrentDownloadTimeout(long, TimeUnit) + * @return the timeout in nanoseconds. + */ + public abstract long getTimeout(); + + /** + * Returns the cost of this download. + * + * Note that the method is not called under the session lock. It means that if implementation + * requires access to the application/session data then the session has to be locked explicitly. + * + * @param session vaadin session + * @see GridExporter#setConcurrentDownloadCost(float) + */ + public float getCost(VaadinSession session) { + return DEFAULT_COST; + } + + /** + * Handles {@code stream} (writes data to it) using {@code session} as a context. + *

+ * Note that the method is not called under the session lock. It means that if implementation + * requires access to the application/session data then the session has to be locked explicitly. + *

+ * If a semaphore has been set, it controls access to this method, enforcing a timeout. A permit + * will be acquired from the semaphore, if one becomes available within the given waiting time and + * the current thread has not been {@linkplain Thread#interrupt interrupted}. + * + * @param stream data output stream + * @param session vaadin session + * @throws IOException if an IO error occurred + * @throws InterruptedIOException if the current thread is interrupted + * @throws InterruptedByTimeoutException if the waiting time elapsed before a permit was acquired + */ + @Override + public final void accept(OutputStream stream, VaadinSession session) throws IOException { + + if (!enabled) { + delegate.accept(stream, session); + } else { + + try { + + int permits; + float cost = getCost(session); + synchronized (semaphore) { + permits = costToPermits(cost, semaphore.maxPermits); + } + + if (semaphore.tryAcquire(permits, getTimeout(), TimeUnit.NANOSECONDS)) { + try { + delegate.accept(stream, session); + } finally { + semaphore.release(permits); + } + } else { + throw new InterruptedByTimeoutException(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw (IOException) new InterruptedIOException().initCause(e); + } + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java index f62b2dd..4fad17a 100644 --- a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java +++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java @@ -35,6 +35,8 @@ import com.vaadin.flow.function.SerializableSupplier; import com.vaadin.flow.function.ValueProvider; import com.vaadin.flow.server.StreamResource; +import com.vaadin.flow.server.StreamResourceWriter; +import com.vaadin.flow.server.VaadinSession; import java.io.Serializable; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; @@ -47,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +65,18 @@ public class GridExporter implements Serializable { private boolean csvExportEnabled = true; private boolean autoSizeColumns = true; + /** Represents all the permits available to the semaphore. */ + public static final float MAX_COST = ConcurrentStreamResourceWriter.MAX_COST; + + /** A fractional cost that acquires only one permit. */ + public static final float MIN_COST = ConcurrentStreamResourceWriter.MIN_COST; + + /** The standard unit of resource usage for concurrent downloads. */ + public static final float DEFAULT_COST = 1.0f; + + private static long concurrentDownloadTimeoutNanos = 0L; + private float concurrentDownloadCost = DEFAULT_COST; + static final String COLUMN_VALUE_PROVIDER_DATA = "column-value-provider-data"; static final String COLUMN_EXPORTED_PROVIDER_DATA = "column-value-exported-data"; static final String COLUMN_PARSING_FORMAT_PATTERN_DATA = "column-parsing-format-pattern-data"; @@ -268,7 +283,8 @@ public StreamResource getDocxStreamResource() { } public StreamResource getDocxStreamResource(String template) { - return new StreamResource(fileName + ".docx", new DocxStreamResourceWriter<>(this, template)); + return new StreamResource(fileName + ".docx", + makeConcurrentWriter(new DocxStreamResourceWriter<>(this, template))); } public StreamResource getPdfStreamResource() { @@ -276,7 +292,8 @@ public StreamResource getPdfStreamResource() { } public StreamResource getPdfStreamResource(String template) { - return new StreamResource(fileName + ".pdf", new PdfStreamResourceWriter<>(this, template)); + return new StreamResource(fileName + ".pdf", + makeConcurrentWriter(new PdfStreamResourceWriter<>(this, template))); } public StreamResource getCsvStreamResource() { @@ -288,7 +305,101 @@ public StreamResource getExcelStreamResource() { } public StreamResource getExcelStreamResource(String template) { - return new StreamResource(fileName + ".xlsx", new ExcelStreamResourceWriter<>(this, template)); + return new StreamResource(fileName + ".xlsx", + makeConcurrentWriter(new ExcelStreamResourceWriter<>(this, template))); + } + + private StreamResourceWriter makeConcurrentWriter(StreamResourceWriter writer) { + return new ConcurrentStreamResourceWriter(writer) { + @Override + public float getCost(VaadinSession session) { + return concurrentDownloadCost; + } + + @Override + public long getTimeout() { + // It would have been possible to specify a different timeout for each instance but I cannot + // figure out a good use case for that. The timeout returned herebecomes relevant when the + // semaphore has been acquired by any other download, so the timeout must reflect how long + // it is reasonable to wait for "any other download" to complete and release the semaphore. + // + // Since the reasonable timeout would depend on the duration of "any other download", it + // makes sense that it's a global setting instead of a per-instance setting. + return concurrentDownloadTimeoutNanos; + } + + }; + } + + /** + * Sets the limit for the {@linkplain #setConcurrentDownloadCost(float) cost of concurrent + * downloads}. If all the downloads have a cost of {@link #DEFAULT_COST}, the limit represents the + * number of concurrent downloads that are allowed. + *

+ * Finite limits are capped to {@link #MAX_COST} (32767). If the limit is + * {@link Float#POSITIVE_INFINITY POSITIVE_INFINITY}, concurrent downloads will not be limited. + * + * @param limit the maximum cost of concurrent downloads allowed + * @throws IllegalArgumentException if the limit is zero or negative. + */ + public static void setConcurrentDownloadLimit(float limit) { + ConcurrentStreamResourceWriter.setLimit(limit); + } + + /** + * Returns the limit for the number of concurrent downloads. + * + * @return the limit for the number of concurrent downloads, or {@link Float#POSITIVE_INFINITY} if + * concurrent downloads are not limited. + */ + public static float getConcurrentDownloadLimit() { + return ConcurrentStreamResourceWriter.getLimit(); + } + + /** + * Sets the timeout for acquiring a permit to start a download when the + * {@linkplain #setConcurrentDownloadLimit(int) maximum number of concurrent downloads} is + * reached. If the timeout is less than or equal to zero, the downloads will fail immediately if + * no enough permits can be acquired. + * + * This timeout is crucial for preventing the system from hanging indefinitely while waiting for + * available resources. If the timeout expires before a permit can be acquired, the download is + * cancelled. + * + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + */ + public static void setConcurrentDownloadTimeout(long timeout, TimeUnit unit) { + GridExporter.concurrentDownloadTimeoutNanos = unit.toNanos(timeout); + } + + /** + * Sets the cost for concurrent downloads. This cost is used to determine the number of permits + * required for downloads to proceed, thereby controlling the concurrency level. At any given + * time, the sum of the costs of all concurrent downloads will not exceed the limit set by + * {@link #setConcurrentDownloadLimit(float)}. + *

+ * + * The cost is represented as a float to allow for more granular control over resource usage. By + * using a floating-point number, fractional costs can be expressed, providing flexibility in + * determining the resource consumption for different downloads. + *

+ * + * The cost is converted to a number of permits by capping it to stay within the limit. A cost of + * 1.0 ({@link #DEFAULT_COST}) represents a standard unit of resource usage, while a cost of 0.5 + * represents half a unit, and a cost above 1.0 indicates higher than normal resource usage. + *

+ * + * If the cost is zero or negative, no permits are needed. However, any positive cost, no matter + * how small, will require at least one permit to prevent downloads with very low costs from + * bypassing the semaphore. {@link #MIN_COST} represents the minimal fractional cost that acquires + * only one permit (hence {@code 2*MIN_COST} acquires two permits and so on). A cost of + * {@link #MAX_COST} prevents any other downloads from acquiring permits simultaneously. + * + * @param concurrentDownloadCost the cost associated with concurrent downloads for this instance. + */ + public void setConcurrentDownloadCost(float concurrentDownloadCost) { + this.concurrentDownloadCost = concurrentDownloadCost; } public String getTitle() { From 86b1d0eba1007b41eee8057e8f69d9734b43330c Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Sat, 8 Jun 2024 22:49:53 -0300 Subject: [PATCH 3/7] test: add unit test for concurrent limit --- ...gurableConcurrentStreamResourceWriter.java | 35 +++ .../test/ConcurrentExportTests.java | 285 ++++++++++++++++++ 2 files changed, 320 insertions(+) create mode 100644 src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java create mode 100644 src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java new file mode 100644 index 0000000..8a3d685 --- /dev/null +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java @@ -0,0 +1,35 @@ +package com.flowingcode.vaadin.addons.gridexporter; + +import com.vaadin.flow.server.StreamResourceWriter; +import com.vaadin.flow.server.VaadinSession; + +@SuppressWarnings("serial") +public abstract class ConfigurableConcurrentStreamResourceWriter + extends ConcurrentStreamResourceWriter { + + public ConfigurableConcurrentStreamResourceWriter(StreamResourceWriter delegate) { + super(delegate); + } + + private float cost = GridExporter.DEFAULT_COST; + private long timeout = 0L; + + @Override + public float getCost(VaadinSession session) { + return cost; + } + + public void setCost(float cost) { + this.cost = cost; + } + + @Override + public long getTimeout() { + return timeout; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + +} diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java new file mode 100644 index 0000000..353e1e2 --- /dev/null +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java @@ -0,0 +1,285 @@ +package com.flowingcode.vaadin.addons.gridexporter.test; + +import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import com.flowingcode.vaadin.addons.gridexporter.ConfigurableConcurrentStreamResourceWriter; +import com.flowingcode.vaadin.addons.gridexporter.GridExporter; +import com.vaadin.flow.server.StreamResourceWriter; +import com.vaadin.flow.server.VaadinService; +import com.vaadin.flow.server.VaadinServletService; +import com.vaadin.flow.server.VaadinSession; +import java.io.IOException; +import java.nio.channels.InterruptedByTimeoutException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Exchanger; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +@SuppressWarnings("serial") +public class ConcurrentExportTests { + + private static final int TEST_TIMEOUT = 5000; + + private static Matcher interruptedByTimeout() { + return Matchers.instanceOf(InterruptedByTimeoutException.class); + } + + private class ConcurrentStreamResourceWriter + extends ConfigurableConcurrentStreamResourceWriter { + + public ConcurrentStreamResourceWriter(StreamResourceWriter delegate) { + super(delegate); + } + + } + + private CyclicBarrier barrier; + + private void initializeCyclicBarrier(int parties) { + barrier = new CyclicBarrier(parties); + } + + @Before + public void before() { + barrier = null; + } + + @SuppressWarnings("unchecked") + private static void sneakyThrow(Throwable e) throws E { + throw (E) e; + } + + private interface MockDownload { + MockDownload withTimeout(long timeout); + + MockDownload withCost(float cost); + + Throwable get() throws InterruptedException; + + MockDownload await() throws InterruptedException; + + MockDownload start(); + } + + private MockDownload newDownload() { + + CountDownLatch latch = new CountDownLatch(1); + + ConcurrentStreamResourceWriter writer = + new ConcurrentStreamResourceWriter((stream, session) -> { + latch.countDown(); + await(barrier); + }); + + Exchanger exchanger = new Exchanger<>(); + + Thread thread = new Thread(() -> { + + Throwable throwable = null; + try { + writer.accept(NULL_OUTPUT_STREAM, createSession()); + } catch (Throwable t) { + throwable = t; + } + + latch.countDown(); + try { + exchanger.exchange(throwable); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + return new MockDownload() { + @Override + public Throwable get() throws InterruptedException { + if (thread.getState() == Thread.State.NEW) { + throw new IllegalStateException("Download has not started"); + } + return exchanger.exchange(null); + } + + @Override + public MockDownload start() { + if (thread.getState() == Thread.State.NEW) { + thread.start(); + } + + try { + latch.await(1, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + sneakyThrow(e); + } + + return this; + } + + @Override + public MockDownload await() throws InterruptedException { + if (thread.getState() == Thread.State.NEW) { + thread.start(); + } + latch.await(); + return this; + } + + @Override + public MockDownload withTimeout(long timeout) { + writer.setTimeout(TimeUnit.MILLISECONDS.toNanos(timeout)); + return this; + } + + @Override + public MockDownload withCost(float cost) { + writer.setCost(cost); + return this; + } + }; + } + + private static void await(CyclicBarrier barrier) { + try { + barrier.await(); + } catch (Exception e) { + sneakyThrow(e); + } + } + + private VaadinSession createSession() { + Lock lock = new ReentrantLock(); + VaadinService service = new VaadinServletService(null, null); + return new VaadinSession(service) { + @Override + public Lock getLockInstance() { + return lock; + } + }; + } + + @Test + public void testSetLimit() throws IOException { + + float[] costs = new float[] {0.5f, 1, 2, ConcurrentStreamResourceWriter.MIN_COST, + 2 * ConcurrentStreamResourceWriter.MIN_COST, ConcurrentStreamResourceWriter.MAX_COST, + Float.POSITIVE_INFINITY}; + + // increment permits + for (float cost : costs) { + ConcurrentStreamResourceWriter.setLimit(cost); + Assert.assertEquals(cost, ConcurrentStreamResourceWriter.getLimit(), 0); + } + + // shrink permits + for (int i = costs.length; i-- > 0;) { + ConcurrentStreamResourceWriter.setLimit(costs[i]); + Assert.assertEquals(costs[i], ConcurrentStreamResourceWriter.getLimit(), 0); + } + + //finite costs are capped to MAX_COST + ConcurrentStreamResourceWriter.setLimit(0x10000); + Assert.assertEquals(GridExporter.MAX_COST, ConcurrentStreamResourceWriter.getLimit(), 0); + + //Any positive cost, no matter how small, will require at least one permit. + ConcurrentStreamResourceWriter.setLimit(Float.MIN_NORMAL); + Assert.assertEquals(GridExporter.MIN_COST, ConcurrentStreamResourceWriter.getLimit(), 0); + + ConcurrentStreamResourceWriter.setLimit(0.99f / 0x10000); + Assert.assertEquals(GridExporter.MIN_COST, ConcurrentStreamResourceWriter.getLimit(), 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testSetLimitWithZero() { + ConcurrentStreamResourceWriter.setLimit(0); + } + + @Test(expected = IllegalArgumentException.class) + public void testSetLimitWithNegative() { + ConcurrentStreamResourceWriter.setLimit(-1); + } + + @Test(timeout = TEST_TIMEOUT) + public void testUnlimitedDownloads() + throws InterruptedException { + ConcurrentStreamResourceWriter.setLimit(Float.POSITIVE_INFINITY); + initializeCyclicBarrier(2); + + var q1 = newDownload().await(); + var q2 = newDownload().await(); + + assertThat(q1.get(), nullValue()); + assertThat(q2.get(), nullValue()); + } + + @Test(timeout = TEST_TIMEOUT) + public void testConcurrentSuccess() + throws InterruptedException { + ConcurrentStreamResourceWriter.setLimit(2); + initializeCyclicBarrier(2); + + var q1 = newDownload().await(); + var q2 = newDownload().await(); + + assertThat(q1.get(), nullValue()); + assertThat(q2.get(), nullValue()); + } + + @Test(timeout = TEST_TIMEOUT) + public void testInterruptedByTimeout1() + throws InterruptedException { + ConcurrentStreamResourceWriter.setLimit(1); + initializeCyclicBarrier(2); + + var q1 = newDownload().await(); + var q2 = newDownload().start(); + assertThat(barrier.getNumberWaiting(), equalTo(1)); + await(barrier); + + assertThat(q1.get(), nullValue()); + assertThat(q2.get(), interruptedByTimeout()); + } + + + @Test(timeout = TEST_TIMEOUT) + public void testInterruptedByTimeout2() + throws InterruptedException { + ConcurrentStreamResourceWriter.setLimit(2); + initializeCyclicBarrier(3); + + var q1 = newDownload().await(); + var q2 = newDownload().await(); + var q3 = newDownload().withCost(2).start(); + assertThat(barrier.getNumberWaiting(), equalTo(2)); + await(barrier); + + assertThat(q1.get(), nullValue()); + assertThat(q2.get(), nullValue()); + assertThat(q3.get(), interruptedByTimeout()); + } + + @Test(timeout = TEST_TIMEOUT) + public void testInterruptedByTimeout3() + throws InterruptedException { + ConcurrentStreamResourceWriter.setLimit(2); + initializeCyclicBarrier(2); + + var q1 = newDownload().withCost(2).await(); + var q2 = newDownload().start(); + var q3 = newDownload().start(); + assertThat(barrier.getNumberWaiting(), equalTo(1)); + await(barrier); + + assertThat(q1.get(), nullValue()); + assertThat(q2.get(), interruptedByTimeout()); + assertThat(q3.get(), interruptedByTimeout()); + } + +} \ No newline at end of file From 268b5ae05d362dc6b206dd359a573695af28a670 Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Sat, 8 Jun 2024 22:50:18 -0300 Subject: [PATCH 4/7] feat(demo): add concurrent limit to BigDataset demo --- .../GridExporterBigDatasetDemo.java | 42 +++++++++++++++---- .../VaadinServiceInitListenerImpl.java | 15 +++++++ ...adin.flow.server.VaadinServiceInitListener | 1 + 3 files changed, 50 insertions(+), 8 deletions(-) create mode 100644 src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java create mode 100644 src/test/resources/META-INF/services/com.vaadin.flow.server.VaadinServiceInitListener diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java index 0d79858..7e079de 100644 --- a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java @@ -19,6 +19,15 @@ */ package com.flowingcode.vaadin.addons.gridexporter; +import com.flowingcode.vaadin.addons.demo.DemoSource; +import com.flowingcode.vaadin.addons.demo.SourceCodeViewer; +import com.github.javafaker.Faker; +import com.vaadin.flow.component.Html; +import com.vaadin.flow.component.grid.Grid; +import com.vaadin.flow.component.grid.Grid.Column; +import com.vaadin.flow.component.html.Div; +import com.vaadin.flow.router.PageTitle; +import com.vaadin.flow.router.Route; import java.io.IOException; import java.math.BigDecimal; import java.text.SimpleDateFormat; @@ -27,15 +36,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.poi.EncryptedDocumentException; -import com.flowingcode.vaadin.addons.demo.DemoSource; -import com.github.javafaker.Faker; -import com.vaadin.flow.component.grid.Grid; -import com.vaadin.flow.component.grid.Grid.Column; -import com.vaadin.flow.component.html.Div; -import com.vaadin.flow.router.PageTitle; -import com.vaadin.flow.router.Route; @DemoSource +@DemoSource("/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java") @PageTitle("Grid Exporter Addon Big Dataset Demo") @Route(value = "gridexporter/bigdataset", layout = GridExporterDemoView.class) @SuppressWarnings("serial") @@ -68,7 +71,7 @@ public GridExporterBigDatasetDemo() throws EncryptedDocumentException, IOExcepti }).collect(Collectors.toList()); grid.setItems(query->persons.stream().skip(query.getOffset()).limit(query.getLimit())); grid.setWidthFull(); - this.setSizeFull(); + setSizeFull(); GridExporter exporter = GridExporter.createFor(grid); exporter.setAutoSizeColumns(false); exporter.setExportValue(budgetCol, item -> "" + item.getBudget()); @@ -76,6 +79,29 @@ public GridExporterBigDatasetDemo() throws EncryptedDocumentException, IOExcepti exporter.setTitle("People information"); exporter.setFileName( "GridExport" + new SimpleDateFormat("yyyyddMM").format(Calendar.getInstance().getTime())); + + // begin-block concurrent + // #if vaadin eq 0 + Html concurrent = new Html( + """ +

+ This configuration prepares the exporter for the BigDataset demo, enabling it to manage resource-intensive + document generation tasks effectively. In this setup, an upper limit of 10 is established for the cost of + concurrent downloads, and the big dataset exporter is configured with a cost of 9, while other exporters + handling smaller datasets retain the default cost of 1. This customization allows a combination of one large + dataset download alongside one small dataset download, or up to 10 concurrent downloads of smaller datasets + when no big dataset is being exported.

+ + Additionally, setConcurrentDownloadTimeout enforces a timeout for acquiring the necessary permits + during a download operation. If the permits are not obtained within the specified timeframe, the download + request will be aborted, preventing prolonged waiting periods, especially during peak system loads. +

"""); + add(concurrent); + // #endif + SourceCodeViewer.highlightOnHover(concurrent, "concurrent"); + exporter.setConcurrentDownloadCost(9); + // end-block + add(grid); } } diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java new file mode 100644 index 0000000..9ba348e --- /dev/null +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java @@ -0,0 +1,15 @@ +package com.flowingcode.vaadin.addons.gridexporter; + +import com.vaadin.flow.server.ServiceInitEvent; +import com.vaadin.flow.server.VaadinServiceInitListener; +import java.util.concurrent.TimeUnit; + +public class VaadinServiceInitListenerImpl implements VaadinServiceInitListener { + + @Override + public void serviceInit(ServiceInitEvent event) { + GridExporter.setConcurrentDownloadLimit(10); + GridExporter.setConcurrentDownloadTimeout(5, TimeUnit.SECONDS); + } + +} diff --git a/src/test/resources/META-INF/services/com.vaadin.flow.server.VaadinServiceInitListener b/src/test/resources/META-INF/services/com.vaadin.flow.server.VaadinServiceInitListener new file mode 100644 index 0000000..72f5354 --- /dev/null +++ b/src/test/resources/META-INF/services/com.vaadin.flow.server.VaadinServiceInitListener @@ -0,0 +1 @@ +com.flowingcode.vaadin.addons.gridexporter.VaadinServiceInitListenerImpl \ No newline at end of file From e46cd6cb38b3dddde973ef60afe76fa6e20efecf Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Sat, 8 Jun 2024 22:51:13 -0300 Subject: [PATCH 5/7] chore: update .gitignore --- .gitignore | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.gitignore b/.gitignore index 407d08e..e2e413d 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,8 @@ types.d.ts frontend/generated/ frontend/index.html src/main/dev-bundle/* +/src/main/bundles +/src/main/dev-bundle +/.flow-node-tasks.lock +/vite.config.ts +/vite.generated.ts From 84e3744297fab6b6de3aa425c60015a042921afc Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Sun, 9 Jun 2024 03:07:46 -0300 Subject: [PATCH 6/7] build: set version to 2.4.0-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 225578b..d22e618 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.vaadin.addons.flowingcode grid-exporter-addon - 2.3.3-SNAPSHOT + 2.4.0-SNAPSHOT Grid Exporter Add-on Grid Exporter Add-on for Vaadin Flow From 17576de7545808fb9b93b455a617efb77f43a510 Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Mon, 10 Jun 2024 14:16:13 -0300 Subject: [PATCH 7/7] feat: add ConcurrentDownloadTimeoutEvent --- pom.xml | 3 + .../ConcurrentDownloadTimeoutEvent.java | 58 ++++++++++++++++ .../ConcurrentStreamResourceWriter.java | 11 +++ .../addons/gridexporter/GridExporter.java | 68 +++++++++++++++++++ .../GridExporterBigDatasetDemo.java | 3 +- .../VaadinServiceInitListenerImpl.java | 6 ++ .../test/ConcurrentExportTests.java | 26 +++++-- 7 files changed, 169 insertions(+), 6 deletions(-) create mode 100644 src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentDownloadTimeoutEvent.java diff --git a/pom.xml b/pom.xml index d22e618..c3946a2 100644 --- a/pom.xml +++ b/pom.xml @@ -301,6 +301,9 @@ jar + + AUTOMATIC + diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentDownloadTimeoutEvent.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentDownloadTimeoutEvent.java new file mode 100644 index 0000000..e3c5c1c --- /dev/null +++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentDownloadTimeoutEvent.java @@ -0,0 +1,58 @@ +package com.flowingcode.vaadin.addons.gridexporter; + +import java.util.EventObject; +import java.util.Objects; + +/** + * An event that is fired when a concurrent download timeout occurs in the {@link GridExporter}. + *

+ * This event allows the handler to determine whether the event propagation should be stopped, + * preventing other listeners from processing the event. + *

+ * + * @param the type of the GridExporter source + * @see GridExporter#setConcurrentDownloadTimeout(long, java.util.concurrent.TimeUnit) + */ +@SuppressWarnings("serial") +public class ConcurrentDownloadTimeoutEvent extends EventObject { + + private boolean propagationStopped; + + /** + * Constructs a new ConcurrentDownloadTimeoutEvent. + * + * @param source the {@link GridExporter} that is the source of this event + * @throws IllegalArgumentException if source is null + */ + public ConcurrentDownloadTimeoutEvent(GridExporter source) { + super(Objects.requireNonNull(source)); + } + + /** + * Returns the source of this event. + * + * @return the {@code GridExporter} that is the source of this event + */ + @Override + public GridExporter getSource() { + return (GridExporter) super.getSource(); + } + + /** + * Stops the propagation of this event. When propagation is stopped, other listeners will not be + * notified of this event. + */ + public void stopPropagation() { + propagationStopped = true; + } + + /** + * Checks if the propagation of this event has been stopped. + * + * @return {@code true} if the propagation has been stopped, {@code false} otherwise + * @see #stopPropagation() + */ + public boolean isPropagationStopped() { + return propagationStopped; + } +} diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java index 6e00420..88bcc69 100644 --- a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java +++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java @@ -147,6 +147,16 @@ public float getCost(VaadinSession session) { return DEFAULT_COST; } + /** + * Callback method that is invoked when a timeout occurs while trying to acquire a permit for + * starting a download. + *

+ * Implementations can use this method to perform any necessary actions in response to the + * timeout, such as logging a warning or notifying the user. + *

+ */ + protected abstract void onTimeout(); + /** * Handles {@code stream} (writes data to it) using {@code session} as a context. *

@@ -185,6 +195,7 @@ public final void accept(OutputStream stream, VaadinSession session) throws IOEx semaphore.release(permits); } } else { + onTimeout(); throw new InterruptedByTimeoutException(); } } catch (InterruptedException e) { diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java index 4fad17a..0bc6829 100644 --- a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java +++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java @@ -32,11 +32,13 @@ import com.vaadin.flow.data.renderer.BasicRenderer; import com.vaadin.flow.data.renderer.LitRenderer; import com.vaadin.flow.data.renderer.Renderer; +import com.vaadin.flow.function.SerializableConsumer; import com.vaadin.flow.function.SerializableSupplier; import com.vaadin.flow.function.ValueProvider; import com.vaadin.flow.server.StreamResource; import com.vaadin.flow.server.StreamResourceWriter; import com.vaadin.flow.server.VaadinSession; +import com.vaadin.flow.shared.Registration; import java.io.Serializable; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; @@ -49,8 +51,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +80,10 @@ public class GridExporter implements Serializable { private static long concurrentDownloadTimeoutNanos = 0L; private float concurrentDownloadCost = DEFAULT_COST; + private final List> instanceDownloadTimeoutListeners = + new CopyOnWriteArrayList<>(); + private static final List> globalDownloadTimeoutListeners = + new CopyOnWriteArrayList<>(); static final String COLUMN_VALUE_PROVIDER_DATA = "column-value-provider-data"; static final String COLUMN_EXPORTED_PROVIDER_DATA = "column-value-exported-data"; @@ -328,9 +336,69 @@ public long getTimeout() { return concurrentDownloadTimeoutNanos; } + @Override + protected void onTimeout() { + fireConcurrentDownloadTimeout(); + } + + }; } + /** + * Handles the timeout event by notifying all registered listeners. + *

+ * This method is called when a timeout occurs during a concurrent download. It creates a + * {@link ConcurrentDownloadTimeoutEvent} and notifies all instance and global listeners. If any + * listener stops the event propagation, subsequent listeners will not be notified. + */ + private void fireConcurrentDownloadTimeout() { + if (!instanceDownloadTimeoutListeners.isEmpty() || !globalDownloadTimeoutListeners.isEmpty()) { + grid.getUI().ifPresent(ui -> ui.access(() -> { + ConcurrentDownloadTimeoutEvent ev = new ConcurrentDownloadTimeoutEvent(GridExporter.this); + Stream.concat(instanceDownloadTimeoutListeners.stream(), + globalDownloadTimeoutListeners.stream()).forEach(listener -> { + if (!ev.isPropagationStopped()) { + listener.accept(ev); + } + }); + })); + } + } + + /** + * Adds a listener for concurrent download timeout events specific to this instance. + *

+ * The listener will be called whenever a concurrent download timeout event occurs. + * + * @param listener the listener to be added + * @return a {@link Registration} object that can be used to remove the listener + */ + public Registration addConcurrentDownloadTimeoutEvent( + SerializableConsumer listener) { + instanceDownloadTimeoutListeners.add(0, listener); + return () -> instanceDownloadTimeoutListeners.remove(listener); + } + + /** + * Adds a global listener for concurrent download timeout events. + *

+ * The listener will be called whenever a concurrent download timeout event occurs. + *

+ * Note that instance-specific listeners take precedence over global listeners. If an instance + * listener stops the event propagation by calling + * {@link ConcurrentDownloadTimeoutEvent#stopPropagation() stopPropagation()}, the global + * listeners will not be notified. + * + * @param listener the listener to be added + * @return a {@link Registration} object that can be used to remove the listener + */ + public static Registration addGlobalConcurrentDownloadTimeoutEvent( + SerializableConsumer listener) { + globalDownloadTimeoutListeners.add(0, listener); + return () -> globalDownloadTimeoutListeners.remove(listener); + } + /** * Sets the limit for the {@linkplain #setConcurrentDownloadCost(float) cost of concurrent * downloads}. If all the downloads have a cost of {@link #DEFAULT_COST}, the limit represents the diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java index 7e079de..65a303d 100644 --- a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/GridExporterBigDatasetDemo.java @@ -94,7 +94,8 @@ public GridExporterBigDatasetDemo() throws EncryptedDocumentException, IOExcepti Additionally, setConcurrentDownloadTimeout enforces a timeout for acquiring the necessary permits during a download operation. If the permits are not obtained within the specified timeframe, the download - request will be aborted, preventing prolonged waiting periods, especially during peak system loads. + request will be aborted and the DownloadTimeoutEvent listener will execute, preventing prolonged + waiting periods, especially during peak system loads. """); add(concurrent); // #endif diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java index 9ba348e..845ccb9 100644 --- a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java @@ -1,5 +1,7 @@ package com.flowingcode.vaadin.addons.gridexporter; +import com.vaadin.flow.component.notification.Notification; +import com.vaadin.flow.component.notification.NotificationVariant; import com.vaadin.flow.server.ServiceInitEvent; import com.vaadin.flow.server.VaadinServiceInitListener; import java.util.concurrent.TimeUnit; @@ -10,6 +12,10 @@ public class VaadinServiceInitListenerImpl implements VaadinServiceInitListener public void serviceInit(ServiceInitEvent event) { GridExporter.setConcurrentDownloadLimit(10); GridExporter.setConcurrentDownloadTimeout(5, TimeUnit.SECONDS); + GridExporter.addGlobalConcurrentDownloadTimeoutEvent(ev -> { + Notification.show("System is busy. Please try again later.") + .addThemeVariants(NotificationVariant.LUMO_ERROR); + }); } } diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java index 353e1e2..21f69ff 100644 --- a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java @@ -4,6 +4,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertTrue; import com.flowingcode.vaadin.addons.gridexporter.ConfigurableConcurrentStreamResourceWriter; import com.flowingcode.vaadin.addons.gridexporter.GridExporter; import com.vaadin.flow.server.StreamResourceWriter; @@ -29,17 +30,24 @@ public class ConcurrentExportTests { private static final int TEST_TIMEOUT = 5000; - private static Matcher interruptedByTimeout() { + private static Matcher throwsInterruptedByTimeout() { return Matchers.instanceOf(InterruptedByTimeoutException.class); } private class ConcurrentStreamResourceWriter extends ConfigurableConcurrentStreamResourceWriter { + private boolean interruptedByTimeout; + public ConcurrentStreamResourceWriter(StreamResourceWriter delegate) { super(delegate); } + @Override + protected void onTimeout() { + interruptedByTimeout = true; + } + } private CyclicBarrier barrier; @@ -68,6 +76,8 @@ private interface MockDownload { MockDownload await() throws InterruptedException; MockDownload start(); + + boolean wasInterruptedByTimeout(); } private MockDownload newDownload() { @@ -143,6 +153,11 @@ public MockDownload withCost(float cost) { writer.setCost(cost); return this; } + + @Override + public boolean wasInterruptedByTimeout() { + return writer.interruptedByTimeout; + } }; } @@ -244,7 +259,8 @@ public void testInterruptedByTimeout1() await(barrier); assertThat(q1.get(), nullValue()); - assertThat(q2.get(), interruptedByTimeout()); + assertThat(q2.get(), throwsInterruptedByTimeout()); + assertTrue(q2.wasInterruptedByTimeout()); } @@ -262,7 +278,7 @@ public void testInterruptedByTimeout2() assertThat(q1.get(), nullValue()); assertThat(q2.get(), nullValue()); - assertThat(q3.get(), interruptedByTimeout()); + assertThat(q3.get(), throwsInterruptedByTimeout()); } @Test(timeout = TEST_TIMEOUT) @@ -278,8 +294,8 @@ public void testInterruptedByTimeout3() await(barrier); assertThat(q1.get(), nullValue()); - assertThat(q2.get(), interruptedByTimeout()); - assertThat(q3.get(), interruptedByTimeout()); + assertThat(q2.get(), throwsInterruptedByTimeout()); + assertThat(q3.get(), throwsInterruptedByTimeout()); } } \ No newline at end of file