Skip to content

Commit e19e77b

Browse files
committed
feat: implement concurrent export limit
Close #117
1 parent f770996 commit e19e77b

File tree

2 files changed

+311
-3
lines changed

2 files changed

+311
-3
lines changed
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package com.flowingcode.vaadin.addons.gridexporter;
2+
3+
import com.vaadin.flow.server.StreamResourceWriter;
4+
import com.vaadin.flow.server.VaadinSession;
5+
import java.io.IOException;
6+
import java.io.InterruptedIOException;
7+
import java.io.OutputStream;
8+
import java.nio.channels.InterruptedByTimeoutException;
9+
import java.util.concurrent.Semaphore;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.function.IntFunction;
12+
13+
/**
14+
* An implementation of {@link StreamResourceWriter} that controls access to the
15+
* {@link #accept(OutputStream, VaadinSession) accept} method using a semaphore to manage
16+
* concurrency.
17+
*
18+
* @author Javier Godoy
19+
*/
20+
@SuppressWarnings("serial")
21+
abstract class ConcurrentStreamResourceWriter implements StreamResourceWriter {
22+
23+
public static final float MAX_COST = 0x7FFF;
24+
25+
public static final float MIN_COST = 1.0f / 0x10000;
26+
27+
public static final float DEFAULT_COST = 1.0f;
28+
29+
private static final ConfigurableSemaphore semaphore = new ConfigurableSemaphore();
30+
31+
private static volatile boolean enabled;
32+
33+
private final StreamResourceWriter delegate;
34+
35+
private static final class ConfigurableSemaphore extends Semaphore {
36+
37+
private int maxPermits;
38+
39+
ConfigurableSemaphore() {
40+
super(0);
41+
}
42+
43+
synchronized void setPermits(int permits) {
44+
if (permits < 0) {
45+
throw new IllegalArgumentException();
46+
}
47+
int delta = permits - maxPermits;
48+
if (delta > 0) {
49+
super.release(delta);
50+
} else if (delta < 0) {
51+
super.reducePermits(-delta);
52+
}
53+
maxPermits = permits;
54+
}
55+
56+
@Override
57+
public String toString() {
58+
IntFunction<String> str = permits -> {
59+
float f = permits / (float) 0x10000;
60+
return f == Math.floor(f) ? String.format("%.0f", f) : Float.toString(f);
61+
};
62+
return "Semaphore[" + str.apply(availablePermits()) + "/" + str.apply(maxPermits) + "]";
63+
}
64+
65+
}
66+
67+
/**
68+
* Sets the limit for the cost of concurrent downloads.
69+
* <p>
70+
* Finite limits are capped to {@link #MAX_COST} (32767). If the limit is
71+
* {@link Float#POSITIVE_INFINITY POSITIVE_INFINITY}, the semaphore will not be used for
72+
* controlling concurrent downloads.
73+
*
74+
* @param limit the maximum cost of concurrent downloads allowed
75+
* @throws IllegalArgumentException if the limit is zero or negative.
76+
*/
77+
public static void setLimit(float limit) {
78+
if (limit <= 0) {
79+
throw new IllegalArgumentException();
80+
}
81+
if (Float.isInfinite(limit)) {
82+
enabled = false;
83+
return;
84+
}
85+
86+
synchronized (semaphore) {
87+
enabled = true;
88+
semaphore.setPermits(costToPermits(limit, Integer.MAX_VALUE));
89+
}
90+
}
91+
92+
/**
93+
* Returns the limit for the number of concurrent downloads.
94+
*
95+
* @return the limit for the number of concurrent downloads, or {@link Float#POSITIVE_INFINITY} if
96+
* the semaphore is disabled.
97+
*/
98+
public static float getLimit() {
99+
if (enabled) {
100+
synchronized (semaphore) {
101+
return (float) semaphore.maxPermits / 0x10000;
102+
}
103+
} else {
104+
return Float.POSITIVE_INFINITY;
105+
}
106+
}
107+
108+
private static int costToPermits(float cost, int maxPermits) {
109+
// restrict limit to 0x7fff to ensure the cost can be represented
110+
// using fixed-point arithmetic with 16 fractional digits and 15 integral digits
111+
cost = Math.min(cost, MAX_COST);
112+
// Determine the number of permits required based on the cost, capping at maxPermits.
113+
// If the cost is zero or negative, no permits are needed.
114+
// Any positive cost, no matter how small, will require at least one permit.
115+
return cost <= 0 ? 0 : Math.max(Math.min((int) (cost * 0x10000), maxPermits), 1);
116+
}
117+
118+
/**
119+
* Constructs a {@code ConcurrentStreamResourceWriter} with the specified delegate. The delegate
120+
* is a {@link StreamResourceWriter} that performs the actual writing to the stream.
121+
*
122+
* @param delegate the delegate {@code InputStreamFactory}
123+
*/
124+
ConcurrentStreamResourceWriter(StreamResourceWriter delegate) {
125+
this.delegate = delegate;
126+
}
127+
128+
/**
129+
* Sets the timeout for acquiring a permit to start a download when there are not enough permits
130+
* available in the semaphore.
131+
*
132+
* @see GridExporter#setConcurrentDownloadTimeout(long, TimeUnit)
133+
* @return the timeout in nanoseconds.
134+
*/
135+
public abstract long getTimeout();
136+
137+
/**
138+
* Returns the cost of this download.
139+
*
140+
* Note that the method is not called under the session lock. It means that if implementation
141+
* requires access to the application/session data then the session has to be locked explicitly.
142+
*
143+
* @param session vaadin session
144+
* @see GridExporter#setConcurrentDownloadCost(float)
145+
*/
146+
public float getCost(VaadinSession session) {
147+
return DEFAULT_COST;
148+
}
149+
150+
/**
151+
* Handles {@code stream} (writes data to it) using {@code session} as a context.
152+
* <p>
153+
* Note that the method is not called under the session lock. It means that if implementation
154+
* requires access to the application/session data then the session has to be locked explicitly.
155+
* <p>
156+
* If a semaphore has been set, it controls access to this method, enforcing a timeout. A permit
157+
* will be acquired from the semaphore, if one becomes available within the given waiting time and
158+
* the current thread has not been {@linkplain Thread#interrupt interrupted}.
159+
*
160+
* @param stream data output stream
161+
* @param session vaadin session
162+
* @throws IOException if an IO error occurred
163+
* @throws InterruptedIOException if the current thread is interrupted
164+
* @throws InterruptedByTimeoutException if the waiting time elapsed before a permit was acquired
165+
*/
166+
@Override
167+
public final void accept(OutputStream stream, VaadinSession session) throws IOException {
168+
169+
if (!enabled) {
170+
delegate.accept(stream, session);
171+
} else {
172+
173+
try {
174+
175+
int permits;
176+
float cost = getCost(session);
177+
synchronized (semaphore) {
178+
permits = costToPermits(cost, semaphore.maxPermits);
179+
}
180+
181+
if (semaphore.tryAcquire(permits, getTimeout(), TimeUnit.NANOSECONDS)) {
182+
try {
183+
delegate.accept(stream, session);
184+
} finally {
185+
semaphore.release(permits);
186+
}
187+
} else {
188+
throw new InterruptedByTimeoutException();
189+
}
190+
} catch (InterruptedException e) {
191+
Thread.currentThread().interrupt();
192+
throw (IOException) new InterruptedIOException().initCause(e);
193+
}
194+
}
195+
}
196+
197+
}

src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import com.vaadin.flow.function.SerializableSupplier;
3636
import com.vaadin.flow.function.ValueProvider;
3737
import com.vaadin.flow.server.StreamResource;
38+
import com.vaadin.flow.server.StreamResourceWriter;
39+
import com.vaadin.flow.server.VaadinSession;
3840
import java.io.Serializable;
3941
import java.lang.reflect.Field;
4042
import java.lang.reflect.InvocationTargetException;
@@ -47,6 +49,7 @@
4749
import java.util.List;
4850
import java.util.Map;
4951
import java.util.Optional;
52+
import java.util.concurrent.TimeUnit;
5053
import java.util.stream.Collectors;
5154
import org.slf4j.Logger;
5255
import org.slf4j.LoggerFactory;
@@ -62,6 +65,18 @@ public class GridExporter<T> implements Serializable {
6265
private boolean csvExportEnabled = true;
6366
private boolean autoSizeColumns = true;
6467

68+
/** Represents all the permits available to the semaphore. */
69+
public static final float MAX_COST = ConcurrentStreamResourceWriter.MAX_COST;
70+
71+
/** A fractional cost that acquires only one permit. */
72+
public static final float MIN_COST = ConcurrentStreamResourceWriter.MIN_COST;
73+
74+
/** The standard unit of resource usage for concurrent downloads. */
75+
public static final float DEFAULT_COST = 1.0f;
76+
77+
private static long concurrentDownloadTimeoutNanos = 0L;
78+
private float concurrentDownloadCost = DEFAULT_COST;
79+
6580
static final String COLUMN_VALUE_PROVIDER_DATA = "column-value-provider-data";
6681
static final String COLUMN_EXPORTED_PROVIDER_DATA = "column-value-exported-data";
6782
static final String COLUMN_PARSING_FORMAT_PATTERN_DATA = "column-parsing-format-pattern-data";
@@ -268,15 +283,17 @@ public StreamResource getDocxStreamResource() {
268283
}
269284

270285
public StreamResource getDocxStreamResource(String template) {
271-
return new StreamResource(fileName + ".docx", new DocxStreamResourceWriter<>(this, template));
286+
return new StreamResource(fileName + ".docx",
287+
makeConcurrentWriter(new DocxStreamResourceWriter<>(this, template)));
272288
}
273289

274290
public StreamResource getPdfStreamResource() {
275291
return getPdfStreamResource(null);
276292
}
277293

278294
public StreamResource getPdfStreamResource(String template) {
279-
return new StreamResource(fileName + ".pdf", new PdfStreamResourceWriter<>(this, template));
295+
return new StreamResource(fileName + ".pdf",
296+
makeConcurrentWriter(new PdfStreamResourceWriter<>(this, template)));
280297
}
281298

282299
public StreamResource getCsvStreamResource() {
@@ -288,7 +305,101 @@ public StreamResource getExcelStreamResource() {
288305
}
289306

290307
public StreamResource getExcelStreamResource(String template) {
291-
return new StreamResource(fileName + ".xlsx", new ExcelStreamResourceWriter<>(this, template));
308+
return new StreamResource(fileName + ".xlsx",
309+
makeConcurrentWriter(new ExcelStreamResourceWriter<>(this, template)));
310+
}
311+
312+
private StreamResourceWriter makeConcurrentWriter(StreamResourceWriter writer) {
313+
return new ConcurrentStreamResourceWriter(writer) {
314+
@Override
315+
public float getCost(VaadinSession session) {
316+
return concurrentDownloadCost;
317+
}
318+
319+
@Override
320+
public long getTimeout() {
321+
// It would have been possible to specify a different timeout for each instance but I cannot
322+
// figure out a good use case for that. The timeout returned herebecomes relevant when the
323+
// semaphore has been acquired by any other download, so the timeout must reflect how long
324+
// it is reasonable to wait for "any other download" to complete and release the semaphore.
325+
//
326+
// Since the reasonable timeout would depend on the duration of "any other download", it
327+
// makes sense that it's a global setting instead of a per-instance setting.
328+
return concurrentDownloadTimeoutNanos;
329+
}
330+
331+
};
332+
}
333+
334+
/**
335+
* Sets the limit for the {@linkplain #setConcurrentDownloadCost(float) cost of concurrent
336+
* downloads}. If all the downloads have a cost of {@link #DEFAULT_COST}, the limit represents the
337+
* number of concurrent downloads that are allowed.
338+
* <p>
339+
* Finite limits are capped to {@link #MAX_COST} (32767). If the limit is
340+
* {@link Float#POSITIVE_INFINITY POSITIVE_INFINITY}, concurrent downloads will not be limited.
341+
*
342+
* @param limit the maximum cost of concurrent downloads allowed
343+
* @throws IllegalArgumentException if the limit is zero or negative.
344+
*/
345+
public static void setConcurrentDownloadLimit(float limit) {
346+
ConcurrentStreamResourceWriter.setLimit(limit);
347+
}
348+
349+
/**
350+
* Returns the limit for the number of concurrent downloads.
351+
*
352+
* @return the limit for the number of concurrent downloads, or {@link Float#POSITIVE_INFINITY} if
353+
* concurrent downloads are not limited.
354+
*/
355+
public static float getConcurrentDownloadLimit() {
356+
return ConcurrentStreamResourceWriter.getLimit();
357+
}
358+
359+
/**
360+
* Sets the timeout for acquiring a permit to start a download when the
361+
* {@linkplain #setConcurrentDownloadLimit(int) maximum number of concurrent downloads} is
362+
* reached. If the timeout is less than or equal to zero, the downloads will fail immediately if
363+
* no enough permits can be acquired.
364+
*
365+
* This timeout is crucial for preventing the system from hanging indefinitely while waiting for
366+
* available resources. If the timeout expires before a permit can be acquired, the download is
367+
* cancelled.
368+
*
369+
* @param timeout the maximum time to wait for a permit
370+
* @param unit the time unit of the {@code timeout} argument
371+
*/
372+
public static void setConcurrentDownloadTimeout(long timeout, TimeUnit unit) {
373+
GridExporter.concurrentDownloadTimeoutNanos = unit.toNanos(timeout);
374+
}
375+
376+
/**
377+
* Sets the cost for concurrent downloads. This cost is used to determine the number of permits
378+
* required for downloads to proceed, thereby controlling the concurrency level. At any given
379+
* time, the sum of the costs of all concurrent downloads will not exceed the limit set by
380+
* {@link #setConcurrentDownloadLimit(float)}.
381+
* <p>
382+
*
383+
* The cost is represented as a float to allow for more granular control over resource usage. By
384+
* using a floating-point number, fractional costs can be expressed, providing flexibility in
385+
* determining the resource consumption for different downloads.
386+
* <p>
387+
*
388+
* The cost is converted to a number of permits by capping it to stay within the limit. A cost of
389+
* 1.0 ({@link #DEFAULT_COST}) represents a standard unit of resource usage, while a cost of 0.5
390+
* represents half a unit, and a cost above 1.0 indicates higher than normal resource usage.
391+
* <p>
392+
*
393+
* If the cost is zero or negative, no permits are needed. However, any positive cost, no matter
394+
* how small, will require at least one permit to prevent downloads with very low costs from
395+
* bypassing the semaphore. {@link #MIN_COST} represents the minimal fractional cost that acquires
396+
* only one permit (hence {@code 2*MIN_COST} acquires two permits and so on). A cost of
397+
* {@link #MAX_COST} prevents any other downloads from acquiring permits simultaneously.
398+
*
399+
* @param concurrentDownloadCost the cost associated with concurrent downloads for this instance.
400+
*/
401+
public void setConcurrentDownloadCost(float concurrentDownloadCost) {
402+
this.concurrentDownloadCost = concurrentDownloadCost;
292403
}
293404

294405
public String getTitle() {

0 commit comments

Comments
 (0)