Skip to content

Commit 4a8b641

Browse files
authored
fix: Better stream and access token management (#1019)
* fix: use correct join payload * fix: keep auth token valid while in background when using realtime * fix: correct supabase_flutter init log * refactor: add toString method to RealtimeCloseEvent * fix: reload data from postgrest after new realtime connection * test: fix mock test by increasing delay * fix: load postgrest before realtime conn and close realtime on error * test: restore delay and expect access_token instead of user_token * test: fix typo * refactor: small rename * fix: wait for conn being ready and re-add error to _triggerChanError * fix: don't stringify errors and fix tests * test: close conn from server * fix: disconnect when in background and await connecting to be ready * fix: rejoin channels in edge case * docs: improve method comments
1 parent e095c14 commit 4a8b641

File tree

10 files changed

+208
-50
lines changed

10 files changed

+208
-50
lines changed

packages/realtime_client/lib/realtime_client.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
export 'src/constants.dart' show RealtimeConstants, RealtimeLogLevel;
1+
export 'src/constants.dart'
2+
show RealtimeConstants, RealtimeLogLevel, SocketStates;
23
export 'src/realtime_channel.dart';
34
export 'src/realtime_client.dart';
45
export 'src/realtime_presence.dart';

packages/realtime_client/lib/src/constants.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ enum SocketStates {
1818
/// Connection is live and connected
1919
open,
2020

21-
/// Socket is closing.
22-
closing,
21+
/// Socket is closing by the user
22+
disconnecting,
2323

2424
/// Socket being close not by the user. Realtime should attempt to reconnect.
2525
closed,

packages/realtime_client/lib/src/realtime_channel.dart

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class RealtimeChannel {
7070
socket.remove(this);
7171
});
7272

73-
_onError((String? reason) {
73+
_onError((reason) {
7474
if (isLeaving || isClosed) {
7575
return;
7676
}
@@ -260,9 +260,9 @@ class RealtimeChannel {
260260
}
261261

262262
/// Registers a callback that will be executed when the channel encounteres an error.
263-
void _onError(void Function(String?) callback) {
263+
void _onError(Function callback) {
264264
onEvents(ChannelEvents.error.eventName(), ChannelFilter(),
265-
(reason, [ref]) => callback(reason?.toString()));
265+
(reason, [ref]) => callback(reason));
266266
}
267267

268268
/// Sets up a listener on your Supabase database.
@@ -646,6 +646,23 @@ class RealtimeChannel {
646646
joinPush.resend(timeout ?? _timeout);
647647
}
648648

649+
/// Resends [joinPush] to tell the server we join this channel again and marks
650+
/// the channel as [ChannelStates.joining].
651+
///
652+
/// Usually [rejoin] only happens when the channel timeouts or errors out.
653+
/// When manually disconnecting, the channel is still marked as
654+
/// [ChannelStates.joined]. Calling [RealtimeClient.leaveOpenTopic] will
655+
/// unsubscribe itself, which causes issues when trying to rejoin. This method
656+
/// therefore doesn't call [RealtimeClient.leaveOpenTopic].
657+
@internal
658+
void forceRejoin([Duration? timeout]) {
659+
if (isLeaving) {
660+
return;
661+
}
662+
_state = ChannelStates.joining;
663+
joinPush.resend(timeout ?? _timeout);
664+
}
665+
649666
void trigger(String type, [dynamic payload, String? ref]) {
650667
final typeLower = type.toLowerCase();
651668

packages/realtime_client/lib/src/realtime_client.dart

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ class RealtimeCloseEvent {
4545
required this.code,
4646
required this.reason,
4747
});
48+
49+
@override
50+
String toString() {
51+
return 'RealtimeCloseEvent(code: $code, reason: $reason)';
52+
}
4853
}
4954

5055
class RealtimeClient {
@@ -134,17 +139,17 @@ class RealtimeClient {
134139
(String payload, Function(dynamic result) callback) =>
135140
callback(json.decode(payload));
136141
reconnectTimer = RetryTimer(
137-
() {
138-
disconnect();
139-
connect();
142+
() async {
143+
await disconnect();
144+
await connect();
140145
},
141146
this.reconnectAfterMs,
142147
);
143148
}
144149

145150
/// Connects the socket.
146151
@internal
147-
void connect() async {
152+
Future<void> connect() async {
148153
if (conn != null) {
149154
return;
150155
}
@@ -153,8 +158,20 @@ class RealtimeClient {
153158
connState = SocketStates.connecting;
154159
conn = transport(endPointURL, headers);
155160

156-
// handle connection errors
157-
conn!.ready.catchError(_onConnError);
161+
try {
162+
await conn!.ready;
163+
} catch (error) {
164+
// Don't schedule a reconnect and emit error if connection has been
165+
// closed by the user or [disconnect] waits for the connection to be
166+
// ready before closing it.
167+
if (connState != SocketStates.disconnected &&
168+
connState != SocketStates.disconnecting) {
169+
connState = SocketStates.closed;
170+
_onConnError(error);
171+
reconnectTimer.scheduleTimeout();
172+
}
173+
return;
174+
}
158175

159176
connState = SocketStates.open;
160177

@@ -166,7 +183,8 @@ class RealtimeClient {
166183
onError: _onConnError,
167184
onDone: () {
168185
// communication has been closed
169-
if (connState != SocketStates.disconnected) {
186+
if (connState != SocketStates.disconnected &&
187+
connState != SocketStates.disconnecting) {
170188
connState = SocketStates.closed;
171189
}
172190
_onConnClose();
@@ -179,20 +197,32 @@ class RealtimeClient {
179197
}
180198

181199
/// Disconnects the socket with status [code] and [reason] for the disconnect
182-
void disconnect({int? code, String? reason}) {
200+
Future<void> disconnect({int? code, String? reason}) async {
183201
final conn = this.conn;
184202
if (conn != null) {
185-
connState = SocketStates.disconnected;
186-
if (code != null) {
187-
conn.sink.close(code, reason ?? '');
188-
} else {
189-
conn.sink.close();
203+
final oldState = connState;
204+
connState = SocketStates.disconnecting;
205+
206+
// Connection cannot be closed while it's still connecting. Wait for connection to
207+
// be ready and then close it.
208+
if (oldState == SocketStates.connecting) {
209+
await conn.ready.catchError((_) {});
210+
}
211+
212+
if (oldState == SocketStates.open ||
213+
oldState == SocketStates.connecting) {
214+
if (code != null) {
215+
await conn.sink.close(code, reason ?? '');
216+
} else {
217+
await conn.sink.close();
218+
}
219+
connState = SocketStates.disconnected;
220+
reconnectTimer.reset();
190221
}
191222
this.conn = null;
192223

193224
// remove open handles
194225
if (heartbeatTimer != null) heartbeatTimer?.cancel();
195-
reconnectTimer.reset();
196226
}
197227
}
198228

@@ -251,8 +281,8 @@ class RealtimeClient {
251281
return 'connecting';
252282
case SocketStates.open:
253283
return 'open';
254-
case SocketStates.closing:
255-
return 'closing';
284+
case SocketStates.disconnecting:
285+
return 'disconnecting';
256286
case SocketStates.disconnected:
257287
return 'disconnected';
258288
case SocketStates.closed:
@@ -262,7 +292,7 @@ class RealtimeClient {
262292
}
263293

264294
/// Retuns `true` is the connection is open.
265-
bool get isConnected => connectionState == 'open';
295+
bool get isConnected => connState == SocketStates.open;
266296

267297
/// Removes a subscription from the socket.
268298
@internal
@@ -353,15 +383,15 @@ class RealtimeClient {
353383

354384
for (final channel in channels) {
355385
if (token != null) {
356-
channel.updateJoinPayload({'user_token': token});
386+
channel.updateJoinPayload({'access_token': token});
357387
}
358388
if (channel.joinedOnce && channel.isJoined) {
359389
channel.push(ChannelEvents.accessToken, {'access_token': token});
360390
}
361391
}
362392
}
363393

364-
/// Unsubscribe from channels with the specified topic.
394+
/// Unsubscribe from joined or joining channels with the specified topic.
365395
@internal
366396
void leaveOpenTopic(String topic) {
367397
final dupChannel = channels.firstWhereOrNull(
@@ -399,7 +429,7 @@ class RealtimeClient {
399429
/// SocketStates.disconnected: by user with socket.disconnect()
400430
/// SocketStates.closed: NOT by user, should try to reconnect
401431
if (connState == SocketStates.closed) {
402-
_triggerChanError();
432+
_triggerChanError(event);
403433
reconnectTimer.scheduleTimeout();
404434
}
405435
if (heartbeatTimer != null) heartbeatTimer!.cancel();
@@ -410,15 +440,15 @@ class RealtimeClient {
410440

411441
void _onConnError(dynamic error) {
412442
log('transport', error.toString());
413-
_triggerChanError();
443+
_triggerChanError(error);
414444
for (final callback in stateChangeCallbacks['error']!) {
415445
callback(error);
416446
}
417447
}
418448

419-
void _triggerChanError() {
449+
void _triggerChanError([dynamic error]) {
420450
for (final channel in channels) {
421-
channel.trigger(ChannelEvents.error.eventName());
451+
channel.trigger(ChannelEvents.error.eventName(), error);
422452
}
423453
}
424454

packages/realtime_client/test/mock_test.dart

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,9 @@ void main() {
268268
final subscribeCallback =
269269
expectAsync2((RealtimeSubscribeStatus event, error) {
270270
if (event == RealtimeSubscribeStatus.channelError) {
271-
expect(error, isNull);
271+
expect(error, isA<RealtimeCloseEvent>());
272+
error as RealtimeCloseEvent;
273+
expect(error.reason, "heartbeat timeout");
272274
} else {
273275
expect(event, RealtimeSubscribeStatus.closed);
274276
}
@@ -285,8 +287,8 @@ void main() {
285287

286288
channel.subscribe(subscribeCallback);
287289

288-
await client.conn!.sink
289-
.close(Constants.wsCloseNormal, "heartbeat timeout");
290+
await Future.delayed(Duration(milliseconds: 200));
291+
await webSocket?.close(Constants.wsCloseNormal, "heartbeat timeout");
290292
});
291293
});
292294

packages/realtime_client/test/socket_test.dart

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ void main() {
171171
});
172172

173173
socket.connect();
174+
await Future.delayed(const Duration(milliseconds: 200));
174175
expect(opens, 1);
175176

176177
socket.sendHeartbeat();
@@ -214,8 +215,8 @@ void main() {
214215
});
215216

216217
test('removes existing connection', () async {
217-
socket.connect();
218-
socket.disconnect();
218+
await socket.connect();
219+
await socket.disconnect();
219220

220221
expect(socket.conn, null);
221222
});
@@ -229,7 +230,7 @@ void main() {
229230
expect(closes, 1);
230231
});
231232

232-
test('calls connection close callback', () {
233+
test('calls connection close callback', () async {
233234
final mockedSocketChannel = MockIOWebSocketChannel();
234235
final mockedSocket = RealtimeClient(
235236
socketEndpoint,
@@ -247,7 +248,10 @@ void main() {
247248
const tReason = 'reason';
248249

249250
mockedSocket.connect();
251+
mockedSocket.connState = SocketStates.open;
252+
await Future.delayed(const Duration(milliseconds: 200));
250253
mockedSocket.disconnect(code: tCode, reason: tReason);
254+
await Future.delayed(const Duration(milliseconds: 200));
251255

252256
verify(
253257
() => mockedSink.close(
@@ -423,7 +427,7 @@ void main() {
423427
});
424428

425429
group('setAuth', () {
426-
final updateJoinPayload = {'user_token': 'token123'};
430+
final updateJoinPayload = {'access_token': 'token123'};
427431
final pushPayload = {'access_token': 'token123'};
428432

429433
test(

packages/supabase/lib/src/supabase_query_builder.dart

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,16 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
2323
url: Uri.parse(url),
2424
);
2525

26-
/// Returns real-time data from your table as a `Stream`.
26+
/// Combines the current state of your table from PostgREST with changes from the realtime server to return real-time data from your table as a [Stream].
2727
///
2828
/// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/extensions/postgres-changes#replication-setup).
2929
///
30-
/// Pass the list of primary key column names to [primaryKey], which will be used to updating and deleting the proper records internally as the library receives real-time updates.
30+
/// Pass the list of primary key column names to [primaryKey], which will be used to update and delete the proper records internally as the stream receives real-time updates.
31+
///
32+
/// It handles the lifecycle of the realtime connection and automatically refetches data from PostgREST when needed.
33+
///
34+
/// Make sure to provide `onError` and `onDone` callbacks to [Stream.listen] to handle errors and completion of the stream.
35+
/// The stream gets closed when the realtime connection is closed.
3136
///
3237
/// ```dart
3338
/// supabase.from('chats').stream(primaryKey: ['id']).listen(_onChatsReceived);

0 commit comments

Comments
 (0)