diff --git a/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/reactive/ReactiveMongoExtensions.scala b/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/reactive/ReactiveMongoExtensions.scala new file mode 100644 index 000000000..b5a07b0c2 --- /dev/null +++ b/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/reactive/ReactiveMongoExtensions.scala @@ -0,0 +1,31 @@ +package com.avsystem.commons +package mongo.reactive + +import monix.eval.Task +import monix.reactive.Observable +import org.reactivestreams.Publisher + +trait ReactiveMongoExtensions { + import ReactiveMongoExtensions._ + + implicit final def publisherOps[T](publisher: Publisher[T]): PublisherOps[T] = new PublisherOps(publisher) +} +object ReactiveMongoExtensions extends ReactiveMongoExtensions { + /** + * Extensions for converting [[Publisher]] to [[Task]]/[[Observable]] Monix types + */ + final class PublisherOps[T](private val publisher: Publisher[T]) extends AnyVal { + def asMonix: Observable[T] = Observable.fromReactivePublisher(publisher) + + // prefer using the family of methods below for observables which are intended to only return a single document, + // mongo observable implementation sometimes logs an error on server-closed cursors + private def singleObservable: Task[Option[T]] = Task.fromReactivePublisher(publisher) + + // handles both an empty Publisher and and a single null item + def headOptionL: Task[Option[T]] = singleObservable.map(_.filterNot(_ == null)) + def headOptL: Task[Opt[T]] = headOptionL.map(_.toOpt) + // can return null if Mongo driver for some reason returns null, intentionally don't want to report failure + def headL: Task[T] = singleObservable.map(_.get) + def completedL: Task[Unit] = headOptionL.void + } +} diff --git a/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoCollection.scala b/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoCollection.scala index c1f210665..3f7d85350 100644 --- a/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoCollection.scala +++ b/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoCollection.scala @@ -91,7 +91,7 @@ class TypedMongoCollection[E <: BaseMongoEntity] private( * to be used for database operations not covered directly by [[TypedMongoCollection]]. */ def multiResultNativeOp[T](operation: MongoCollection[E] => Publisher[T]): Observable[T] = - Observable.fromReactivePublisher(operation(nativeCollection)) + multi(operation(nativeCollection)) def drop(): Task[Unit] = empty(optionalizeFirstArg(nativeCollection.drop(sessionOrNull))) @@ -155,7 +155,7 @@ class TypedMongoCollection[E <: BaseMongoEntity] private( } def toObservable[X](publisher: FindPublisher[X]): Observable[X] = - Observable.fromReactivePublisher(setupPublisher(publisher)) + multi(setupPublisher(publisher)) projection match { case SelfRef => @@ -242,7 +242,6 @@ class TypedMongoCollection[E <: BaseMongoEntity] private( filter: MongoDocumentFilter[E] = MongoFilter.empty, setupOptions: DistinctPublisher[Any] => DistinctPublisher[Any] = identity, ): Observable[T] = { - val publisher = optionalizeFirstArg(nativeCollection.distinct(sessionOrNull, property.rawPath, classOf[BsonValue])) .filter(filter.toFilterBson(Opt.Empty, property.projectionRefs)) @@ -250,7 +249,7 @@ class TypedMongoCollection[E <: BaseMongoEntity] private( val publisherWithOptions = setupOptions(publisher.asInstanceOf[DistinctPublisher[Any]]).asInstanceOf[DistinctPublisher[BsonValue]] - Observable.fromReactivePublisher(publisherWithOptions).map(property.format.readBson) + multi(publisherWithOptions).map(property.format.readBson) } def insertOne( diff --git a/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoUtils.scala b/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoUtils.scala index 0c284154d..1019a91c3 100644 --- a/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoUtils.scala +++ b/mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoUtils.scala @@ -7,18 +7,13 @@ import monix.reactive.Observable import org.reactivestreams.Publisher trait TypedMongoUtils { - protected final def empty(publisher: Publisher[Void]): Task[Unit] = - Observable.fromReactivePublisher(publisher, 1).completedL - - protected final def single[T](publisher: Publisher[T]): Task[T] = - Observable.fromReactivePublisher(publisher, 1).firstL + import com.avsystem.commons.mongo.reactive.ReactiveMongoExtensions._ + protected final def empty(publisher: Publisher[Void]): Task[Unit] = publisher.completedL + protected final def single[T](publisher: Publisher[T]): Task[T] = publisher.headL // handles both an empty Publisher and and a single null item - protected final def singleOpt[T](publisher: Publisher[T]): Task[Option[T]] = - Observable.fromReactivePublisher(publisher, 1).filter(_ != null).firstOptionL - - protected final def multi[T](publisher: Publisher[T]): Observable[T] = - Observable.fromReactivePublisher(publisher) + protected final def singleOpt[T](publisher: Publisher[T]): Task[Option[T]] = publisher.headOptionL + protected final def multi[T](publisher: Publisher[T]): Observable[T] = publisher.asMonix /** * Transforms an expression `method(nullableArg, moreArgs)` into