|
21 | 21 | import java.util.Collections;
|
22 | 22 | import java.util.LinkedHashSet;
|
23 | 23 | import java.util.Set;
|
| 24 | +import java.util.concurrent.Executors; |
| 25 | +import java.util.concurrent.ScheduledExecutorService; |
| 26 | +import java.util.concurrent.ScheduledFuture; |
| 27 | +import java.util.concurrent.TimeUnit; |
24 | 28 | import java.util.concurrent.locks.Lock;
|
25 | 29 | import java.util.concurrent.locks.ReentrantLock;
|
26 | 30 |
|
@@ -52,21 +56,58 @@ public class SseEmitter extends ResponseBodyEmitter {
|
52 | 56 | private final Lock writeLock = new ReentrantLock();
|
53 | 57 |
|
54 | 58 | /**
|
55 |
| - * Create a new SseEmitter instance. |
| 59 | + * The interval (in milliseconds) at which heartbeat messages are sent to the client. |
| 60 | + * A value of 0 means no heartbeat messages will be sent. |
| 61 | + */ |
| 62 | + private final long heartbeatInterval; |
| 63 | + |
| 64 | + /** |
| 65 | + * The scheduled future for the heartbeat task. Used to cancel the task when needed. |
| 66 | + */ |
| 67 | + @Nullable |
| 68 | + private ScheduledFuture<?> heartbeatFuture; |
| 69 | + |
| 70 | + @Nullable |
| 71 | + private ScheduledExecutorService scheduler; |
| 72 | + |
| 73 | + /** |
| 74 | + * Create a new {@code SseEmitter} instance. |
| 75 | + * <p>By default, the timeout is not set (i.e., it depends on the MVC configuration), |
| 76 | + * and no heartbeat messages are sent. |
56 | 77 | */
|
57 | 78 | public SseEmitter() {
|
| 79 | + this.heartbeatInterval = 0; |
58 | 80 | }
|
59 | 81 |
|
60 | 82 | /**
|
61 | 83 | * Create a SseEmitter with a custom timeout value.
|
62 | 84 | * <p>By default not set in which case the default configured in the MVC
|
| 85 | + * <p>No heartbeat messages will be sent unless specified. |
63 | 86 | * Java Config or the MVC namespace is used, or if that's not set, then the
|
64 | 87 | * timeout depends on the default of the underlying server.
|
65 | 88 | * @param timeout the timeout value in milliseconds
|
66 | 89 | * @since 4.2.2
|
67 | 90 | */
|
68 | 91 | public SseEmitter(Long timeout) {
|
69 | 92 | super(timeout);
|
| 93 | + heartbeatInterval = 0; |
| 94 | + } |
| 95 | + |
| 96 | + /** |
| 97 | + * Create a new {@code SseEmitter} instance with a custom timeout and heartbeat interval. |
| 98 | + * @param timeout the timeout value in milliseconds |
| 99 | + * @param heartbeatInterval the interval (in milliseconds) at which heartbeat messages are sent. |
| 100 | + * A value of 0 means no heartbeat messages will be sent. |
| 101 | + */ |
| 102 | + public SseEmitter(Long timeout, long heartbeatInterval) { |
| 103 | + super(timeout); |
| 104 | + this.heartbeatInterval = heartbeatInterval; |
| 105 | + if (heartbeatInterval > 0) { |
| 106 | + startHeartbeat(); |
| 107 | + onCompletion(this::stopHeartbeat); |
| 108 | + onTimeout(this::stopHeartbeat); |
| 109 | + onError(ex -> stopHeartbeat()); |
| 110 | + } |
70 | 111 | }
|
71 | 112 |
|
72 | 113 |
|
@@ -139,6 +180,40 @@ public void send(SseEventBuilder builder) throws IOException {
|
139 | 180 | }
|
140 | 181 | }
|
141 | 182 |
|
| 183 | + /** |
| 184 | + * Start sending heartbeat messages at the specified interval. |
| 185 | + * <p>Heartbeat messages are sent as comments (":heartbeat") to keep the connection alive |
| 186 | + * and to detect client disconnects. |
| 187 | + */ |
| 188 | + private void startHeartbeat() { |
| 189 | + if (heartbeatInterval > 0) { |
| 190 | + this.scheduler = Executors.newSingleThreadScheduledExecutor(); |
| 191 | + this.heartbeatFuture = this.scheduler.scheduleAtFixedRate(() -> { |
| 192 | + try { |
| 193 | + send(SseEmitter.event().comment("heartbeat")); |
| 194 | + } catch (IOException ex) { |
| 195 | + completeWithError(ex); |
| 196 | + stopHeartbeat(); |
| 197 | + } |
| 198 | + }, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); |
| 199 | + } |
| 200 | + } |
| 201 | + |
| 202 | + /** |
| 203 | + * Stop sending heartbeat messages. |
| 204 | + * <p>Cancels the scheduled heartbeat task and shuts down the scheduler to release resources. |
| 205 | + */ |
| 206 | + private void stopHeartbeat() { |
| 207 | + if (heartbeatFuture != null) { |
| 208 | + heartbeatFuture.cancel(true); |
| 209 | + this.heartbeatFuture = null; |
| 210 | + } |
| 211 | + if (this.scheduler != null) { |
| 212 | + this.scheduler.shutdown(); |
| 213 | + this.scheduler = null; |
| 214 | + } |
| 215 | + } |
| 216 | + |
142 | 217 | @Override
|
143 | 218 | public String toString() {
|
144 | 219 | return "SseEmitter@" + ObjectUtils.getIdentityHexString(this);
|
|
0 commit comments