@@ -19,7 +19,7 @@ package fetch
19
19
import cats .data .NonEmptyList
20
20
import cats .effect ._
21
21
import cats .effect .implicits ._
22
- import cats .effect .std .Queue
22
+ import cats .effect .std .{ Queue , Supervisor }
23
23
import cats .kernel .{Hash => H }
24
24
import cats .syntax .all ._
25
25
@@ -75,8 +75,10 @@ object DataSource {
75
75
implicit F : Temporal [F ]
76
76
): F [List [T ]] = {
77
77
Ref [F ].of(List .empty[T ]).flatMap { ref =>
78
- val takeAndBuffer = queue.take.flatMap { x =>
79
- ref.updateAndGet(list => x :: list)
78
+ val takeAndBuffer = F .uncancelable { poll =>
79
+ poll(queue.take).flatMap { x =>
80
+ ref.updateAndGet(list => x :: list)
81
+ }
80
82
}
81
83
val bufferUntilNumElements = takeAndBuffer.iterateUntil { buffer =>
82
84
buffer.size == maxElements
@@ -112,14 +114,16 @@ object DataSource {
112
114
): Resource [F , DataSource [F , I , A ]] = {
113
115
type Callback = Either [Throwable , Option [A ]] => Unit
114
116
for {
115
- queue <- Resource .eval(Queue .unbounded[F , (I , Callback )])
117
+ queue <- Resource .eval(Queue .unbounded[F , (I , Callback )])
118
+ supervisor <- Supervisor [F ]
116
119
workerFiber = upToWithin(
117
120
queue,
118
121
dataSource.maxBatchSize.getOrElse(Int .MaxValue ),
119
122
delayPerBatch
120
- ).flatMap {
121
- case Nil => F .start(F .unit)
122
- case x =>
123
+ ).flatMap { x =>
124
+ if (x.isEmpty) {
125
+ supervisor.supervise(F .unit)
126
+ } else {
123
127
val asMap = x.groupBy(_._1).mapValues(callbacks => callbacks.map(_._2))
124
128
val batchResults = dataSource.batch(NonEmptyList .fromListUnsafe(asMap.keys.toList))
125
129
val resultsHaveBeenSent = batchResults.map { results =>
@@ -132,7 +136,8 @@ object DataSource {
132
136
callbacks.foreach(cb => cb(Left (ex)))
133
137
}
134
138
}
135
- F .start(fiberWork)
139
+ supervisor.supervise(fiberWork)
140
+ }
136
141
}.foreverM[Unit ]
137
142
_ <- F .background(workerFiber)
138
143
} yield {
0 commit comments