Skip to content

Commit

Permalink
Merge pull request #1230 from eikek/bugfix/periodic-tasks
Browse files Browse the repository at this point in the history
Fix sending notification mails from background tasks
  • Loading branch information
mergify[bot] authored Dec 22, 2021
2 parents 50e4c31 + 3a642ee commit d060b04
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ trait ONotification[F[_]] {

def offerEvents(ev: Iterable[Event]): F[Unit]

def mkNotificationChannel(channel: Channel): F[Vector[NotificationChannel]]
def mkNotificationChannel(
channel: Channel,
userId: Ident
): F[Vector[NotificationChannel]]

def findNotificationChannel(ref: ChannelRef): F[Vector[NotificationChannel]]

Expand Down Expand Up @@ -109,22 +112,30 @@ object ONotification {
channel: Channel,
account: AccountId,
baseUrl: Option[LenientUri]
): F[SendTestResult] =
(for {
ev <- sampleEvent(evt, account, baseUrl)
logbuf <- Logger.buffer()
ch <- mkNotificationChannel(channel)
_ <- notMod.send(logbuf._2.andThen(log), ev, ch)
logs <- logbuf._1.get
res = SendTestResult(true, logs)
} yield res).attempt
.map {
case Right(res) => res
case Left(ex) =>
val ps = new StringWriter()
ex.printStackTrace(new PrintWriter(ps))
SendTestResult(false, Vector(s"${ex.getMessage}\n$ps"))
}
): F[SendTestResult] = {
def doCreate(userId: Ident) =
(for {
ev <- sampleEvent(evt, account, baseUrl)
logbuf <- Logger.buffer()
ch <- mkNotificationChannel(channel, userId)
_ <- notMod.send(logbuf._2.andThen(log), ev, ch)
logs <- logbuf._1.get
res = SendTestResult(true, logs)
} yield res).attempt
.map {
case Right(res) => res
case Left(ex) =>
val ps = new StringWriter()
ex.printStackTrace(new PrintWriter(ps))
SendTestResult(false, Vector(s"${ex.getMessage}\n$ps"))
}

OptionT(store.transact(RUser.findIdByAccount(account)))
.semiflatMap(doCreate)
.getOrElse(
SendTestResult(false, Vector(s"No user found in db for: ${account.asString}"))
)
}

def listChannels(account: AccountId): F[Vector[Channel]] =
store
Expand All @@ -142,7 +153,7 @@ object ONotification {
(for {
newId <- OptionT.liftF(Ident.randomId[F])
userId <- OptionT(store.transact(RUser.findIdByAccount(account)))
r <- ChannelConv.makeRecord[F](store, Right(channel), newId, userId)
r <- ChannelConv.makeRecord[F](store, log, Right(channel), newId, userId)
_ <- OptionT.liftF(store.transact(RNotificationChannel.insert(r)))
_ <- OptionT.liftF(log.debug(s"Created channel $r for $account"))
} yield AddResult.Success)
Expand All @@ -151,7 +162,7 @@ object ONotification {
def updateChannel(channel: Channel, account: AccountId): F[UpdateResult] =
(for {
userId <- OptionT(store.transact(RUser.findIdByAccount(account)))
r <- ChannelConv.makeRecord[F](store, Right(channel), channel.id, userId)
r <- ChannelConv.makeRecord[F](store, log, Right(channel), channel.id, userId)
n <- OptionT.liftF(store.transact(RNotificationChannel.update(r)))
} yield UpdateResult.fromUpdateRows(n)).getOrElse(UpdateResult.notFound)

Expand All @@ -170,7 +181,7 @@ object ONotification {
_ <- OptionT.liftF(log.debug(s"Creating new notification hook: $hook"))
channelId <- OptionT.liftF(Ident.randomId[F])
userId <- OptionT(store.transact(RUser.findIdByAccount(account)))
r <- ChannelConv.makeRecord[F](store, hook.channel, channelId, userId)
r <- ChannelConv.makeRecord[F](store, log, hook.channel, channelId, userId)
_ <- OptionT.liftF(
if (channelId == r.id) store.transact(RNotificationChannel.insert(r))
else ().pure[F]
Expand All @@ -196,7 +207,7 @@ object ONotification {
r: RNotificationHook
)(f: RNotificationChannel => F[UpdateResult]): F[UpdateResult] =
ChannelConv
.makeRecord(store, hook.channel, r.channelId, r.uid)
.makeRecord(store, log, hook.channel, r.channelId, r.uid)
.semiflatMap(f)
.getOrElse(UpdateResult.notFound)

Expand All @@ -221,10 +232,13 @@ object ONotification {
withHook(doUpdate)
}

def mkNotificationChannel(channel: Channel): F[Vector[NotificationChannel]] =
def mkNotificationChannel(
channel: Channel,
userId: Ident
): F[Vector[NotificationChannel]] =
(for {
rec <- ChannelConv
.makeRecord(store, Right(channel), channel.id, Ident.unsafe(""))
.makeRecord(store, log, Right(channel), channel.id, userId)
ch <- OptionT.liftF(store.transact(QNotification.readChannel(rec)))
} yield ch).getOrElse(Vector.empty)

Expand All @@ -249,13 +263,15 @@ object ONotification {

private[ops] def makeRecord[F[_]: Sync](
store: Store[F],
logger: Logger[F],
channelIn: Either[ChannelRef, Channel],
id: Ident,
userId: Ident
): OptionT[F, RNotificationChannel] =
channelIn match {
case Left(ref) =>
OptionT(store.transact(RNotificationChannel.getByRef(ref)))
OptionT.liftF(logger.debug(s"Loading channel for ref: ${ref}")) *>
OptionT(store.transact(RNotificationChannel.getByRef(ref)))

case Right(channel) =>
for {
Expand All @@ -264,6 +280,11 @@ object ONotification {
channel match {
case Channel.Mail(_, conn, recipients) =>
for {
_ <- OptionT.liftF(
logger.debug(
s"Looking up user smtp for ${userId.id} and ${conn.id}"
)
)
mailConn <- OptionT(
store.transact(RUserEmail.getByUser(userId, conn))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package docspell.joex.notify

import cats.data.NonEmptyList
import cats.data.OptionT
import cats.effect._
import cats.implicits._

Expand All @@ -23,6 +24,7 @@ import docspell.query.ItemQueryDsl._
import docspell.store.qb.Batch
import docspell.store.queries.ListItem
import docspell.store.queries.{QItem, Query}
import docspell.store.records.RUser

object PeriodicDueItemsTask {
val taskName = PeriodicDueItemsArgs.taskName
Expand All @@ -49,7 +51,11 @@ object PeriodicDueItemsTask {
def withChannel[F[_]: Sync](ctx: Context[F, Args], ops: ONotification[F])(
cont: Vector[NotificationChannel] => F[Unit]
): F[Unit] =
TaskOperations.withChannel(ctx.logger, ctx.args.channel, ops)(cont)
OptionT(ctx.store.transact(RUser.findIdByAccount(ctx.args.account)))
.semiflatMap(userId =>
TaskOperations.withChannel(ctx.logger, ctx.args.channel, userId, ops)(cont)
)
.getOrElse(())

def withItems[F[_]: Sync](ctx: Context[F, Args], limit: Int, now: Timestamp)(
cont: Vector[ListItem] => F[Unit]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package docspell.joex.notify

import cats.data.OptionT
import cats.effect._
import cats.implicits._

Expand All @@ -20,6 +21,7 @@ import docspell.query.ItemQueryParser
import docspell.store.qb.Batch
import docspell.store.queries.ListItem
import docspell.store.queries.{QItem, Query}
import docspell.store.records.RUser

object PeriodicQueryTask {
val taskName = PeriodicQueryArgs.taskName
Expand All @@ -46,7 +48,11 @@ object PeriodicQueryTask {
def withChannel[F[_]: Sync](ctx: Context[F, Args], ops: ONotification[F])(
cont: Vector[NotificationChannel] => F[Unit]
): F[Unit] =
TaskOperations.withChannel(ctx.logger, ctx.args.channel, ops)(cont)
OptionT(ctx.store.transact(RUser.findIdByAccount(ctx.args.account)))
.semiflatMap(userId =>
TaskOperations.withChannel(ctx.logger, ctx.args.channel, userId, ops)(cont)
)
.getOrElse(())

def withItems[F[_]: Sync](ctx: Context[F, Args], limit: Int, now: Timestamp)(
cont: Vector[ListItem] => F[Unit]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ trait TaskOperations {
def withChannel[F[_]: Sync](
logger: Logger[F],
channel: ChannelOrRef,
userId: Ident,
ops: ONotification[F]
)(
cont: Vector[NotificationChannel] => F[Unit]
): F[Unit] = {
val channels = channel match {
case Right(ch) => ops.mkNotificationChannel(ch)
case Right(ch) => ops.mkNotificationChannel(ch, userId)
case Left(ref) => ops.findNotificationChannel(ref)
}
channels.flatMap { ch =>
Expand Down

0 comments on commit d060b04

Please sign in to comment.