Pipes refactor and Cofunctors #1444
louthy announced in Announcements
LanguageExt Pipes Background
Part of the `v5` refresh was to migrate the `Pipes` functionality to be a proper monad-transformer (in `v4` it's a transformer too, but it can only lift `Eff<RT, A>`, rather than the more general `K<M, A>` where `M : Monad<M>`). I completed the generalisation work a while back, but it had some problems:

For any users of pipes it was going to be a big upheaval
Obviously, `v5` is a big change, but where possible I want the migrations to be quite mechanical, and this one wasn't going to be. That doesn't mean I shouldn't 'go for it', but I'm trying to make sure that every bit of pain a user has to go through to move from `v4` to `v5` is strongly justified and will lead to a better experience once migrated.

It was inconsistently named
The core type, `Proxy`, and the derived types (`Producer`, `Consumer`, `Pipe`, etc.) don't follow the monad-transformer naming convention of having a `T` suffix. Really, if they're going to be generalised for any monad, then they should be called `ProducerT`, `ConsumerT`, `PipeT`, ...

Pipes is hard to use
This is not a new problem with `v5`. I made Pipes into a 1-for-1 clone of the Haskell Pipes library. Even in Haskell, pipes can be quite hard to use as you chase the alignment of generics. The desire for pipes to support producers, pipes, clients, servers, and more seems (in hindsight) to have been too greedy.

Hard to retrofit
The generalisation process wasn't working well in some areas. `Producer.merge` was blocking, and fixing it with the original code was challenging, to say the least.

LanguageExt Pipes Refresh
So, I decided to take a step back. Instead of trying to make an exact clone of the Haskell version, I thought I'd build it from scratch in a way that's more 'csharpy', consistent, and simpler. In particular, I looked at the techniques I used to refactor the `IO` monad (to support recursion, asynchrony, etc.) and brought them into a new Pipes implementation.

I also decided to drop support for `Client`, `Server`, `Request`, `Response`, and all of the other stuff that I suspect nobody used because it was too hard. That means:
- There's no longer a `Proxy<A1, A, B1, B, M, R>` interface. It was only needed to support all flavours of client, server, producer, consumer, etc.
- `PipeT<IN, OUT, M, R>` is the core pipe type that the others below are built from
- `ProducerT<OUT, M, R>` is simply a pipe with the input set to `Unit`: `PipeT<Unit, OUT, M, R>`
- `ConsumerT<IN, M, R>` is simply a pipe with the output set to `Void`: `PipeT<IN, Void, M, R>`
- `EffectT<M, R>` is simply a pipe with the input set to `Unit` and the output set to `Void`. This enclosed effect is the result of fusing producers, pipes, and consumers together: `PipeT<Unit, Void, M, R>`
Those four types, `ProducerT`, `PipeT`, `ConsumerT`, and `EffectT`, are the new, simplified, and fully generalised version of pipes.

Now that the generalised implementation follows the naming convention of having a `T` suffix for transformers, we can use the original names `Producer`, `Pipe`, `Consumer`, and `Effect` to provide a more specialised version that only works with `Eff<RT, A>` (like the original pipes). So:
- `Producer<RT, OUT, R>` is (internally) a `ProducerT<OUT, Eff<RT>, R>`
- `Pipe<RT, IN, OUT, R>` is (internally) a `PipeT<IN, OUT, Eff<RT>, R>`
- `Consumer<RT, IN, R>` is (internally) a `ConsumerT<IN, Eff<RT>, R>`
- `Effect<RT, R>` is (internally) an `EffectT<Eff<RT>, R>`
The good thing about this refactor is that there really is only one implementation of the pipes functionality, and it all sits in `PipesT.DSL.cs`. This focused DSL is much easier to manage than before. It was implemented in a similar way before, but it's now much easier for a C# dev to consume. I have put a real effort into making the interfaces, modules, preludes, etc. consistent for all types.

Pipes concurrency
Concurrency wasn't front-and-centre in the original implementation. In some senses it was 'bolted on'. You got concurrency from the lifted `Eff` type and from the `Producer.merge` function, but that was it.

Now pipes has first-class support for concurrency:
- Support for `IEnumerable` and `IAsyncEnumerable` with `ProducerT.yieldAll`, `Producer.yieldAll`, `PipeT.yieldAll`, and `Pipe.yieldAll`.
- `PipeT.liftT`, `PipeT.liftM`, `Pipe.liftT`, `Pipe.liftM`, `ProducerT.liftT`, `ProducerT.liftM`, `Producer.liftT`, `Producer.liftM`, `ConsumerT.liftT`, `ConsumerT.liftM`, `Consumer.liftT`, `Consumer.liftM`, `EffectT.liftT`, `EffectT.liftM`, `Effect.liftT`, and `Effect.liftM`!

Mailbox, Inbox, and Outbox
Inspired by the original `Pipes.Concurrency` library, I implemented `Mailbox`, `Inbox`, and `Outbox`. It's not a clone of the original, just inspired by it. A `Mailbox` consists of an `Inbox` and an `Outbox`. The inbox receives values posted to it. The outbox yields values posted to the inbox upon request.

Backing the `Mailbox` is a `System.Threading.Channels.Channel`. You can create a `Mailbox` using `Mailbox.spawn`. A mailbox is simply a `record` with an `Inbox` and an `Outbox`.

You can `Post` to the `Mailbox` and you can `Read` from the `Mailbox`. But, even more critically, you can call:

- `mailbox.ToConsumer<M>()` to get a consumer of values being posted into the `Inbox`
- `mailbox.ToProducer<M>()` to get a producer of values being yielded from the `Outbox`
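The inbox and outbox are the two ends of a channel, so the idea can be sketched in plain C# using nothing but `System.Threading.Channels`. This is a minimal illustration, not LanguageExt code. `PostToInbox` and `ReadFromOutbox` are hypothetical stand-ins for the mailbox's `Post` and `Read`, and the sketch assumes an unbounded channel.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// The channel stands in for the mailbox's backing store.
var channel = Channel.CreateUnbounded<int>();

// Hypothetical inbox side: posting writes a value into the channel.
ValueTask PostToInbox(int message) => channel.Writer.WriteAsync(message);

// Hypothetical outbox side: reading waits until a value is available.
ValueTask<int> ReadFromOutbox() => channel.Reader.ReadAsync();

await PostToInbox(1);
await PostToInbox(2);
await PostToInbox(3);

// Values come out in the order they were posted.
var first = await ReadFromOutbox();
Console.WriteLine(first); // 1

// Completing the writer tells readers that no more values will arrive, so the loop
// below ends once the remaining buffered values have been read.
channel.Writer.Complete();

// A simple loop that reads every remaining value and writes it to the console.
var rest = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine(item); // 2, then 3
    rest.Add(item);
}
```

Completing the writer is what lets the reading loop finish. Without it, `ReadAllAsync` would keep waiting for more posts, which is the behaviour you want from a long-lived mailbox.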
A good example of why this is useful is the new `Producer.merge` function. The `merge` function takes a collection of producers. What we want is for those to run concurrently, so we can receive the values as they happen. Then we want to produce a single merged stream of values.

First, `merge` creates the merged-stream `Mailbox`.

In `forkEffects`, we process each producer `p` and pipe its values to `mailbox.ToConsumerT`. So, we get a `ConsumerT` for the merged stream's `Mailbox`. It consumes every value from `p`, fusing into an `EffectT`. We then `Run()` that `EffectT`, which gives us the underlying `M` monad. We do this for every `ProducerT`, which means the merged-values `Mailbox` gets every value yielded from upstream. Finally, we `ForkIO` each `EffectT` so that it can run in parallel.

Back in the `merge` function, we then access the other side of the mailbox by asking for the `Outbox` producer, using `ToProducerT`. This will then yield all of the merged values downstream (whilst there are values to yield). Once complete, we tidy up the forks.
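The same merge strategy can be sketched without pipes. The following plain-C# example illustrates the steps described above. It is not the library's `Producer.merge`. It makes these substitutions:

- `IAsyncEnumerable<T>` sources stand in for producers.
- An unbounded `Channel<T>` stands in for the merged-stream `Mailbox`.
- One `Task` per source stands in for `ForkIO`.

`Merge` and `Numbers` are hypothetical helper names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical Merge: one shared channel (the merged-stream 'mailbox'), one forked task
// per source feeding it, and a single reader yielding everything downstream.
async IAsyncEnumerable<T> Merge<T>(params IAsyncEnumerable<T>[] sources)
{
    var mailbox = Channel.CreateUnbounded<T>();
    var cts = new CancellationTokenSource(); // no timer involved, so not disposed here
    var token = cts.Token;

    // 'Fork' one task per source; each pipes every value it yields into the mailbox.
    var forks = sources.Select(async source =>
    {
        await foreach (var value in source.WithCancellation(token))
            await mailbox.Writer.WriteAsync(value, token);
    }).ToArray();

    // Once every fork has finished, close the inbox. If a fork faulted, the fault is
    // passed along, which makes the read loop below throw instead of ending normally.
    _ = Task.WhenAll(forks).ContinueWith(t => mailbox.Writer.TryComplete(t.Exception));

    try
    {
        // Yield merged values in arrival order while there are values to yield.
        await foreach (var value in mailbox.Reader.ReadAllAsync())
            yield return value;
    }
    finally
    {
        // Tidy up the forks, e.g. if the consumer stopped iterating early.
        cts.Cancel();
    }
}

// Hypothetical source: yields `count` numbers from `start`, pausing before each one.
async IAsyncEnumerable<int> Numbers(int start, int count, int delayMs)
{
    for (var i = start; i < start + count; i++)
    {
        await Task.Delay(delayMs);
        yield return i;
    }
}

var merged = new List<int>();
await foreach (var number in Merge(Numbers(0, 3, 10), Numbers(100, 3, 15)))
    merged.Add(number);

Console.WriteLine(string.Join(", ", merged)); // interleaving depends on timing
```

The interleaving of the two sources depends on timing. Each source's own values still arrive in order, because each fork writes to the channel sequentially.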
Cofunctor

`Mailbox` is pretty powerful in its own right and doesn't need pipes to function. For example, you can write a simple loop that reads every value posted to a `Mailbox` and writes it to the console.

`Mailbox<A, B>` has two type parameters: `A` represents the values coming in and `B` represents the values being yielded. Values of type `A` are posted to `Mailbox.Inbox`, and values of type `B` are yielded from `Mailbox.Outbox`.

If you call `mailbox.Map<C>((B b) => ...)` on a `Mailbox`, then you can imagine the result as a mailbox whose `Inbox` still accepts `A` values, but whose `Outbox` now yields `C` values: each `B` is transformed on its way out. Subsequent calls to `Map<D>`, and the like, would continue to transform the values being yielded from `Mailbox.Outbox`.

But what if we wanted to transform the values being posted into `Mailbox.Inbox` (the `A` values)? `Map` doesn't work here, because it transforms an existing value. To `Map` the `A` to something else, we'd already have to have an `A` value, and the inbox only ever receives them.

So, there's no way we can change the values coming in? Well, there is, but not with `Functor` and `Map`. We need contravariant functors, colloquially known as 'co-functors'.

When it comes to category-theoretic concepts, 'co' can usually be read to mean 'reverse the arrows', or, in other words, 'find the dual of'. So a co-functor is a functor with the arrows reversed.

A `Functor`'s `Map` takes an `F<A>` and a function `A -> B`, and returns an `F<B>`. This can be seen as mapping the values that `F<A>` yields, after they've been yielded.

Let's reverse the arrows.
Now, it takes an `F<B>` and a function `A -> B`, and returns an `F<A>`. This may seem batshit crazy: how can we get a value of `A` out of an `F<B>` to pass to the `f` function?

We can't, and we won't be doing that. `F` in this case is not a type that yields values, but a type that receives values. It's a sink rather than a stream. So, `f` is being used to transform values coming into the `F<B>` (before arrival), not to transform values being yielded (after leaving).

`Mailbox.Inbox` is a `Cofunctor`, and so you can call `Contramap` on the `Mailbox` to transform values before they are posted into the `Inbox`.
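This plain-C# sketch makes the direction of the arrows concrete. It models a source as `IEnumerable<T>` and a sink as `Action<T>`. The `Map` and `Contramap` local functions are hypothetical illustrations of the two signatures described above. They are not LanguageExt's `Functor` or `Cofunctor` traits. `Map` transforms values after the source yields them, while `Contramap` transforms values before they reach the sink.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Functor-style: Map takes a source of A and a function A -> B, and returns a source of B.
// The function runs on values *after* the source yields them.
IEnumerable<B> Map<A, B>(IEnumerable<A> source, Func<A, B> f) => source.Select(f);

// Contravariant-style: Contramap takes a sink of B and a function A -> B,
// and returns a sink of A. The function runs on values *before* they reach the sink.
Action<A> Contramap<A, B>(Action<B> sink, Func<A, B> f) => a => sink(f(a));

// A source (like an outbox) of ints, mapped twice: int -> int -> string.
var yielded = Map(Map(new[] { 1, 2, 3 }, n => n * 10), n => $"#{n}").ToList();
Console.WriteLine(string.Join(" ", yielded)); // #10 #20 #30

// A sink (like an inbox) that only understands strings...
var received = new List<string>();
Action<string> inbox = received.Add;

// ...can be contramapped into a sink that accepts ints; nothing has to be 'extracted'.
Action<int> intInbox = Contramap<int, string>(inbox, n => $"value {n}");
intInbox(7);
intInbox(8);

Console.WriteLine(string.Join(", ", received)); // value 7, value 8
```

`Contramap` never needs to pull a value out of the sink. It only wraps the sink in a function that pre-processes each incoming value.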
Custom Mailboxes

Because `Mailbox` is simply a `record` that takes an `Inbox` and an `Outbox`, you can build your own without using `Mailbox.spawn`.

`Inbox` is currently created from a `System.Threading.Channels.ChannelWriter`, which simply writes to the `Channel` when a value is posted. And `Outbox` is created from a `System.Threading.Channels.ChannelReader`, which simply reads a value from the `Channel` when one is available.

You could extend `Inbox<A>` and `Outbox<A>` to work with any sink or source type you like. `Channel` works pretty well and has good control over buffer size and back-pressure, but there are other options too.
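The following plain-C# sketch (not LanguageExt code) illustrates the buffer-size and back-pressure control mentioned above. It shows the two channel ends that a custom inbox/outbox pair could wrap. It assumes a bounded `Channel` with a capacity of 2 and `BoundedChannelFullMode.Wait`; both are arbitrary choices for the example. With that configuration, `TryWrite` refuses a value when the buffer is full, while `WriteAsync` waits until a reader makes room.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// A bounded channel: at most 2 buffered values; further writes wait (back-pressure).
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 2)
{
    FullMode = BoundedChannelFullMode.Wait
});

// The writer end is what an inbox would wrap; the reader end is what an outbox would wrap.
ChannelWriter<int> inboxSide = channel.Writer;
ChannelReader<int> outboxSide = channel.Reader;

// TryWrite never waits: it succeeds while there is room and fails once the buffer is full.
var accepted = new List<bool>
{
    inboxSide.TryWrite(1),
    inboxSide.TryWrite(2),
    inboxSide.TryWrite(3) // buffer full: rejected rather than blocking
};
Console.WriteLine(string.Join(", ", accepted)); // True, True, False

// WriteAsync waits instead: this write can only complete once a reader makes room.
var pendingWrite = inboxSide.WriteAsync(3).AsTask();
Console.WriteLine(pendingWrite.IsCompleted); // False

var first = await outboxSide.ReadAsync(); // frees a slot...
await pendingWrite;                        // ...so the waiting write completes
inboxSide.Complete();

var rest = new List<int>();
await foreach (var item in outboxSide.ReadAllAsync())
    rest.Add(item);

Console.WriteLine($"{first}; {string.Join(", ", rest)}"); // 1; 2, 3
```

Other `BoundedChannelFullMode` values (such as dropping the oldest or newest item) trade this waiting behaviour for lossy buffering. That is one of the knobs a custom mailbox could expose.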
Divisible and Decidable contravariant functors

Another powerful aspect is that `Inbox<A>` is `Divisible` and `Decidable`.

A `Divisible` contravariant functor is the contravariant analogue of `Applicative`. Continuing the intuition that contravariant functors consume input, a `Divisible` contravariant functor also has the ability to be composed "beside" another contravariant functor.

Its `Divide` operation takes a function `f`, from an `A` to a pair `(B, C)`, along with two contravariant functors, `fb : F<B>` and `fc : F<C>`, and returns an `F<A>`. If you 'follow the arrows' here, you'll see that `fb` and `fc` get composed using `f`: it takes an `A` value, turns it into a pair `(B, C)`, and then passes the parts on to `fb` and `fc`.
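Here is a minimal plain-C# sketch of `Divide`, modelling a sink `F<T>` as `Action<T>`. The `Divide` local function is a hypothetical illustration that mirrors the shape of Haskell's `divide` (from `Data.Functor.Contravariant.Divisible`). It is not `Inbox`'s actual implementation. The example parses made-up `"name:age"` strings and routes both parts onward.

```csharp
using System;
using System.Collections.Generic;

// Divide: given f : A -> (B, C) and sinks for B and C, build a sink for A.
// Every incoming A is split by f, and *both* parts are forwarded.
Action<A> Divide<A, B, C>(Func<A, (B, C)> f, Action<B> fb, Action<C> fc) =>
    a =>
    {
        var (b, c) = f(a);
        fb(b);
        fc(c);
    };

// Two 'hidden' sinks: one for names, one for ages.
var names = new List<string>();
var ages = new List<int>();

// A single sink of "name:age" strings that routes the parts to both sinks.
Action<string> people = Divide<string, string, int>(
    s =>
    {
        var parts = s.Split(':');
        return (parts[0], int.Parse(parts[1]));
    },
    names.Add,
    ages.Add);

people("Ada:36");
people("Alan:41");

Console.WriteLine(string.Join(", ", names)); // Ada, Alan
Console.WriteLine(string.Join(", ", ages));  // 36, 41
```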
So, `Divide` allows a single `F<A>` to represent the splitting of values and their routing into two new sinks (`F<B>` and `F<C>`). With `Inbox`, this allows an `Inbox<A>` to be a sink of `A` values that then route to other (hidden) `Inbox`
structures.

`Decidable` contravariant functors are very similar to `Divisible` contravariant functors. But, instead of generating a tuple `(B, C)` to route each incoming value to two other contravariant functors at the same time, the function used by a `Decidable` contravariant functor returns an `Either<B, C>`, which means each value is routed to only one of them.

Again, `Inbox<A>` is a `Decidable` contravariant functor, and so you can call `Route` to direct the values downstream. If `f` returns a `B`, the value goes to the `F<B>` sink; if `f` returns a `C`, it goes to the `F<C>` sink.
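This is a matching plain-C# sketch for the decidable case, again modelling sinks as `Action<T>`. Plain C# has no `Either`, so the sketch uses a value tuple `(IsLeft, Left, Right)` as a stand-in for `Either<B, C>`. The hypothetical `Choose` local function is named after Haskell's `choose` (also from `Data.Functor.Contravariant.Divisible`). It plays the routing role described above for `Route`, but it is not LanguageExt's implementation.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for Either<B, C>: a value tuple where IsLeft says which side is populated.
(bool IsLeft, B Left, C Right) Left<B, C>(B value) => (true, value, default(C));
(bool IsLeft, B Left, C Right) Right<B, C>(C value) => (false, default(B), value);

// Choose: given f : A -> Either<B, C> and sinks for B and C, build a sink for A.
// Unlike Divide, each incoming A goes to exactly *one* of the two sinks.
Action<A> Choose<A, B, C>(Func<A, (bool IsLeft, B Left, C Right)> f, Action<B> fb, Action<C> fc) =>
    a =>
    {
        var (isLeft, left, right) = f(a);
        if (isLeft) fb(left);
        else fc(right);
    };

var numbers = new List<int>();
var errors = new List<string>();

// A single sink of raw strings: parseable input goes left, everything else goes right.
Action<string> input = Choose<string, int, string>(
    s => int.TryParse(s, out var parsed) ? Left<int, string>(parsed) : Right<int, string>($"bad input: {s}"),
    numbers.Add,
    errors.Add);

input("42");
input("hello");
input("7");

Console.WriteLine(string.Join(", ", numbers)); // 42, 7
Console.WriteLine(string.Join(", ", errors));  // bad input: hello
```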
This discussion was created from the release Pipes refactor.