From 7749b981bee2c8be981e35dede1ca9fd1e698eda Mon Sep 17 00:00:00 2001 From: Vinzent Date: Wed, 28 Aug 2024 22:36:22 +0200 Subject: [PATCH 01/16] fix: use correct join payload --- packages/realtime_client/lib/src/realtime_client.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart index 6873a2ca..3571f060 100644 --- a/packages/realtime_client/lib/src/realtime_client.dart +++ b/packages/realtime_client/lib/src/realtime_client.dart @@ -353,7 +353,7 @@ class RealtimeClient { for (final channel in channels) { if (token != null) { - channel.updateJoinPayload({'user_token': token}); + channel.updateJoinPayload({'access_token': token}); } if (channel.joinedOnce && channel.isJoined) { channel.push(ChannelEvents.accessToken, {'access_token': token}); From 7b1f7cffdd20da4717f7cb6aebb7dda6d1d541b0 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Wed, 28 Aug 2024 22:36:43 +0200 Subject: [PATCH 02/16] fix: keep auth token valid while in background when using realtime --- packages/supabase_flutter/lib/src/supabase_auth.dart | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/supabase_flutter/lib/src/supabase_auth.dart b/packages/supabase_flutter/lib/src/supabase_auth.dart index fa3d72e8..7e42873e 100644 --- a/packages/supabase_flutter/lib/src/supabase_auth.dart +++ b/packages/supabase_flutter/lib/src/supabase_auth.dart @@ -119,7 +119,9 @@ class SupabaseAuth with WidgetsBindingObserver { case AppLifecycleState.detached: case AppLifecycleState.inactive: case AppLifecycleState.paused: - Supabase.instance.client.auth.stopAutoRefresh(); + if (Supabase.instance.client.realtime.getChannels().isEmpty) { + Supabase.instance.client.auth.stopAutoRefresh(); + } default: } } From 04c9753cdb270d0dcae623264dc1ba446b45bd2f Mon Sep 17 00:00:00 2001 From: Vinzent Date: Thu, 29 Aug 2024 16:44:54 +0200 Subject: [PATCH 03/16] fix: correct supabase_flutter init log --- packages/supabase_flutter/lib/src/supabase.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/supabase_flutter/lib/src/supabase.dart b/packages/supabase_flutter/lib/src/supabase.dart index 7cb2930e..0608707c 100644 --- a/packages/supabase_flutter/lib/src/supabase.dart +++ b/packages/supabase_flutter/lib/src/supabase.dart @@ -107,7 +107,7 @@ class Supabase { accessToken: accessToken, ); _instance._debugEnable = debug ?? kDebugMode; - _instance.log('***** Supabase init completed $_instance'); + _instance.log('***** Supabase init completed *****'); _instance._supabaseAuth = SupabaseAuth(); await _instance._supabaseAuth.initialize(options: authOptions); From 483f99a12507e171236c2393acb66922a02627ce Mon Sep 17 00:00:00 2001 From: Vinzent Date: Thu, 29 Aug 2024 16:45:38 +0200 Subject: [PATCH 04/16] refactor: add toString method to RealtimeCloseEvent --- packages/realtime_client/lib/src/realtime_client.dart | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart index 3571f060..9aee37a1 100644 --- a/packages/realtime_client/lib/src/realtime_client.dart +++ b/packages/realtime_client/lib/src/realtime_client.dart @@ -45,6 +45,11 @@ class RealtimeCloseEvent { required this.code, required this.reason, }); + + @override + String toString() { + return 'RealtimeCloseEvent{code: $code, reason: $reason}'; + } } class RealtimeClient { From 36c248e15d26255f0337bcd90249621224d7f0b9 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Thu, 29 Aug 2024 16:46:20 +0200 Subject: [PATCH 05/16] fix: reload data from postgrest after new realtime connection --- .../lib/src/supabase_stream_builder.dart | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/packages/supabase/lib/src/supabase_stream_builder.dart b/packages/supabase/lib/src/supabase_stream_builder.dart index 0807c734..57f5bf1e 100644 --- a/packages/supabase/lib/src/supabase_stream_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_builder.dart @@ -31,6 +31,18 @@ class _Order { final bool ascending; } +class RealtimeSubscribeException implements Exception { + RealtimeSubscribeException(this.status, [this.details]); + + final RealtimeSubscribeStatus status; + final Object? details; + + @override + String toString() { + return 'RealtimeSubscribeException(status: $status, details: $details)'; + } +} + typedef SupabaseStreamEvent = List>; class SupabaseStreamBuilder extends Stream { @@ -195,12 +207,29 @@ class SupabaseStreamBuilder extends Stream { } }) .subscribe((status, [error]) { - if (error != null) { - _addException(error); + switch (status) { + case RealtimeSubscribeStatus.subscribed: + // Get first data when realtime is subscribed and reload all data + // from postgrest if e.g. got a channel error and is resubscribed + _getPostgrestData(); + break; + case RealtimeSubscribeStatus.closed: + if (!(_streamController?.isClosed ?? true)) { + _streamController?.close(); + } + break; + case RealtimeSubscribeStatus.timedOut: + _addException(RealtimeSubscribeException(status, error)); + break; + case RealtimeSubscribeStatus.channelError: + _addException(RealtimeSubscribeException(status, error)); + break; } }); + } - PostgrestFilterBuilder query = _queryBuilder.select(); + Future _getPostgrestData() async { + PostgrestFilterBuilder query = _queryBuilder.select(); if (_streamFilter != null) { switch (_streamFilter!.type) { case PostgresChangeFilterType.eq: @@ -226,7 +255,7 @@ class SupabaseStreamBuilder extends Stream { break; } } - PostgrestTransformBuilder? transformQuery; + PostgrestTransformBuilder? transformQuery; if (_orderBy != null) { transformQuery = query.order(_orderBy!.column, ascending: _orderBy!.ascending); @@ -237,8 +266,8 @@ class SupabaseStreamBuilder extends Stream { try { final data = await (transformQuery ?? query); - final rows = SupabaseStreamEvent.from(data as List); - _streamData.addAll(rows); + final rows = SupabaseStreamEvent.from(data); + _streamData = rows; _addStream(); } catch (error, stackTrace) { _addException(error, stackTrace); From d27dfda86838c62e3db731d8d2e4292452d01db0 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Thu, 29 Aug 2024 20:46:34 +0200 Subject: [PATCH 06/16] test: fix mock test by increasing delay --- packages/supabase/test/mock_test.dart | 2 +- packages/supabase_flutter/lib/src/supabase_auth.dart | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/supabase/test/mock_test.dart b/packages/supabase/test/mock_test.dart index 6c458c5f..65f1bac9 100644 --- a/packages/supabase/test/mock_test.dart +++ b/packages/supabase/test/mock_test.dart @@ -165,7 +165,7 @@ void main() { webSocket!.add(replyString); // Send an insert event - await Future.delayed(Duration(milliseconds: 10)); + await Future.delayed(Duration(milliseconds: 100)); final insertString = jsonEncode({ 'topic': topic, 'event': 'postgres_changes', diff --git a/packages/supabase_flutter/lib/src/supabase_auth.dart b/packages/supabase_flutter/lib/src/supabase_auth.dart index 7e42873e..fe401ec5 100644 --- a/packages/supabase_flutter/lib/src/supabase_auth.dart +++ b/packages/supabase_flutter/lib/src/supabase_auth.dart @@ -119,6 +119,9 @@ class SupabaseAuth with WidgetsBindingObserver { case AppLifecycleState.detached: case AppLifecycleState.inactive: case AppLifecycleState.paused: + // Realtime channels are kept alive in the background for some amount + // of time after the app is paused. If we stop refreshing the token + // here, the channels will be closed. if (Supabase.instance.client.realtime.getChannels().isEmpty) { Supabase.instance.client.auth.stopAutoRefresh(); } From 3047e9cfc69f1451f3aa40293fb65619457581fa Mon Sep 17 00:00:00 2001 From: Vinzent Date: Mon, 2 Sep 2024 13:59:45 +0200 Subject: [PATCH 07/16] fix: load postgrest before realtime conn and close realtime on error --- .../lib/src/supabase_stream_builder.dart | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/packages/supabase/lib/src/supabase_stream_builder.dart b/packages/supabase/lib/src/supabase_stream_builder.dart index 57f5bf1e..292605bc 100644 --- a/packages/supabase/lib/src/supabase_stream_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_builder.dart @@ -76,6 +76,9 @@ class SupabaseStreamBuilder extends Stream { /// Count of record to be returned int? _limit; + /// Flag if the stream has at least one time been subscribed to realtime + bool _gotSubscribed = false; + SupabaseStreamBuilder({ required PostgrestQueryBuilder queryBuilder, required String realtimeTopic, @@ -209,14 +212,15 @@ class SupabaseStreamBuilder extends Stream { .subscribe((status, [error]) { switch (status) { case RealtimeSubscribeStatus.subscribed: - // Get first data when realtime is subscribed and reload all data - // from postgrest if e.g. got a channel error and is resubscribed - _getPostgrestData(); + // Reload all data after a reconnect from postgrest + // First data from postgrest gets loaded before the realtime connect + if (_gotSubscribed) { + _getPostgrestData(); + } + _gotSubscribed = true; break; case RealtimeSubscribeStatus.closed: - if (!(_streamController?.isClosed ?? true)) { - _streamController?.close(); - } + _streamController?.close(); break; case RealtimeSubscribeStatus.timedOut: _addException(RealtimeSubscribeException(status, error)); @@ -226,6 +230,7 @@ class SupabaseStreamBuilder extends Stream { break; } }); + _getPostgrestData(); } Future _getPostgrestData() async { @@ -271,6 +276,10 @@ class SupabaseStreamBuilder extends Stream { _addStream(); } catch (error, stackTrace) { _addException(error, stackTrace); + // In case the postgrest call fails, there is no need to keep the + // realtime connection open + _channel?.unsubscribe(); + _streamController?.close(); } } From 0bdc6fa0056c95562e439bf64dad2efdcac8aa48 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Mon, 2 Sep 2024 14:07:30 +0200 Subject: [PATCH 08/16] test: restore delay and expect access_token instead of user_token --- packages/realtime_client/test/socket_test.dart | 2 +- packages/supabase/test/mock_test.dart | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/realtime_client/test/socket_test.dart b/packages/realtime_client/test/socket_test.dart index a5463ab3..331cde23 100644 --- a/packages/realtime_client/test/socket_test.dart +++ b/packages/realtime_client/test/socket_test.dart @@ -423,7 +423,7 @@ void main() { }); group('setAuth', () { - final updateJoinPayload = {'user_token': 'token123'}; + final updateJoinPayload = {'acess_token': 'token123'}; final pushPayload = {'access_token': 'token123'}; test( diff --git a/packages/supabase/test/mock_test.dart b/packages/supabase/test/mock_test.dart index 65f1bac9..6c458c5f 100644 --- a/packages/supabase/test/mock_test.dart +++ b/packages/supabase/test/mock_test.dart @@ -165,7 +165,7 @@ void main() { webSocket!.add(replyString); // Send an insert event - await Future.delayed(Duration(milliseconds: 100)); + await Future.delayed(Duration(milliseconds: 10)); final insertString = jsonEncode({ 'topic': topic, 'event': 'postgres_changes', From fd736189357b83163d65c352caf08bfab7167b8c Mon Sep 17 00:00:00 2001 From: Vinzent Date: Mon, 2 Sep 2024 14:11:25 +0200 Subject: [PATCH 09/16] test: fix typo --- packages/realtime_client/test/socket_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/realtime_client/test/socket_test.dart b/packages/realtime_client/test/socket_test.dart index 331cde23..8a61d0c9 100644 --- a/packages/realtime_client/test/socket_test.dart +++ b/packages/realtime_client/test/socket_test.dart @@ -423,7 +423,7 @@ void main() { }); group('setAuth', () { - final updateJoinPayload = {'acess_token': 'token123'}; + final updateJoinPayload = {'access_token': 'token123'}; final pushPayload = {'access_token': 'token123'}; test( From aefd638f0394959770fd425d6234a83546669142 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Mon, 2 Sep 2024 14:25:48 +0200 Subject: [PATCH 10/16] refactor: small rename --- packages/supabase/lib/src/supabase_stream_builder.dart | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/supabase/lib/src/supabase_stream_builder.dart b/packages/supabase/lib/src/supabase_stream_builder.dart index 292605bc..5e0aac6a 100644 --- a/packages/supabase/lib/src/supabase_stream_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_builder.dart @@ -76,8 +76,8 @@ class SupabaseStreamBuilder extends Stream { /// Count of record to be returned int? _limit; - /// Flag if the stream has at least one time been subscribed to realtime - bool _gotSubscribed = false; + /// Flag that the stream has at least one time been subscribed to realtime + bool _wasSubscribed = false; SupabaseStreamBuilder({ required PostgrestQueryBuilder queryBuilder, @@ -214,10 +214,10 @@ class SupabaseStreamBuilder extends Stream { case RealtimeSubscribeStatus.subscribed: // Reload all data after a reconnect from postgrest // First data from postgrest gets loaded before the realtime connect - if (_gotSubscribed) { + if (_wasSubscribed) { _getPostgrestData(); } - _gotSubscribed = true; + _wasSubscribed = true; break; case RealtimeSubscribeStatus.closed: _streamController?.close(); From 8e090c97c4f034fe1db36c279c2cdd4101406a78 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Mon, 16 Sep 2024 20:09:46 +0200 Subject: [PATCH 11/16] fix: wait for conn being ready and re-add error to _triggerChanError --- .../lib/src/realtime_client.dart | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart index 9aee37a1..5529029e 100644 --- a/packages/realtime_client/lib/src/realtime_client.dart +++ b/packages/realtime_client/lib/src/realtime_client.dart @@ -158,8 +158,13 @@ class RealtimeClient { connState = SocketStates.connecting; conn = transport(endPointURL, headers); - // handle connection errors - conn!.ready.catchError(_onConnError); + try { + await conn!.ready; + } catch (error) { + _onConnError(error); + reconnectTimer.scheduleTimeout(); + return; + } connState = SocketStates.open; @@ -187,17 +192,20 @@ class RealtimeClient { void disconnect({int? code, String? reason}) { final conn = this.conn; if (conn != null) { + final connectionWasOpen = connState == SocketStates.open; connState = SocketStates.disconnected; - if (code != null) { - conn.sink.close(code, reason ?? ''); - } else { - conn.sink.close(); + if (connectionWasOpen) { + if (code != null) { + conn.sink.close(code, reason ?? ''); + } else { + conn.sink.close(); + } + reconnectTimer.reset(); } this.conn = null; // remove open handles if (heartbeatTimer != null) heartbeatTimer?.cancel(); - reconnectTimer.reset(); } } @@ -404,7 +412,7 @@ class RealtimeClient { /// SocketStates.disconnected: by user with socket.disconnect() /// SocketStates.closed: NOT by user, should try to reconnect if (connState == SocketStates.closed) { - _triggerChanError(); + _triggerChanError(event); reconnectTimer.scheduleTimeout(); } if (heartbeatTimer != null) heartbeatTimer!.cancel(); @@ -415,15 +423,15 @@ class RealtimeClient { void _onConnError(dynamic error) { log('transport', error.toString()); - _triggerChanError(); + _triggerChanError(error); for (final callback in stateChangeCallbacks['error']!) { callback(error); } } - void _triggerChanError() { + void _triggerChanError([dynamic error]) { for (final channel in channels) { - channel.trigger(ChannelEvents.error.eventName()); + channel.trigger(ChannelEvents.error.eventName(), error); } } From 463fdea3192aa659296d50f9c15b77877d058220 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Mon, 16 Sep 2024 21:19:51 +0200 Subject: [PATCH 12/16] fix: don't stringify errors and fix tests --- packages/realtime_client/lib/src/realtime_channel.dart | 6 +++--- packages/realtime_client/lib/src/realtime_client.dart | 2 +- packages/realtime_client/test/mock_test.dart | 5 ++++- packages/realtime_client/test/socket_test.dart | 6 +++++- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 42a2470c..3212eb2d 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -70,7 +70,7 @@ class RealtimeChannel { socket.remove(this); }); - _onError((String? reason) { + _onError((reason) { if (isLeaving || isClosed) { return; } @@ -260,9 +260,9 @@ class RealtimeChannel { } /// Registers a callback that will be executed when the channel encounteres an error. - void _onError(void Function(String?) callback) { + void _onError(Function callback) { onEvents(ChannelEvents.error.eventName(), ChannelFilter(), - (reason, [ref]) => callback(reason?.toString())); + (reason, [ref]) => callback(reason)); } /// Sets up a listener on your Supabase database. diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart index 5529029e..de4ea76c 100644 --- a/packages/realtime_client/lib/src/realtime_client.dart +++ b/packages/realtime_client/lib/src/realtime_client.dart @@ -48,7 +48,7 @@ class RealtimeCloseEvent { @override String toString() { - return 'RealtimeCloseEvent{code: $code, reason: $reason}'; + return 'RealtimeCloseEvent(code: $code, reason: $reason)'; } } diff --git a/packages/realtime_client/test/mock_test.dart b/packages/realtime_client/test/mock_test.dart index ccd77089..81b3b64f 100644 --- a/packages/realtime_client/test/mock_test.dart +++ b/packages/realtime_client/test/mock_test.dart @@ -268,7 +268,9 @@ void main() { final subscribeCallback = expectAsync2((RealtimeSubscribeStatus event, error) { if (event == RealtimeSubscribeStatus.channelError) { - expect(error, isNull); + expect(error, isA()); + error as RealtimeCloseEvent; + expect(error.reason, "heartbeat timeout"); } else { expect(event, RealtimeSubscribeStatus.closed); } @@ -285,6 +287,7 @@ void main() { channel.subscribe(subscribeCallback); + await Future.delayed(Duration(milliseconds: 200)); await client.conn!.sink .close(Constants.wsCloseNormal, "heartbeat timeout"); }); diff --git a/packages/realtime_client/test/socket_test.dart b/packages/realtime_client/test/socket_test.dart index 8a61d0c9..84f9eb2f 100644 --- a/packages/realtime_client/test/socket_test.dart +++ b/packages/realtime_client/test/socket_test.dart @@ -171,6 +171,7 @@ void main() { }); socket.connect(); + await Future.delayed(const Duration(milliseconds: 200)); expect(opens, 1); socket.sendHeartbeat(); @@ -229,7 +230,7 @@ void main() { expect(closes, 1); }); - test('calls connection close callback', () { + test('calls connection close callback', () async { final mockedSocketChannel = MockIOWebSocketChannel(); final mockedSocket = RealtimeClient( socketEndpoint, @@ -247,7 +248,10 @@ void main() { const tReason = 'reason'; mockedSocket.connect(); + mockedSocket.connState = SocketStates.open; + await Future.delayed(const Duration(milliseconds: 200)); mockedSocket.disconnect(code: tCode, reason: tReason); + await Future.delayed(const Duration(milliseconds: 200)); verify( () => mockedSink.close( From 7a018dd72dfe3450724736c03c41dc8ae86b1ac9 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Mon, 16 Sep 2024 21:24:40 +0200 Subject: [PATCH 13/16] test: close conn from server --- packages/realtime_client/test/mock_test.dart | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/realtime_client/test/mock_test.dart b/packages/realtime_client/test/mock_test.dart index 81b3b64f..89455fb7 100644 --- a/packages/realtime_client/test/mock_test.dart +++ b/packages/realtime_client/test/mock_test.dart @@ -288,8 +288,7 @@ void main() { channel.subscribe(subscribeCallback); await Future.delayed(Duration(milliseconds: 200)); - await client.conn!.sink - .close(Constants.wsCloseNormal, "heartbeat timeout"); + await webSocket?.close(Constants.wsCloseNormal, "heartbeat timeout"); }); }); From a069db62d1f27776caba1519f46af30a1220d820 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Wed, 18 Sep 2024 17:37:33 +0200 Subject: [PATCH 14/16] fix: disconnect when in background and await connecting to be ready --- .../realtime_client/lib/realtime_client.dart | 3 +- .../realtime_client/lib/src/constants.dart | 4 +- .../lib/src/realtime_channel.dart | 13 ++++ .../lib/src/realtime_client.dart | 51 ++++++++++----- .../realtime_client/test/socket_test.dart | 4 +- .../lib/src/supabase_auth.dart | 64 ++++++++++++++++--- 6 files changed, 109 insertions(+), 30 deletions(-) diff --git a/packages/realtime_client/lib/realtime_client.dart b/packages/realtime_client/lib/realtime_client.dart index 660dd2f6..84f158c5 100644 --- a/packages/realtime_client/lib/realtime_client.dart +++ b/packages/realtime_client/lib/realtime_client.dart @@ -1,4 +1,5 @@ -export 'src/constants.dart' show RealtimeConstants, RealtimeLogLevel; +export 'src/constants.dart' + show RealtimeConstants, RealtimeLogLevel, SocketStates; export 'src/realtime_channel.dart'; export 'src/realtime_client.dart'; export 'src/realtime_presence.dart'; diff --git a/packages/realtime_client/lib/src/constants.dart b/packages/realtime_client/lib/src/constants.dart index 1d14f9d6..1f9e0b87 100644 --- a/packages/realtime_client/lib/src/constants.dart +++ b/packages/realtime_client/lib/src/constants.dart @@ -18,8 +18,8 @@ enum SocketStates { /// Connection is live and connected open, - /// Socket is closing. - closing, + /// Socket is closing by the user + disconnecting, /// Socket being close not by the user. Realtime should attempt to reconnect. closed, diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 3212eb2d..30ab04a2 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -646,6 +646,19 @@ class RealtimeChannel { joinPush.resend(timeout ?? _timeout); } + /// Usually a rejoin only happens when the channel timeouts or errors out. + /// When manually disconnecting, the channel is still marked as + /// [ChannelStates.joined]. Calling [RealtimeClient.leaveOpenTopic] will + /// unsubscribe itself, which causes issues when trying to rejoin. This method + /// therefore doesn't call [RealtimeClient.leaveOpenTopic]. + void forceRejoin([Duration? timeout]) { + if (isLeaving) { + return; + } + _state = ChannelStates.joining; + joinPush.resend(timeout ?? _timeout); + } + void trigger(String type, [dynamic payload, String? ref]) { final typeLower = type.toLowerCase(); diff --git a/packages/realtime_client/lib/src/realtime_client.dart b/packages/realtime_client/lib/src/realtime_client.dart index de4ea76c..fe7eefd3 100644 --- a/packages/realtime_client/lib/src/realtime_client.dart +++ b/packages/realtime_client/lib/src/realtime_client.dart @@ -139,9 +139,9 @@ class RealtimeClient { (String payload, Function(dynamic result) callback) => callback(json.decode(payload)); reconnectTimer = RetryTimer( - () { - disconnect(); - connect(); + () async { + await disconnect(); + await connect(); }, this.reconnectAfterMs, ); @@ -149,7 +149,7 @@ class RealtimeClient { /// Connects the socket. @internal - void connect() async { + Future connect() async { if (conn != null) { return; } @@ -161,8 +161,15 @@ class RealtimeClient { try { await conn!.ready; } catch (error) { - _onConnError(error); - reconnectTimer.scheduleTimeout(); + // Don't schedule a reconnect and emit error if connection has been + // closed by the user or [disconnect] waits for the connection to be + // ready before closing it. + if (connState != SocketStates.disconnected && + connState != SocketStates.disconnecting) { + connState = SocketStates.closed; + _onConnError(error); + reconnectTimer.scheduleTimeout(); + } return; } @@ -176,7 +183,8 @@ class RealtimeClient { onError: _onConnError, onDone: () { // communication has been closed - if (connState != SocketStates.disconnected) { + if (connState != SocketStates.disconnected && + connState != SocketStates.disconnecting) { connState = SocketStates.closed; } _onConnClose(); @@ -189,17 +197,26 @@ class RealtimeClient { } /// Disconnects the socket with status [code] and [reason] for the disconnect - void disconnect({int? code, String? reason}) { + Future disconnect({int? code, String? reason}) async { final conn = this.conn; if (conn != null) { - final connectionWasOpen = connState == SocketStates.open; - connState = SocketStates.disconnected; - if (connectionWasOpen) { + final oldState = connState; + connState = SocketStates.disconnecting; + + // Connection cannot be closed while it's still connecting. Wait for connection to + // be ready and then close it. + if (oldState == SocketStates.connecting) { + await conn.ready.catchError((_) {}); + } + + if (oldState == SocketStates.open || + oldState == SocketStates.connecting) { if (code != null) { - conn.sink.close(code, reason ?? ''); + await conn.sink.close(code, reason ?? ''); } else { - conn.sink.close(); + await conn.sink.close(); } + connState = SocketStates.disconnected; reconnectTimer.reset(); } this.conn = null; @@ -264,8 +281,8 @@ class RealtimeClient { return 'connecting'; case SocketStates.open: return 'open'; - case SocketStates.closing: - return 'closing'; + case SocketStates.disconnecting: + return 'disconnecting'; case SocketStates.disconnected: return 'disconnected'; case SocketStates.closed: @@ -275,7 +292,7 @@ class RealtimeClient { } /// Retuns `true` is the connection is open. - bool get isConnected => connectionState == 'open'; + bool get isConnected => connState == SocketStates.open; /// Removes a subscription from the socket. @internal @@ -374,7 +391,7 @@ class RealtimeClient { } } - /// Unsubscribe from channels with the specified topic. + /// Unsubscribe from joined or joining channels with the specified topic. @internal void leaveOpenTopic(String topic) { final dupChannel = channels.firstWhereOrNull( diff --git a/packages/realtime_client/test/socket_test.dart b/packages/realtime_client/test/socket_test.dart index 84f9eb2f..79fe1306 100644 --- a/packages/realtime_client/test/socket_test.dart +++ b/packages/realtime_client/test/socket_test.dart @@ -215,8 +215,8 @@ void main() { }); test('removes existing connection', () async { - socket.connect(); - socket.disconnect(); + await socket.connect(); + await socket.disconnect(); expect(socket.conn, null); }); diff --git a/packages/supabase_flutter/lib/src/supabase_auth.dart b/packages/supabase_flutter/lib/src/supabase_auth.dart index fe401ec5..3808636a 100644 --- a/packages/supabase_flutter/lib/src/supabase_auth.dart +++ b/packages/supabase_flutter/lib/src/supabase_auth.dart @@ -4,6 +4,7 @@ import 'dart:io' show Platform; import 'dart:math'; import 'package:app_links/app_links.dart'; +import 'package:async/async.dart'; import 'package:flutter/foundation.dart' show kIsWeb; import 'package:flutter/material.dart'; import 'package:flutter/services.dart'; @@ -29,6 +30,8 @@ class SupabaseAuth with WidgetsBindingObserver { StreamSubscription? _deeplinkSubscription; + CancelableOperation? _realtimeReconnectOperation; + final _appLinks = AppLinks(); /// - Obtains session from local storage and sets it as the current session @@ -113,22 +116,67 @@ class SupabaseAuth with WidgetsBindingObserver { void didChangeAppLifecycleState(AppLifecycleState state) { switch (state) { case AppLifecycleState.resumed: - if (_autoRefreshToken) { - Supabase.instance.client.auth.startAutoRefresh(); - } + onResumed(); case AppLifecycleState.detached: - case AppLifecycleState.inactive: case AppLifecycleState.paused: - // Realtime channels are kept alive in the background for some amount - // of time after the app is paused. If we stop refreshing the token - // here, the channels will be closed. - if (Supabase.instance.client.realtime.getChannels().isEmpty) { + if (kIsWeb || Platform.isAndroid || Platform.isIOS) { Supabase.instance.client.auth.stopAutoRefresh(); + _realtimeReconnectOperation?.cancel(); + Supabase.instance.client.realtime.disconnect(); } default: } } + Future onResumed() async { + if (_autoRefreshToken) { + Supabase.instance.client.auth.startAutoRefresh(); + } + final realtime = Supabase.instance.client.realtime; + if (realtime.channels.isNotEmpty) { + if (realtime.connState == SocketStates.disconnecting) { + // If the socket is still disconnecting from e.g. + // [AppLifecycleState.paused] we should wait for it to finish before + // reconnecting. + + bool cancel = false; + final connectFuture = realtime.conn!.sink.done.then( + (_) { + // Make this connect cancelable so that it does not connect if the + // disconnect took so long that the app is already in background + // again. + + // ignore: invalid_use_of_internal_member + if (!cancel) return realtime.connect(); + }, + onError: (error) {}, + ); + _realtimeReconnectOperation = CancelableOperation.fromFuture( + connectFuture, + onCancel: () => cancel = true, + ); + } else if (!realtime.isConnected) { + // Reconnect if the socket is currently not connected. + // When coming from [AppLifecycleState.paused] this should be the case, + // but when coming from [AppLifecycleState.inactive] no disconnect + // happened and therefore connection should still be intanct and we + // should not reconnect. + + // ignore: invalid_use_of_internal_member + await realtime.connect(); + for (final channel in realtime.channels) { + // Only rejoin channels that think they are still joined and not + // which were manually unsubscribed by the user while in background + + // ignore: invalid_use_of_internal_member + if (channel.isJoined) { + channel.forceRejoin(); + } + } + } + } + } + void _onAuthStateChange(AuthChangeEvent event, Session? session) { Supabase.instance.log('**** onAuthStateChange: $event'); if (session != null) { From 2c81776bbeac40158247d57786b94bfea0a33210 Mon Sep 17 00:00:00 2001 From: Vinzent Date: Thu, 19 Sep 2024 00:45:47 +0200 Subject: [PATCH 15/16] fix: rejoin channels in edge case --- .../supabase_flutter/lib/src/supabase_auth.dart | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/packages/supabase_flutter/lib/src/supabase_auth.dart b/packages/supabase_flutter/lib/src/supabase_auth.dart index 3808636a..80cf6248 100644 --- a/packages/supabase_flutter/lib/src/supabase_auth.dart +++ b/packages/supabase_flutter/lib/src/supabase_auth.dart @@ -141,13 +141,21 @@ class SupabaseAuth with WidgetsBindingObserver { bool cancel = false; final connectFuture = realtime.conn!.sink.done.then( - (_) { + (_) async { // Make this connect cancelable so that it does not connect if the // disconnect took so long that the app is already in background // again. - // ignore: invalid_use_of_internal_member - if (!cancel) return realtime.connect(); + if (!cancel) { + // ignore: invalid_use_of_internal_member + await realtime.connect(); + for (final channel in realtime.channels) { + // ignore: invalid_use_of_internal_member + if (channel.isJoined) { + channel.forceRejoin(); + } + } + } }, onError: (error) {}, ); From 6de5b1ad36c443748ff72a9b85ad4506e4a72eea Mon Sep 17 00:00:00 2001 From: Vinzent Date: Thu, 26 Sep 2024 14:54:14 +0200 Subject: [PATCH 16/16] docs: improve method comments --- packages/realtime_client/lib/src/realtime_channel.dart | 6 +++++- packages/supabase/lib/src/supabase_query_builder.dart | 9 +++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 30ab04a2..7c37d800 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -646,11 +646,15 @@ class RealtimeChannel { joinPush.resend(timeout ?? _timeout); } - /// Usually a rejoin only happens when the channel timeouts or errors out. + /// Resends [joinPush] to tell the server we join this channel again and marks + /// the channel as [ChannelStates.joining]. + /// + /// Usually [rejoin] only happens when the channel timeouts or errors out. /// When manually disconnecting, the channel is still marked as /// [ChannelStates.joined]. Calling [RealtimeClient.leaveOpenTopic] will /// unsubscribe itself, which causes issues when trying to rejoin. This method /// therefore doesn't call [RealtimeClient.leaveOpenTopic]. + @internal void forceRejoin([Duration? timeout]) { if (isLeaving) { return; diff --git a/packages/supabase/lib/src/supabase_query_builder.dart b/packages/supabase/lib/src/supabase_query_builder.dart index 1c9b4bcb..dd31713a 100644 --- a/packages/supabase/lib/src/supabase_query_builder.dart +++ b/packages/supabase/lib/src/supabase_query_builder.dart @@ -23,11 +23,16 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder { url: Uri.parse(url), ); - /// Returns real-time data from your table as a `Stream`. + /// Combines the current state of your table from PostgREST with changes from the realtime server to return real-time data from your table as a [Stream]. /// /// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/extensions/postgres-changes#replication-setup). /// - /// Pass the list of primary key column names to [primaryKey], which will be used to updating and deleting the proper records internally as the library receives real-time updates. + /// Pass the list of primary key column names to [primaryKey], which will be used to update and delete the proper records internally as the stream receives real-time updates. + /// + /// It handles the lifecycle of the realtime connection and automatically refetches data from PostgREST when needed. + /// + /// Make sure to provide `onError` and `onDone` callbacks to [Stream.listen] to handle errors and completion of the stream. + /// The stream gets closed when the realtime connection is closed. /// /// ```dart /// supabase.from('chats').stream(primaryKey: ['id']).listen(_onChatsReceived);