Skip to content

Commit

Permalink
Merge pull request #542 from AVSystem/reactive-publisher-extensions
Browse files Browse the repository at this point in the history
Reactive Publisher to Monix extensions for typed Mongo
  • Loading branch information
ddworak authored Feb 22, 2024
2 parents f9d1aa1 + c493ed7 commit 81ec8a0
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.avsystem.commons
package mongo.reactive

import monix.eval.Task
import monix.reactive.Observable
import org.reactivestreams.Publisher

trait ReactiveMongoExtensions {
import ReactiveMongoExtensions._

implicit final def publisherOps[T](publisher: Publisher[T]): PublisherOps[T] = new PublisherOps(publisher)
}
object ReactiveMongoExtensions extends ReactiveMongoExtensions {
/**
* Extensions for converting [[Publisher]] to [[Task]]/[[Observable]] Monix types
*/
final class PublisherOps[T](private val publisher: Publisher[T]) extends AnyVal {
def asMonix: Observable[T] = Observable.fromReactivePublisher(publisher)

// prefer using the family of methods below for observables which are intended to only return a single document,
// mongo observable implementation sometimes logs an error on server-closed cursors
private def singleObservable: Task[Option[T]] = Task.fromReactivePublisher(publisher)

// handles both an empty Publisher and and a single null item
def headOptionL: Task[Option[T]] = singleObservable.map(_.filterNot(_ == null))
def headOptL: Task[Opt[T]] = headOptionL.map(_.toOpt)
// can return null if Mongo driver for some reason returns null, intentionally don't want to report failure
def headL: Task[T] = singleObservable.map(_.get)
def completedL: Task[Unit] = headOptionL.void
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class TypedMongoCollection[E <: BaseMongoEntity] private(
* to be used for database operations not covered directly by [[TypedMongoCollection]].
*/
def multiResultNativeOp[T](operation: MongoCollection[E] => Publisher[T]): Observable[T] =
Observable.fromReactivePublisher(operation(nativeCollection))
multi(operation(nativeCollection))

def drop(): Task[Unit] =
empty(optionalizeFirstArg(nativeCollection.drop(sessionOrNull)))
Expand Down Expand Up @@ -155,7 +155,7 @@ class TypedMongoCollection[E <: BaseMongoEntity] private(
}

def toObservable[X](publisher: FindPublisher[X]): Observable[X] =
Observable.fromReactivePublisher(setupPublisher(publisher))
multi(setupPublisher(publisher))

projection match {
case SelfRef =>
Expand Down Expand Up @@ -242,15 +242,14 @@ class TypedMongoCollection[E <: BaseMongoEntity] private(
filter: MongoDocumentFilter[E] = MongoFilter.empty,
setupOptions: DistinctPublisher[Any] => DistinctPublisher[Any] = identity,
): Observable[T] = {

val publisher =
optionalizeFirstArg(nativeCollection.distinct(sessionOrNull, property.rawPath, classOf[BsonValue]))
.filter(filter.toFilterBson(Opt.Empty, property.projectionRefs))

val publisherWithOptions =
setupOptions(publisher.asInstanceOf[DistinctPublisher[Any]]).asInstanceOf[DistinctPublisher[BsonValue]]

Observable.fromReactivePublisher(publisherWithOptions).map(property.format.readBson)
multi(publisherWithOptions).map(property.format.readBson)
}

def insertOne(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,13 @@ import monix.reactive.Observable
import org.reactivestreams.Publisher

trait TypedMongoUtils {
protected final def empty(publisher: Publisher[Void]): Task[Unit] =
Observable.fromReactivePublisher(publisher, 1).completedL

protected final def single[T](publisher: Publisher[T]): Task[T] =
Observable.fromReactivePublisher(publisher, 1).firstL
import com.avsystem.commons.mongo.reactive.ReactiveMongoExtensions._

protected final def empty(publisher: Publisher[Void]): Task[Unit] = publisher.completedL
protected final def single[T](publisher: Publisher[T]): Task[T] = publisher.headL
// handles both an empty Publisher and and a single null item
protected final def singleOpt[T](publisher: Publisher[T]): Task[Option[T]] =
Observable.fromReactivePublisher(publisher, 1).filter(_ != null).firstOptionL

protected final def multi[T](publisher: Publisher[T]): Observable[T] =
Observable.fromReactivePublisher(publisher)
protected final def singleOpt[T](publisher: Publisher[T]): Task[Option[T]] = publisher.headOptionL
protected final def multi[T](publisher: Publisher[T]): Observable[T] = publisher.asMonix

/**
* Transforms an expression `method(nullableArg, moreArgs)` into
Expand Down

0 comments on commit 81ec8a0

Please sign in to comment.