Skip to content

Commit 17053fa

Browse files
Merge branch 'dart-query-stream' into 'main'
Dart query stream via Isolate See merge request objectbox/objectbox-dart!2
2 parents b6071f4 + cf0529b commit 17053fa

File tree

5 files changed

+343
-62
lines changed

5 files changed

+343
-62
lines changed

benchmark/bin/query.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class QueryFindIds extends QueryBenchmark {
5656
Future<void> run() async => query.findIds();
5757
}
5858

59+
/// Stream where visitor is running in Dart isolate.
5960
class QueryStream extends QueryBenchmark {
6061
QueryStream() : super('${QueryStream}');
6162

objectbox/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
Linux, Windows). This is where the [`install.sh`](/install.sh) script downloads it by default.
88
E.g. it is no longer necessary to install the library globally to run `dart test` or `flutter test`.
99
* Windows: Support database directory paths that contain unicode (UTF-8) characters. #406
10+
* Changed `Query.stream` to collect results in a worker isolate, which should typically be faster.
1011
* Update: [objectbox-c 0.16.0](https://github.com/objectbox/objectbox-c/releases/tag/v0.16.0).
1112
* Update: [objectbox-android 3.1.3](https://github.com/objectbox/objectbox-java/releases/tag/V3.1.3).
1213
* Add new [task with tag list Flutter example app](example/flutter/objectbox_demo_relations) that

objectbox/lib/src/native/query/query.dart

Lines changed: 252 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import 'dart:isolate';
77
import 'dart:typed_data';
88

99
import 'package:ffi/ffi.dart';
10+
import 'package:meta/meta.dart';
1011

1112
import '../../common.dart';
1213
import '../../modelinfo/entity_definition.dart';
1314
import '../../modelinfo/modelproperty.dart';
1415
import '../../modelinfo/modelrelation.dart';
1516
import '../../store.dart';
17+
import '../../transaction.dart';
1618
import '../bindings/bindings.dart';
1719
import '../bindings/data_visitor.dart';
1820
import '../bindings/helpers.dart';
@@ -721,6 +723,24 @@ class Query<T> {
721723
}
722724
}
723725

726+
/// Clones this native query and returns a pointer to the clone.
727+
///
728+
/// This is useful to send a reference to a query to an isolate. A [Query] can
729+
/// not be sent to an isolate directly because it contains pointers.
730+
///
731+
/// ```dart
732+
/// // Clone the query and obtain its address, can be sent to an isolate.
733+
/// final queryPtrAddress = query._clone().address;
734+
///
735+
/// // Within an isolate re-create the query pointer to be used with the C API.
736+
/// final queryPtr = Pointer<OBX_query>.fromAddress(isolateInit.queryPtrAddress);
737+
/// ```
738+
Pointer<OBX_query> _clone() {
739+
final ptr = checkObxPtr(C.query_clone(_ptr));
740+
reachabilityFence(this);
741+
return ptr;
742+
}
743+
724744
/// Close the query and free resources.
725745
void close() {
726746
if (!_closed) {
@@ -811,58 +831,59 @@ class Query<T> {
811831

812832
/// Finds Objects matching the query, streaming them while the query executes.
813833
///
814-
/// Note: make sure you evaluate performance in your use case - streams come
815-
/// with an overhead so a plain [find()] is usually faster.
816-
Stream<T> stream() => _stream1();
834+
/// Results are streamed from a worker isolate in batches (the stream still
835+
/// returns objects one by one).
836+
Stream<T> stream() => _streamIsolate();
817837

818838
/// Stream items by sending full flatbuffers binary as a message.
819-
Stream<T> _stream1() {
820-
initializeDartAPI();
821-
final port = ReceivePort();
822-
final cStream = checkObxPtr(
823-
C.dartc_query_find(_cQuery, port.sendPort.nativePort), 'query stream');
824-
825-
var closed = false;
826-
final close = () {
827-
if (closed) return;
828-
closed = true;
829-
C.dartc_stream_close(cStream);
830-
port.close();
831-
reachabilityFence(this);
832-
};
833-
834-
try {
835-
final controller = StreamController<T>(onCancel: close);
836-
port.listen((dynamic message) {
837-
// We expect Uint8List for data and NULL when the query has finished.
838-
if (message is Uint8List) {
839-
try {
840-
controller.add(
841-
_entity.objectFromFB(_store, ByteData.view(message.buffer)));
842-
return;
843-
} catch (e) {
844-
controller.addError(e);
845-
}
846-
} else if (message is String) {
847-
controller.addError(
848-
ObjectBoxException('Query stream native exception: $message'));
849-
} else if (message != null) {
850-
controller.addError(ObjectBoxException(
851-
'Query stream received an invalid message type '
852-
'(${message.runtimeType}): $message'));
853-
}
854-
// Close the stream, this will call the onCancel function.
855-
// Do not call the onCancel function manually,
856-
// if cancel() is called on the Stream subscription right afterwards it
857-
// will use the shortcut in the onCancel function and not wait.
858-
controller.close(); // done
859-
});
860-
return controller.stream;
861-
} catch (e) {
862-
close();
863-
rethrow;
864-
}
865-
}
839+
/// Replaced by _streamIsolate which in benchmarks has been faster.
840+
// Stream<T> _stream1() {
841+
// initializeDartAPI();
842+
// final port = ReceivePort();
843+
// final cStream = checkObxPtr(
844+
// C.dartc_query_find(_cQuery, port.sendPort.nativePort), 'query stream');
845+
//
846+
// var closed = false;
847+
// final close = () {
848+
// if (closed) return;
849+
// closed = true;
850+
// C.dartc_stream_close(cStream);
851+
// port.close();
852+
// reachabilityFence(this);
853+
// };
854+
//
855+
// try {
856+
// final controller = StreamController<T>(onCancel: close);
857+
// port.listen((dynamic message) {
858+
// // We expect Uint8List for data and NULL when the query has finished.
859+
// if (message is Uint8List) {
860+
// try {
861+
// controller.add(
862+
// _entity.objectFromFB(_store, ByteData.view(message.buffer)));
863+
// return;
864+
// } catch (e) {
865+
// controller.addError(e);
866+
// }
867+
// } else if (message is String) {
868+
// controller.addError(
869+
// ObjectBoxException('Query stream native exception: $message'));
870+
// } else if (message != null) {
871+
// controller.addError(ObjectBoxException(
872+
// 'Query stream received an invalid message type '
873+
// '(${message.runtimeType}): $message'));
874+
// }
875+
// // Close the stream, this will call the onCancel function.
876+
// // Do not call the onCancel function manually,
877+
// // if cancel() is called on the Stream subscription right afterwards it
878+
// // will use the shortcut in the onCancel function and not wait.
879+
// controller.close(); // done
880+
// });
881+
// return controller.stream;
882+
// } catch (e) {
883+
// close();
884+
// rethrow;
885+
// }
886+
// }
866887

867888
/// Stream items by sending pointers from native code.
868889
/// Interestingly this is slower even though it transfers only pointers...
@@ -915,6 +936,166 @@ class Query<T> {
915936
// }
916937
// }
917938

939+
Stream<T> _streamIsolate() {
940+
final resultPort = ReceivePort();
941+
final exitPort = ReceivePort();
942+
943+
void spawnWorkerIsolate() async {
944+
// Pass clones of Store and Query to avoid these getting closed while the
945+
// worker isolate is still running. The isolate closes the clones once done.
946+
final storeClonePtr = InternalStoreAccess.clone(_store);
947+
final queryClonePtr = _clone();
948+
949+
// Current batch size determined through testing, performs well for smaller
950+
// objects. Might want to expose in the future for performance tuning by
951+
// users.
952+
final isolateInit = _StreamIsolateInit(resultPort.sendPort,
953+
storeClonePtr.address, queryClonePtr.address, 20);
954+
// If spawn errors StreamController will propagate the error, no point in
955+
// using addError as no listener before this function completes.
956+
await Isolate.spawn(_queryAndVisit, isolateInit,
957+
onExit: exitPort.sendPort);
958+
}
959+
960+
SendPort? sendPort;
961+
962+
// Callback to exit the isolate once consumers or this close the stream
963+
// (potentially before all results have been streamed).
964+
// Must return Future<void>, otherwise StreamController will not wait on it.
965+
var isolateExitSent = false;
966+
Future<void> exitIsolate() async {
967+
if (isolateExitSent) return;
968+
isolateExitSent = true;
969+
// Send signal to isolate it should exit.
970+
sendPort?.send(null);
971+
// Wait for isolate to clean up native resources,
972+
// otherwise e.g. Store is still open and
973+
// e.g. tests can not delete database files.
974+
await exitPort.first;
975+
resultPort.close();
976+
exitPort.close();
977+
}
978+
979+
final streamController = StreamController<T>(
980+
onListen: spawnWorkerIsolate, onCancel: exitIsolate);
981+
resultPort.listen((dynamic message) async {
982+
// The first message from the spawned isolate is a SendPort. This port
983+
// is used to communicate with the spawned isolate.
984+
if (message is SendPort) {
985+
sendPort = message;
986+
return; // wait for next message.
987+
}
988+
// Further messages are
989+
// - ObxObjectMessage for data,
990+
// - Exception and Error for errors and
991+
// - null if the worker isolate is done sending data.
992+
else if (message is _StreamIsolateMessage) {
993+
try {
994+
for (var i = 0; i < message.dataPtrAddresses.length; i++) {
995+
final dataPtrAddress = message.dataPtrAddresses[i];
996+
final size = message.sizes[i];
997+
if (size == 0) break; // Reached last object.
998+
streamController.add(_entity.objectFromFB(
999+
_store,
1000+
InternalStoreAccess.reader(_store)
1001+
.access(Pointer.fromAddress(dataPtrAddress), size)));
1002+
}
1003+
return; // wait for next message.
1004+
} catch (e) {
1005+
streamController.addError(e);
1006+
}
1007+
} else if (message is Error) {
1008+
streamController.addError(message);
1009+
} else if (message is Exception) {
1010+
streamController.addError(message);
1011+
} else if (message != null) {
1012+
streamController.addError(
1013+
ObjectBoxException('Query stream received an invalid message type '
1014+
'(${message.runtimeType}): $message'));
1015+
}
1016+
// Close the stream, this will call the onCancel function.
1017+
// Do not call the onCancel function manually,
1018+
// if cancel() is called on the Stream subscription right afterwards it
1019+
// will use the shortcut in the onCancel function and not wait.
1020+
streamController.close();
1021+
});
1022+
return streamController.stream;
1023+
}
1024+
1025+
// Isolate entry point must be top-level or static.
1026+
static Future<void> _queryAndVisit(_StreamIsolateInit isolateInit) async {
1027+
// Init native resources asap so that they do not leak, e.g. on exceptions
1028+
final store =
1029+
InternalStoreAccess.createMinimal(isolateInit.storePtrAddress);
1030+
1031+
var resultPort = isolateInit.resultPort;
1032+
1033+
// Send a SendPort to the main isolate so that it can send to this isolate.
1034+
final commandPort = ReceivePort();
1035+
resultPort.send(commandPort.sendPort);
1036+
1037+
try {
1038+
// Visit inside transaction and do not complete transaction to ensure
1039+
// data pointers remain valid until main isolate has deserialized all data.
1040+
await InternalStoreAccess.runInTransaction(store, TxMode.read,
1041+
(Transaction tx) async {
1042+
// Use fixed-length lists to avoid performance hit due to growing.
1043+
final maxBatchSize = isolateInit.batchSize;
1044+
var dataPtrBatch = List<int>.filled(maxBatchSize, 0);
1045+
var sizeBatch = List<int>.filled(maxBatchSize, 0);
1046+
var batchSize = 0;
1047+
final visitor = dataVisitor((Pointer<Uint8> data, int size) {
1048+
// Currently returning all results, even if the stream has been closed
1049+
// before (e.g. only first element taken). Would need a way to check
1050+
// for exit command on commandPort synchronously.
1051+
dataPtrBatch[batchSize] = data.address;
1052+
sizeBatch[batchSize] = size;
1053+
batchSize++;
1054+
// Send data in batches as sending a message is rather expensive.
1055+
if (batchSize == maxBatchSize) {
1056+
resultPort.send(_StreamIsolateMessage(dataPtrBatch, sizeBatch));
1057+
// Re-use list instance to avoid performance hit due to new instance.
1058+
dataPtrBatch.fillRange(0, dataPtrBatch.length, 0);
1059+
sizeBatch.fillRange(0, dataPtrBatch.length, 0);
1060+
batchSize = 0;
1061+
}
1062+
return true;
1063+
});
1064+
final queryPtr =
1065+
Pointer<OBX_query>.fromAddress(isolateInit.queryPtrAddress);
1066+
try {
1067+
checkObx(C.query_visit(queryPtr, visitor, nullptr));
1068+
} catch (e) {
1069+
resultPort.send(e);
1070+
return;
1071+
} finally {
1072+
try {
1073+
checkObx(C.query_close(queryPtr));
1074+
} catch (e) {
1075+
resultPort.send(e);
1076+
return;
1077+
}
1078+
}
1079+
// Send any remaining data.
1080+
if (batchSize > 0) {
1081+
resultPort.send(_StreamIsolateMessage(dataPtrBatch, sizeBatch));
1082+
}
1083+
1084+
// Signal to the main isolate there are no more results.
1085+
resultPort.send(null);
1086+
// Wait for main isolate to confirm it is done accessing sent data pointers.
1087+
await commandPort.first;
1088+
// Note: when the transaction is closed after await this might lead to an
1089+
// error log as the isolate could have been transferred to another thread
1090+
// when resuming execution.
1091+
// https://github.com/dart-lang/sdk/issues/46943
1092+
});
1093+
} finally {
1094+
store.close();
1095+
commandPort.close();
1096+
}
1097+
}
1098+
9181099
/// For internal testing purposes.
9191100
String describe() {
9201101
final result = dartStringFromC(C.query_describe(_ptr));
@@ -947,3 +1128,24 @@ class Query<T> {
9471128
return result;
9481129
}
9491130
}
1131+
1132+
/// Message passed to entry point [Query._queryAndVisit] of isolate.
1133+
@immutable
1134+
class _StreamIsolateInit {
1135+
final SendPort resultPort;
1136+
final int storePtrAddress;
1137+
final int queryPtrAddress;
1138+
final int batchSize;
1139+
1140+
const _StreamIsolateInit(this.resultPort, this.storePtrAddress,
1141+
this.queryPtrAddress, this.batchSize);
1142+
}
1143+
1144+
/// Message sent to main isolate containing info about a batch of objects.
1145+
@immutable
1146+
class _StreamIsolateMessage {
1147+
final List<int> dataPtrAddresses;
1148+
final List<int> sizes;
1149+
1150+
const _StreamIsolateMessage(this.dataPtrAddresses, this.sizes);
1151+
}

0 commit comments

Comments
 (0)