Skip to content

Commit 8305fef

Browse files
dshukertjrbdlukaa
andauthored
feat(realtime_client): Add support for authorized realtime channels with broadcast and presence. (#970)
* Add private channel support * remove unused variables * update test * chore: add comment to _private in realtime_channel * Update packages/realtime_client/lib/src/realtime_channel.dart Co-authored-by: Bruno D'Luka <45696119+bdlukaa@users.noreply.github.com> * Update packages/realtime_client/lib/src/realtime_channel.dart Co-authored-by: Bruno D'Luka <45696119+bdlukaa@users.noreply.github.com> * format code --------- Co-authored-by: Bruno D'Luka <45696119+bdlukaa@users.noreply.github.com>
1 parent 74747f0 commit 8305fef

File tree

7 files changed

+81
-17
lines changed

7 files changed

+81
-17
lines changed

packages/realtime_client/lib/src/realtime_channel.dart

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ class RealtimeChannel {
3232
@internal
3333
final RealtimeClient socket;
3434

35+
/// Defines if the channel is private or not and if RLS policies will be used to check data
36+
late final bool _private;
37+
3538
RealtimeChannel(
3639
this.topic,
3740
this.socket, {
@@ -40,7 +43,8 @@ class RealtimeChannel {
4043
params = params.toMap(),
4144
subTopic = topic.replaceFirst(
4245
RegExp(r"^realtime:", caseSensitive: false), "") {
43-
broadcastEndpointURL = _broadcastEndpointURL;
46+
broadcastEndpointURL = '${httpEndpointURL(socket.endPoint)}/api/broadcast';
47+
_private = params.private;
4448

4549
joinPush = Push(
4650
this,
@@ -117,6 +121,7 @@ class RealtimeChannel {
117121
} else {
118122
final broadcast = params['config']['broadcast'];
119123
final presence = params['config']['presence'];
124+
final isPrivate = params['config']['private'];
120125

121126
_onError((e) {
122127
if (callback != null) callback(RealtimeSubscribeStatus.channelError, e);
@@ -131,6 +136,7 @@ class RealtimeChannel {
131136
'presence': presence,
132137
'postgres_changes':
133138
_bindings['postgres_changes']?.map((r) => r.filter).toList() ?? [],
139+
'private': isPrivate == true,
134140
};
135141

136142
if (socket.accessToken != null) {
@@ -483,13 +489,20 @@ class RealtimeChannel {
483489
}
484490

485491
if (!canPush && type == RealtimeListenTypes.broadcast) {
486-
final headers = {'Content-Type': 'application/json', ...socket.headers};
492+
final headers = <String, String>{
493+
'Content-Type': 'application/json',
494+
if (socket.params['apikey'] != null) 'apikey': socket.params['apikey']!,
495+
...socket.headers,
496+
if (socket.accessToken != null)
497+
'Authorization': 'Bearer ${socket.accessToken}',
498+
};
487499
final body = {
488500
'messages': [
489501
{
490502
'topic': subTopic,
491503
'payload': payload,
492504
'event': event,
505+
'private': _private,
493506
}
494507
]
495508
};
@@ -595,18 +608,6 @@ class RealtimeChannel {
595608
return completer.future;
596609
}
597610

598-
String get _broadcastEndpointURL {
599-
var url = socket.endPoint;
600-
url = url.replaceFirst(RegExp(r'^ws', caseSensitive: false), 'http');
601-
url = url.replaceAll(
602-
RegExp(r'(/socket/websocket|/socket|/websocket)/?$',
603-
caseSensitive: false),
604-
'',
605-
);
606-
url = '${url.replaceAll(RegExp(r'/+$'), '')}/api/broadcast';
607-
return url;
608-
}
609-
610611
/// Overridable message hook
611612
///
612613
/// Receives all events for specialized message handling before dispatching to the channel callbacks.

packages/realtime_client/lib/src/realtime_client.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class RealtimeClient {
5151
String? accessToken;
5252
List<RealtimeChannel> channels = [];
5353
final String endPoint;
54+
5455
final Map<String, String> headers;
5556
final Map<String, dynamic> params;
5657
final Duration timeout;
@@ -85,6 +86,7 @@ class RealtimeClient {
8586
/// Initializes the Socket
8687
///
8788
/// `endPoint` The string WebSocket endpoint, ie, "ws://example.com/socket", "wss://example.com", "/socket" (inherited host & protocol)
89+
/// `httpEndpoint` The string HTTP endpoint, ie, "https://example.com", "/" (inherited host & protocol)
8890
/// `transport` The Websocket Transport, for example WebSocket.
8991
/// `timeout` The default timeout in milliseconds to trigger push timeouts.
9092
/// `params` The optional params to pass when connecting.

packages/realtime_client/lib/src/transformers.dart

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,3 +332,20 @@ Map<String, Map<String, dynamic>> getPayloadRecords(
332332

333333
return records;
334334
}
335+
336+
/// Converts a WebSocket URL to an HTTP URL.
337+
String httpEndpointURL(String socketUrl) {
338+
var url = socketUrl;
339+
340+
// Replace 'ws' or 'wss' with 'http' or 'https' respectively
341+
url = url.replaceFirst(RegExp(r'^ws', caseSensitive: false), 'http');
342+
343+
// Remove WebSocket-specific endings
344+
url = url.replaceFirst(
345+
RegExp(r'(/socket/websocket|/socket|/websocket)/?$', caseSensitive: false),
346+
'',
347+
);
348+
349+
// Remove trailing slashes
350+
return url.replaceAll(RegExp(r'/+$'), '');
351+
}

packages/realtime_client/lib/src/types.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,14 @@ class RealtimeChannelConfig {
148148
/// [key] option is used to track presence payload across clients
149149
final String key;
150150

151+
/// Defines if the channel is private or not and if RLS policies will be used to check data
152+
final bool private;
153+
151154
const RealtimeChannelConfig({
152155
this.ack = false,
153156
this.self = false,
154157
this.key = '',
158+
this.private = false,
155159
});
156160

157161
Map<String, dynamic> toMap() {
@@ -164,6 +168,7 @@ class RealtimeChannelConfig {
164168
'presence': {
165169
'key': key,
166170
},
171+
'private': private,
167172
}
168173
};
169174
}

packages/realtime_client/test/channel_test.dart

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import 'dart:io';
44

55
import 'package:realtime_client/realtime_client.dart';
66
import 'package:realtime_client/src/constants.dart';
7+
import 'package:realtime_client/src/push.dart';
78
import 'package:realtime_client/src/types.dart';
89
import 'package:test/test.dart';
910

@@ -33,11 +34,31 @@ void main() {
3334
expect(channel.params, {
3435
'config': {
3536
'broadcast': {'ack': false, 'self': false},
36-
'presence': {'key': ''}
37+
'presence': {'key': ''},
38+
'private': false,
3739
}
3840
});
3941
expect(channel.socket, socket);
4042
});
43+
44+
test('sets up joinPush object with private defined', () {
45+
channel = RealtimeChannel(
46+
'topic',
47+
socket,
48+
params: RealtimeChannelConfig(
49+
private: true,
50+
),
51+
);
52+
final Push joinPush = channel.joinPush;
53+
54+
expect(joinPush.payload, {
55+
'config': {
56+
'broadcast': {'ack': false, 'self': false},
57+
'presence': {'key': ''},
58+
'private': true,
59+
},
60+
});
61+
});
4162
});
4263

4364
group('join', () {
@@ -252,7 +273,7 @@ void main() {
252273
params: {'apikey': 'supabaseKey'},
253274
);
254275

255-
channel = socket.channel('myTopic');
276+
channel = socket.channel('myTopic', RealtimeChannelConfig(private: true));
256277
});
257278

258279
tearDown(() async {
@@ -311,9 +332,11 @@ void main() {
311332
final body = json.decode(await utf8.decodeStream(req));
312333
final message = body['messages'][0];
313334
final payload = message['payload'];
335+
final private = message['private'];
314336

315337
expect(payload, containsPair('myKey', 'myValue'));
316338
expect(message, containsPair('topic', 'myTopic'));
339+
expect(private, true);
317340

318341
await req.response.close();
319342
break;

packages/realtime_client/test/socket_test.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,8 @@ void main() {
287287
expect(channel.params, {
288288
'config': {
289289
'broadcast': {'ack': false, 'self': false},
290-
'presence': {'key': ''}
290+
'presence': {'key': ''},
291+
'private': false,
291292
}
292293
});
293294
});

packages/realtime_client/test/transformers_test.dart

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,4 +234,19 @@ void main() {
234234
expect(enrichedPayload, expectedMap);
235235
});
236236
});
237+
238+
group('httpEndpointURL', () {
239+
test('Converts a hosted Supabase WS URL', () {
240+
expect(
241+
httpEndpointURL('wss://example.supabase.co/realtime/v1'),
242+
equals('https://example.supabase.co/realtime/v1'),
243+
);
244+
});
245+
test('Converts a custom domain WS URL', () {
246+
expect(
247+
httpEndpointURL('wss://custom-domain.com/realtime/v1'),
248+
equals('https://custom-domain.com/realtime/v1'),
249+
);
250+
});
251+
});
237252
}

0 commit comments

Comments
 (0)