Skip to content

Commit

Permalink
fix undeliverable exception due to issues in android threading
Browse files Browse the repository at this point in the history
It is very likely that the exception is delivered already
  • Loading branch information
remonh87 committed Jan 11, 2020
1 parent efc02aa commit 8f1c830
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.signify.hue.flutterreactiveble

import com.polidea.rxandroidble2.exceptions.BleException
import com.signify.hue.flutterreactiveble.ble.RequestConnectionPriorityFailed
import com.signify.hue.flutterreactiveble.channelhandlers.BleStatusHandler
import com.signify.hue.flutterreactiveble.channelhandlers.CharNotificationHandler
Expand All @@ -16,6 +17,8 @@ import io.flutter.plugin.common.MethodChannel.Result
import io.flutter.plugin.common.PluginRegistry
import io.reactivex.Single
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins
import timber.log.Timber
import java.util.UUID
import com.signify.hue.flutterreactiveble.ProtobufModel as pb
Expand Down Expand Up @@ -68,6 +71,19 @@ class PluginController {
deviceConnectionChannel.setStreamHandler(deviceConnectionHandler)
charNotificationChannel.setStreamHandler(charNotificationHandler)
bleStatusChannel.setStreamHandler(bleStatusHandler)

/*Workaround for issue undeliverable https://github.com/Polidea/RxAndroidBle/wiki/FAQ:-UndeliverableException
note that this not override the onError for the observable only the RXJAVA error handler like described in:
https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling
*/
RxJavaPlugins.setErrorHandler { throwable ->
if (throwable is UndeliverableException && throwable.cause is BleException) {
return@setErrorHandler // ignore BleExceptions as they were surely delivered at least once
}
// add other custom handlers if needed
@Suppress("TooGenericExceptionThrown")
throw RuntimeException("Unexpected Throwable in RxJavaPlugins error handler", throwable)
}
}

internal fun execute(call: MethodCall, result: Result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ import java.util.concurrent.TimeUnit
internal class DeviceConnector(
private val device: RxBleDevice,
private val connectionTimeout: Duration,
private val updateListeners: (update: com.signify.hue.flutterreactiveble.ble.ConnectionUpdate) -> Unit,
private val connectionQueue: com.signify.hue.flutterreactiveble.ble.ConnectionQueue
private val updateListeners: (update: ConnectionUpdate) -> Unit,
private val connectionQueue: ConnectionQueue
) {

companion object {
private const val minTimeMsBeforeDisconnectingIsAllowed = 200L
private const val delayMsAfterClearingCache = 300L
}

private val connectDeviceSubject = BehaviorSubject.create<com.signify.hue.flutterreactiveble.ble.EstablishConnectionResult>()
private val connectDeviceSubject = BehaviorSubject.create<EstablishConnectionResult>()

private var timestampEstablishConnection: Long = 0

Expand All @@ -40,17 +40,17 @@ internal class DeviceConnector(
connectDeviceSubject
}

private val currentConnection: com.signify.hue.flutterreactiveble.ble.EstablishConnectionResult?
private val currentConnection: EstablishConnectionResult?
get() = if (lazyConnection.isInitialized()) connection.value else null

internal val connection by lazyConnection

private val connectionStatusUpdates by lazy {
device.observeConnectionStateChanges()
.startWith(device.connectionState)
.map<com.signify.hue.flutterreactiveble.ble.ConnectionUpdate> { com.signify.hue.flutterreactiveble.ble.ConnectionUpdateSuccess(device.macAddress, it.toConnectionState().code) }
.map<ConnectionUpdate> { ConnectionUpdateSuccess(device.macAddress, it.toConnectionState().code) }
.onErrorReturn {
com.signify.hue.flutterreactiveble.ble.ConnectionUpdateError(device.macAddress, it.message
ConnectionUpdateError(device.macAddress, it.message
?: "Unknown error")
}
.subscribe(
Expand All @@ -70,8 +70,8 @@ internal class DeviceConnector(
in order to prevent Android from ignoring disconnects we add a delay when we try to
disconnect to quickly after establishing connection. https://issuetracker.google.com/issues/37121223
*/
if (diff < com.signify.hue.flutterreactiveble.ble.DeviceConnector.Companion.minTimeMsBeforeDisconnectingIsAllowed) {
Single.timer(com.signify.hue.flutterreactiveble.ble.DeviceConnector.Companion.minTimeMsBeforeDisconnectingIsAllowed - diff, TimeUnit.MILLISECONDS)
if (diff < DeviceConnector.Companion.minTimeMsBeforeDisconnectingIsAllowed) {
Single.timer(DeviceConnector.Companion.minTimeMsBeforeDisconnectingIsAllowed - diff, TimeUnit.MILLISECONDS)
.doFinally {
disposeSubscriptions()
}.subscribe()
Expand All @@ -92,20 +92,20 @@ internal class DeviceConnector(

val shouldNotTimeout = connectionTimeout.value <= 0L
connectionQueue.addToQueue(deviceId)
updateListeners(com.signify.hue.flutterreactiveble.ble.ConnectionUpdateSuccess(deviceId, ConnectionState.CONNECTING.code))
updateListeners(ConnectionUpdateSuccess(deviceId, ConnectionState.CONNECTING.code))

return waitUntilFirstOfQueue(deviceId)
.switchMap { queue ->
if (!queue.contains(deviceId)) {
Observable.just(com.signify.hue.flutterreactiveble.ble.EstablishConnectionFailure(deviceId,
Observable.just(EstablishConnectionFailure(deviceId,
"Device is not in queue"))
} else {
connectDevice(rxBleDevice, shouldNotTimeout)
.map<com.signify.hue.flutterreactiveble.ble.EstablishConnectionResult> { com.signify.hue.flutterreactiveble.ble.EstablishedConnection(rxBleDevice.macAddress, it) }
.map<EstablishConnectionResult> { EstablishedConnection(rxBleDevice.macAddress, it) }
}
}
.onErrorReturn { error ->
com.signify.hue.flutterreactiveble.ble.EstablishConnectionFailure(rxBleDevice.macAddress,
EstablishConnectionFailure(rxBleDevice.macAddress,
error.message ?: "Unknown error")
}
.doOnNext {
Expand All @@ -115,13 +115,13 @@ internal class DeviceConnector(
connectionStatusUpdates
timestampEstablishConnection = System.currentTimeMillis()
connectionQueue.removeFromQueue(deviceId)
if (it is com.signify.hue.flutterreactiveble.ble.EstablishConnectionFailure) {
updateListeners.invoke(com.signify.hue.flutterreactiveble.ble.ConnectionUpdateError(deviceId, it.errorMessage))
if (it is EstablishConnectionFailure) {
updateListeners.invoke(ConnectionUpdateError(deviceId, it.errorMessage))
}
}
.doOnError {
connectionQueue.removeFromQueue(deviceId)
updateListeners.invoke(com.signify.hue.flutterreactiveble.ble.ConnectionUpdateError(deviceId, it.message
updateListeners.invoke(ConnectionUpdateError(deviceId, it.message
?: "Unknown error"))
}
.doOnDispose {
Expand Down Expand Up @@ -152,8 +152,8 @@ internal class DeviceConnector(

internal fun clearGattCache(): Completable = currentConnection?.let { connection ->
when (connection) {
is com.signify.hue.flutterreactiveble.ble.EstablishedConnection -> clearGattCache(connection.rxConnection)
is com.signify.hue.flutterreactiveble.ble.EstablishConnectionFailure -> Completable.error(Throwable(connection.errorMessage))
is EstablishedConnection -> clearGattCache(connection.rxConnection)
is EstablishConnectionFailure -> Completable.error(Throwable(connection.errorMessage))
}
} ?: Completable.error(IllegalStateException("Connection is not established"))

Expand All @@ -174,7 +174,7 @@ internal class DeviceConnector(
val success = refreshMethod.invoke(bluetoothGatt) as Boolean
if (success) {
Observable.empty<Unit>()
.delay(com.signify.hue.flutterreactiveble.ble.DeviceConnector.Companion.delayMsAfterClearingCache, TimeUnit.MILLISECONDS)
.delay(DeviceConnector.Companion.delayMsAfterClearingCache, TimeUnit.MILLISECONDS)
.doOnComplete { Timber.d("Clearing GATT cache completed") }
} else {
val reason = "BluetoothGatt.refresh() returned false"
Expand Down

0 comments on commit 8f1c830

Please sign in to comment.