Skip to content

feat: implement concurrent export limit #125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ types.d.ts
frontend/generated/
frontend/index.html
src/main/dev-bundle/*
/src/main/bundles
/src/main/dev-bundle
/.flow-node-tasks.lock
/vite.config.ts
/vite.generated.ts
9 changes: 6 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

<groupId>org.vaadin.addons.flowingcode</groupId>
<artifactId>grid-exporter-addon</artifactId>
<version>2.3.3-SNAPSHOT</version>
<version>2.4.0-SNAPSHOT</version>
<name>Grid Exporter Add-on</name>
<description>Grid Exporter Add-on for Vaadin Flow</description>

<properties>
<vaadin.version>24.3.9</vaadin.version>
<selenium.version>4.1.2</selenium.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<drivers.dir>${project.basedir}/drivers</drivers.dir>
Expand Down Expand Up @@ -301,6 +301,9 @@
<supportedPackagings>
<supportedPackaging>jar</supportedPackaging>
</supportedPackagings>
<systemProperties>
<com.vaadin.flow.server.pushMode>AUTOMATIC</com.vaadin.flow.server.pushMode>
</systemProperties>
</configuration>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.flowingcode.vaadin.addons.gridexporter;

import java.util.EventObject;
import java.util.Objects;

/**
* An event that is fired when a concurrent download timeout occurs in the {@link GridExporter}.
* <p>
* This event allows the handler to determine whether the event propagation should be stopped,
* preventing other listeners from processing the event.
* </p>
*
* @param <T> the type of the GridExporter source
* @see GridExporter#setConcurrentDownloadTimeout(long, java.util.concurrent.TimeUnit)
*/
@SuppressWarnings("serial")
public class ConcurrentDownloadTimeoutEvent extends EventObject {

private boolean propagationStopped;

/**
* Constructs a new ConcurrentDownloadTimeoutEvent.
*
* @param source the {@link GridExporter} that is the source of this event
* @throws IllegalArgumentException if source is null
*/
public ConcurrentDownloadTimeoutEvent(GridExporter<?> source) {
super(Objects.requireNonNull(source));
}

/**
* Returns the source of this event.
*
* @return the {@code GridExporter} that is the source of this event
*/
@Override
public GridExporter<?> getSource() {
return (GridExporter<?>) super.getSource();
}

/**
* Stops the propagation of this event. When propagation is stopped, other listeners will not be
* notified of this event.
*/
public void stopPropagation() {
propagationStopped = true;
}

/**
* Checks if the propagation of this event has been stopped.
*
* @return {@code true} if the propagation has been stopped, {@code false} otherwise
* @see #stopPropagation()
*/
public boolean isPropagationStopped() {
return propagationStopped;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package com.flowingcode.vaadin.addons.gridexporter;

import com.vaadin.flow.server.StreamResourceWriter;
import com.vaadin.flow.server.VaadinSession;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

/**
* An implementation of {@link StreamResourceWriter} that controls access to the
* {@link #accept(OutputStream, VaadinSession) accept} method using a semaphore to manage
* concurrency.
*
* @author Javier Godoy
*/
@SuppressWarnings("serial")
abstract class ConcurrentStreamResourceWriter implements StreamResourceWriter {

public static final float MAX_COST = 0x7FFF;

public static final float MIN_COST = 1.0f / 0x10000;

public static final float DEFAULT_COST = 1.0f;

private static final ConfigurableSemaphore semaphore = new ConfigurableSemaphore();

private static volatile boolean enabled;

private final StreamResourceWriter delegate;

private static final class ConfigurableSemaphore extends Semaphore {

private int maxPermits;

ConfigurableSemaphore() {
super(0);
}

synchronized void setPermits(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
}
int delta = permits - maxPermits;
if (delta > 0) {
super.release(delta);
} else if (delta < 0) {
super.reducePermits(-delta);
}
maxPermits = permits;
}

@Override
public String toString() {
IntFunction<String> str = permits -> {
float f = permits / (float) 0x10000;
return f == Math.floor(f) ? String.format("%.0f", f) : Float.toString(f);
};
return "Semaphore[" + str.apply(availablePermits()) + "/" + str.apply(maxPermits) + "]";
}

}

/**
* Sets the limit for the cost of concurrent downloads.
* <p>
* Finite limits are capped to {@link #MAX_COST} (32767). If the limit is
* {@link Float#POSITIVE_INFINITY POSITIVE_INFINITY}, the semaphore will not be used for
* controlling concurrent downloads.
*
* @param limit the maximum cost of concurrent downloads allowed
* @throws IllegalArgumentException if the limit is zero or negative.
*/
public static void setLimit(float limit) {
if (limit <= 0) {
throw new IllegalArgumentException();
}
if (Float.isInfinite(limit)) {
enabled = false;
return;
}

synchronized (semaphore) {
enabled = true;
semaphore.setPermits(costToPermits(limit, Integer.MAX_VALUE));
}
}

/**
* Returns the limit for the number of concurrent downloads.
*
* @return the limit for the number of concurrent downloads, or {@link Float#POSITIVE_INFINITY} if
* the semaphore is disabled.
*/
public static float getLimit() {
if (enabled) {
synchronized (semaphore) {
return (float) semaphore.maxPermits / 0x10000;
}
} else {
return Float.POSITIVE_INFINITY;
}
}

private static int costToPermits(float cost, int maxPermits) {
// restrict limit to 0x7fff to ensure the cost can be represented
// using fixed-point arithmetic with 16 fractional digits and 15 integral digits
cost = Math.min(cost, MAX_COST);
// Determine the number of permits required based on the cost, capping at maxPermits.
// If the cost is zero or negative, no permits are needed.
// Any positive cost, no matter how small, will require at least one permit.
return cost <= 0 ? 0 : Math.max(Math.min((int) (cost * 0x10000), maxPermits), 1);
}

/**
* Constructs a {@code ConcurrentStreamResourceWriter} with the specified delegate. The delegate
* is a {@link StreamResourceWriter} that performs the actual writing to the stream.
*
* @param delegate the delegate {@code InputStreamFactory}
*/
ConcurrentStreamResourceWriter(StreamResourceWriter delegate) {
this.delegate = delegate;
}

/**
* Sets the timeout for acquiring a permit to start a download when there are not enough permits
* available in the semaphore.
*
* @see GridExporter#setConcurrentDownloadTimeout(long, TimeUnit)
* @return the timeout in nanoseconds.
*/
public abstract long getTimeout();

/**
* Returns the cost of this download.
*
* Note that the method is not called under the session lock. It means that if implementation
* requires access to the application/session data then the session has to be locked explicitly.
*
* @param session vaadin session
* @see GridExporter#setConcurrentDownloadCost(float)
*/
public float getCost(VaadinSession session) {
return DEFAULT_COST;
}

/**
* Callback method that is invoked when a timeout occurs while trying to acquire a permit for
* starting a download.
* <p>
* Implementations can use this method to perform any necessary actions in response to the
* timeout, such as logging a warning or notifying the user.
* </p>
*/
protected abstract void onTimeout();

/**
* Handles {@code stream} (writes data to it) using {@code session} as a context.
* <p>
* Note that the method is not called under the session lock. It means that if implementation
* requires access to the application/session data then the session has to be locked explicitly.
* <p>
* If a semaphore has been set, it controls access to this method, enforcing a timeout. A permit
* will be acquired from the semaphore, if one becomes available within the given waiting time and
* the current thread has not been {@linkplain Thread#interrupt interrupted}.
*
* @param stream data output stream
* @param session vaadin session
* @throws IOException if an IO error occurred
* @throws InterruptedIOException if the current thread is interrupted
* @throws InterruptedByTimeoutException if the waiting time elapsed before a permit was acquired
*/
@Override
public final void accept(OutputStream stream, VaadinSession session) throws IOException {

if (!enabled) {
delegate.accept(stream, session);
} else {

try {

int permits;
float cost = getCost(session);
synchronized (semaphore) {
permits = costToPermits(cost, semaphore.maxPermits);
}

if (semaphore.tryAcquire(permits, getTimeout(), TimeUnit.NANOSECONDS)) {
try {
delegate.accept(stream, session);
} finally {
semaphore.release(permits);
}
} else {
onTimeout();
throw new InterruptedByTimeoutException();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw (IOException) new InterruptedIOException().initCause(e);
}
}
}

}
Loading
Loading