Skip to content

Commit

Permalink
Initialize event channels early
Browse files Browse the repository at this point in the history
  • Loading branch information
werediver committed Nov 10, 2020
1 parent ff81239 commit 6c01074
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 71 deletions.
2 changes: 1 addition & 1 deletion ios/Classes/ReactiveBle/Central.swift
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ final class Central {
return "A peripheral \(peripheralID.uuidString) is unknown (make sure it has been discovered)"
case .peripheralIsNotConnected(let peripheralID):
return "The peripheral \(peripheralID.uuidString) is not connected"
case .serviceNotFound(let peripheralID, let serviceID):
case .serviceNotFound(let serviceID, let peripheralID):
return "A service \(serviceID) is not found in the peripheral \(peripheralID) (make sure it has been discovered)"
case .characteristicNotFound(let qualifiedCharacteristic):
return "A characteristic \(qualifiedCharacteristic.id) is not found in the service \(qualifiedCharacteristic.serviceID) of the peripheral \(qualifiedCharacteristic.peripheralID) (make sure it has been discovered)"
Expand Down
4 changes: 2 additions & 2 deletions lib/src/connected_device_operation.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ConnectedDeviceOperation {

Stream<List<int>> subscribeToCharacteristic(
QualifiedCharacteristic characteristic,
Future<ConnectionStateUpdate> shouldTerminate,
Future<void> isDisconnected,
) {
final specificCharacteristicValueStream = characteristicValueStream
.where((update) => update.characteristic == characteristic)
Expand All @@ -59,7 +59,7 @@ class ConnectedDeviceOperation {
print("Error unsubscribing from notifications: $e")),
);

shouldTerminate.then<void>((_) => autosubscribingRepeater.dispose());
isDisconnected.then<void>((_) => autosubscribingRepeater.dispose());

return autosubscribingRepeater.stream;
}
Expand Down
68 changes: 29 additions & 39 deletions lib/src/plugin_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ class PluginController {
@required ArgsToProtobufConverter argsToProtobufConverter,
@required ProtobufConverter protobufConverter,
@required MethodChannel bleMethodChannel,
@required EventChannel connectedDeviceChannel,
@required EventChannel charUpdateChannel,
@required EventChannel bleDeviceScanChannel,
@required EventChannel bleStatusChannel,
@required Stream<List<int>> connectedDeviceChannel,
@required Stream<List<int>> charUpdateChannel,
@required Stream<List<int>> bleDeviceScanChannel,
@required Stream<List<int>> bleStatusChannel,
@required DebugLogger debugLogger,
}) : assert(argsToProtobufConverter != null),
assert(protobufConverter != null),
Expand All @@ -35,30 +35,28 @@ class PluginController {
_argsToProtobufConverter = argsToProtobufConverter,
_protobufConverter = protobufConverter,
_bleMethodChannel = bleMethodChannel,
_connectedDeviceChannel = connectedDeviceChannel,
_charUpdateChannel = charUpdateChannel,
_bleStatusChannel = bleStatusChannel,
_bleDeviceScanChannel = bleDeviceScanChannel,
_connectedDeviceRawStream = connectedDeviceChannel,
_charUpdateRawStream = charUpdateChannel,
_bleStatusRawChannel = bleStatusChannel,
_bleDeviceScanRawStream = bleDeviceScanChannel,
_debugLogger = debugLogger;

final ArgsToProtobufConverter _argsToProtobufConverter;
final ProtobufConverter _protobufConverter;
final MethodChannel _bleMethodChannel;
final EventChannel _connectedDeviceChannel;
final EventChannel _charUpdateChannel;
final EventChannel _bleDeviceScanChannel;
final EventChannel _bleStatusChannel;
final Stream<List<int>> _connectedDeviceRawStream;
final Stream<List<int>> _charUpdateRawStream;
final Stream<List<int>> _bleDeviceScanRawStream;
final Stream<List<int>> _bleStatusRawChannel;
final DebugLogger _debugLogger;

Stream<ConnectionStateUpdate> _connectionUpdateEventChannelStream;
Stream<CharacteristicValue> _charValueEventChannelStream;
Stream<ScanResult> _scanResultEventChannelStream;
Stream<ConnectionStateUpdate> _connectionUpdateStream;
Stream<CharacteristicValue> _charValueStream;
Stream<ScanResult> _scanResultStream;
Stream<BleStatus> _bleStatusStream;

Stream<ConnectionStateUpdate> get connectionUpdateStream =>
_connectionUpdateEventChannelStream ??= _connectedDeviceChannel
.receiveBroadcastStream()
.cast<List<int>>()
_connectionUpdateStream ??= _connectedDeviceRawStream
.map(_protobufConverter.connectionStateUpdateFrom)
.map(
(update) {
Expand All @@ -70,9 +68,7 @@ class PluginController {
);

Stream<CharacteristicValue> get charValueUpdateStream =>
_charValueEventChannelStream ??= _charUpdateChannel
.receiveBroadcastStream()
.cast<List<int>>()
_charValueStream ??= _charUpdateRawStream
.map(_protobufConverter.characteristicValueFrom)
.map(
(update) {
Expand All @@ -82,14 +78,8 @@ class PluginController {
},
);

Stream<ScanResult> get scanStream =>
_scanResultEventChannelStream ??= _bleDeviceScanChannel
.receiveBroadcastStream()
.cast<List<int>>()
.map(
_protobufConverter.scanResultFrom,
)
.map(
Stream<ScanResult> get scanStream => _scanResultStream ??=
_bleDeviceScanRawStream.map(_protobufConverter.scanResultFrom).map(
(scanResult) {
_debugLogger
.log('Received $ScanResult(result: ${scanResult.result})');
Expand All @@ -98,12 +88,8 @@ class PluginController {
);

Stream<BleStatus> get bleStatusStream =>
_bleStatusStream ??= _bleStatusChannel
.receiveBroadcastStream()
.cast<List<int>>()
.map(
_protobufConverter.bleStatusFrom,
)
_bleStatusStream ??= _bleStatusRawChannel
.map(_protobufConverter.bleStatusFrom)
.map((status) {
_debugLogger.log('Received ble status update: $status');
return status;
Expand Down Expand Up @@ -313,10 +299,14 @@ class PluginControllerFactory {
protobufConverter: const ProtobufConverter(),
argsToProtobufConverter: const ArgsToProtobufConverter(),
bleMethodChannel: _bleMethodChannel,
connectedDeviceChannel: connectedDeviceChannel,
charUpdateChannel: charEventChannel,
bleDeviceScanChannel: scanEventChannel,
bleStatusChannel: bleStatusChannel,
connectedDeviceChannel:
connectedDeviceChannel.receiveBroadcastStream().cast<List<int>>(),
charUpdateChannel:
charEventChannel.receiveBroadcastStream().cast<List<int>>(),
bleDeviceScanChannel:
scanEventChannel.receiveBroadcastStream().cast<List<int>>(),
bleStatusChannel:
bleStatusChannel.receiveBroadcastStream().cast<List<int>>(),
debugLogger: logger,
);
}
Expand Down
11 changes: 6 additions & 5 deletions lib/src/reactive_ble.dart
Original file line number Diff line number Diff line change
Expand Up @@ -317,19 +317,20 @@ class FlutterReactiveBle {
///
/// This stream terminates automatically when the device is disconnected.
Stream<List<int>> subscribeToCharacteristic(
QualifiedCharacteristic characteristic) {
final terminateFuture = connectedDeviceStream
QualifiedCharacteristic characteristic,
) {
final isDisconnected = connectedDeviceStream
.where((update) =>
update.deviceId == characteristic.deviceId &&
(update.connectionState == DeviceConnectionState.disconnecting ||
update.connectionState == DeviceConnectionState.disconnected))
.firstWhere((_) => true,
orElse: () => throw NoBleCharacteristicDataReceived());
.cast<void>()
.firstWhere((_) => true, orElse: () {});

return initialize().asStream().asyncExpand(
(_) => _connectedDeviceOperator.subscribeToCharacteristic(
characteristic,
terminateFuture,
isDisconnected,
),
);
}
Expand Down
47 changes: 23 additions & 24 deletions test/plugin_controller_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,28 @@ void main() {
_MethodChannelMock _methodChannel;
_ArgsToProtobufConverterMock _argsConverter;
ProtobufConverter _protobufConverter;
_EventChannelMock _connectedDeviceChannel;
_EventChannelMock _argsChannel;
_EventChannelMock _scanChannel;
_EventChannelMock _statusChannel;
StreamController<List<int>> _connectedDeviceStreamController;
StreamController<List<int>> _argsStreamController;
StreamController<List<int>> _scanStreamController;
StreamController<List<int>> _statusStreamController;

setUp(() {
_argsConverter = _ArgsToProtobufConverterMock();
_methodChannel = _MethodChannelMock();
_protobufConverter = _ProtobufConverterMock();
_connectedDeviceChannel = _EventChannelMock();
_argsChannel = _EventChannelMock();
_scanChannel = _EventChannelMock();
_statusChannel = _EventChannelMock();
_connectedDeviceStreamController = StreamController();
_argsStreamController = StreamController();
_scanStreamController = StreamController();
_statusStreamController = StreamController();

_sut = PluginController(
argsToProtobufConverter: _argsConverter,
bleMethodChannel: _methodChannel,
protobufConverter: _protobufConverter,
connectedDeviceChannel: _connectedDeviceChannel,
charUpdateChannel: _argsChannel,
bleDeviceScanChannel: _scanChannel,
bleStatusChannel: _statusChannel,
connectedDeviceChannel: _connectedDeviceStreamController.stream,
charUpdateChannel: _argsStreamController.stream,
bleDeviceScanChannel: _scanStreamController.stream,
bleStatusChannel: _statusStreamController.stream,
debugLogger: _DebugLoggerMock(),
);
});
Expand Down Expand Up @@ -102,8 +102,8 @@ void main() {
Stream<ConnectionStateUpdate> result;

setUp(() {
when(_connectedDeviceChannel.receiveBroadcastStream()).thenAnswer(
(_) => Stream<dynamic>.fromIterable(<dynamic>[
_connectedDeviceStreamController.addStream(
Stream.fromIterable([
[1, 2, 3],
]),
);
Expand Down Expand Up @@ -132,8 +132,8 @@ void main() {
result: const Result.success([1]),
);

when(_argsChannel.receiveBroadcastStream()).thenAnswer(
(realInvocation) => Stream<List<int>>.fromIterable([
_argsStreamController.addStream(
Stream<List<int>>.fromIterable([
[0, 1]
]),
);
Expand Down Expand Up @@ -461,10 +461,11 @@ void main() {
setUp(() {
scanResult = ScanResult(result: Result.success(device));
when(_protobufConverter.scanResultFrom(any)).thenReturn(scanResult);
when(_scanChannel.receiveBroadcastStream())
.thenAnswer((_) => Stream<List<int>>.fromIterable([
[1]
]));
_scanStreamController.addStream(
Stream<List<int>>.fromIterable([
[1],
]),
);
result = _sut.scanStream;
});

Expand Down Expand Up @@ -556,8 +557,8 @@ void main() {
Stream<BleStatus> _bleStatusStream;

setUp(() {
when(_statusChannel.receiveBroadcastStream()).thenAnswer(
(_) => Stream<List<int>>.fromIterable([
_statusStreamController.addStream(
Stream<List<int>>.fromIterable([
[1],
[0]
]),
Expand Down Expand Up @@ -621,8 +622,6 @@ void main() {

class _MethodChannelMock extends Mock implements MethodChannel {}

class _EventChannelMock extends Mock implements EventChannel {}

class _ArgsToProtobufConverterMock extends Mock
implements ArgsToProtobufConverter {}

Expand Down

0 comments on commit 6c01074

Please sign in to comment.