Skip to content

Commit

Permalink
Update to Orleans 2.4.4
Browse files Browse the repository at this point in the history
  • Loading branch information
yevhen committed Dec 2, 2019
1 parent 533b21c commit 390667f
Show file tree
Hide file tree
Showing 5 changed files with 487 additions and 13 deletions.
14 changes: 7 additions & 7 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@
<PropertyGroup>

<!-- Orleans packages -->
<MicrosoftOrleansClientVersion>2.3.0</MicrosoftOrleansClientVersion>
<MicrosoftOrleansServerVersion>2.3.0</MicrosoftOrleansServerVersion>
<MicrosoftOrleansCodeGeneratorVersion>2.3.0</MicrosoftOrleansCodeGeneratorVersion>
<MicrosoftOrleansRuntimeVersion>2.3.0</MicrosoftOrleansRuntimeVersion>
<MicrosoftOrleansStreamingAzureStorageVersion>2.3.0</MicrosoftOrleansStreamingAzureStorageVersion>
<MicrosoftOrleansClientVersion>2.4.4</MicrosoftOrleansClientVersion>
<MicrosoftOrleansServerVersion>2.4.4</MicrosoftOrleansServerVersion>
<MicrosoftOrleansCodeGeneratorVersion>2.4.4</MicrosoftOrleansCodeGeneratorVersion>
<MicrosoftOrleansRuntimeVersion>2.4.4</MicrosoftOrleansRuntimeVersion>
<MicrosoftOrleansStreamingAzureStorageVersion>2.4.4</MicrosoftOrleansStreamingAzureStorageVersion>

<!-- Microsoft extensions -->
<MicrosoftExtensionsLoggingVersion>2.1.0</MicrosoftExtensionsLoggingVersion>
<MicrosoftExtensionsLoggingConsoleVersion>2.1.0</MicrosoftExtensionsLoggingConsoleVersion>
<MicrosoftExtensionsLoggingVersion>2.1.1</MicrosoftExtensionsLoggingVersion>
<MicrosoftExtensionsLoggingConsoleVersion>2.1.1</MicrosoftExtensionsLoggingConsoleVersion>

<!-- Testing packages -->
<NUnitVersion>3.9.0</NUnitVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,30 +43,43 @@ public IAsyncStream<T> GetStream<T>(Guid unused, string id)
var recipients = StreamSubscriptionSpecification.Match(system, id, specifications);

Func<T, Task> fan = item => Task.CompletedTask;
Func<IEnumerable<T>, Task> batchFan = batch => Task.CompletedTask;

if (recipients.Length > 0)
{
fan = item => Task.WhenAll(recipients.Select(x => x.Receive(item)));
batchFan = batch => Task.WhenAll(recipients.Select(x => x.Receive(batch)));
}

return new StreamVentilator<T>(stream, fan);
return new StreamVentilator<T>(stream, fan, batchFan);
});
}

class StreamVentilator<T> : IAsyncStream<T>
{
readonly IAsyncStream<T> stream;
readonly Func<T, Task> fan;
readonly Func<IEnumerable<T>, Task> batchFan;

public StreamVentilator(IAsyncStream<T> stream, Func<T, Task> fan)
public StreamVentilator(IAsyncStream<T> stream, Func<T, Task> fan, Func<IEnumerable<T>, Task> batchFan)
{
this.stream = stream;
this.fan = fan;
this.batchFan = batchFan;
}

public Task OnNextAsync(T item, StreamSequenceToken token = null)
{
return Task.WhenAll(stream.OnNextAsync(item, token), fan(item));
}

public Task OnNextBatchAsync(IEnumerable<T> batch, StreamSequenceToken token = null)
{
// ReSharper disable PossibleMultipleEnumeration
return Task.WhenAll(stream.OnNextBatchAsync(batch, token), batchFan(batch));
// ReSharper restore PossibleMultipleEnumeration
}

#region Uninteresting Delegation (Nothing To See Here)

public Guid Guid => stream.Guid;
Expand Down
12 changes: 10 additions & 2 deletions Source/Orleankka/StreamRef.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -45,9 +46,16 @@ IAsyncStream<object> Endpoint

public StreamPath Path { get; }

public virtual Task Push(object item)
/// <summary>
/// Pushes given item of batch of items to a stream.
/// </summary>
/// <param name="itemOrBatch">Single item or batch (<see cref="IEnumerable{T}"/>) </param>
/// <returns></returns>
public virtual Task Push(object itemOrBatch)
{
return Endpoint.OnNextAsync(item);
return itemOrBatch is IEnumerable<object> batch
? Endpoint.OnNextBatchAsync(batch)
: Endpoint.OnNextAsync(itemOrBatch);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

using NUnit.Framework;

using Orleans;

namespace Orleankka.Legacy.Features
{
namespace Declarative_stream_subscriptions
Expand Down Expand Up @@ -173,6 +175,29 @@ public async Task Dynamic_target_selection<T>() where T : IActorGrain
Assert.That((await consumer2.Ask(new Received()))[0], Is.EqualTo("blue"));
}

public async Task Batch_receive<T>(bool unsupported = false) where T : IActorGrain, IGrainWithStringKey
{
var stream = system.StreamOf(provider, "batched");

if (unsupported)
{
var e = Assert.ThrowsAsync<NotImplementedException>(async () => await stream.Push(new[] {"i1", "i2"}));
Assert.That(e.Message,
Is.EqualTo("We still don't support OnNextBatchAsync()"),
"Orleans still doesn't support batching in 2.4.4");
return;
}

await stream.Push(new[] {"i1", "i2"});
await Task.Delay(timeout);

var consumer = system.ActorOf<T>("#");
var received = await consumer.Ask(new Received());
Assert.That(received,
Is.EquivalentTo(new[] {"i1", "i2"}),
"The batch will be unrolled at a grain and items will be delivered to Receive one by one");
}

async Task Push(StreamRef stream, object item)
{
var producer = system.ActorOf<ITestProducerActor>("foo");
Expand Down Expand Up @@ -254,6 +279,13 @@ public class TestDynamicTargetSelectorActor : TestConsumerActorBase, ITestDynami
public static string ComputeTarget(object item) => $"{item}-pill";
}

public interface ITestBatchReceiveActor : IActorGrain, IGrainWithStringKey
{ }

[StreamSubscription(Source = "sms:batched", Target = "#")]
public class TestBatchReceiveActor : TestConsumerActorBase, ITestBatchReceiveActor
{}

[TestFixture, RequiresSilo]
public class Tests
{
Expand All @@ -270,6 +302,7 @@ static TestCases Verify() =>
[Test] public async Task Select_all_filter() => await Verify().Select_all_filter<ITestSelectAllFilterActor>();
[Test] public async Task Explicit_filter() => await Verify().Explicit_filter<ITestExplicitFilterActor>();
[Test] public async Task Dynamic_target_selection() => await Verify().Dynamic_target_selection<ITestDynamicTargetSelectorActor>();
[Test] public async Task Batch_receive() => await Verify().Batch_receive<ITestBatchReceiveActor>(unsupported: true);
}
}

Expand Down Expand Up @@ -312,7 +345,7 @@ public class TestDeclaredHandlerOnlyAutomaticFilterActor : TestConsumerActorBase
{}

public interface ITestSelectAllFilterActor : IActorGrain
{ }
{}

[StreamSubscription(Source = "aqp:select-all", Target = "#", Filter = "*")]
public class TestSelectAllFilterActor : TestConsumerActorBase, ITestSelectAllFilterActor
Expand Down Expand Up @@ -347,6 +380,13 @@ public class TestDynamicTargetSelectorActor : TestConsumerActorBase, ITestDynami
public static string ComputeTarget(object item) => $"{item}-pill";
}

public interface ITestBatchReceiveActor : IActorGrain, IGrainWithStringKey
{ }

[StreamSubscription(Source = "aqp:batched", Target = "#")]
public class TestBatchReceiveActor : TestConsumerActorBase, ITestBatchReceiveActor
{}

[TestFixture, RequiresSilo]
[Category("Slow")]
public class Tests
Expand All @@ -362,7 +402,8 @@ static TestCases Verify() =>
[Test] public async Task Select_all_filter() => await Verify().Select_all_filter<ITestSelectAllFilterActor>();
[Test] public async Task Explicit_filter() => await Verify().Explicit_filter<ITestExplicitFilterActor>();
[Test] public async Task Dynamic_target_selection() => await Verify().Dynamic_target_selection<ITestDynamicTargetSelectorActor>();
}
[Test] public async Task Batch_receive() => await Verify().Batch_receive<ITestBatchReceiveActor>();
}
}
}
}
Loading

0 comments on commit 390667f

Please sign in to comment.