Skip to content

Commit cae979c

Browse files
committed
Investigate & fix SemaphoreFullException
Fixes #1818 Start by modifying test to concurrently publish to the same `IChannel` instance.
1 parent 970d36d commit cae979c

File tree

1 file changed

+21
-15
lines changed

1 file changed

+21
-15
lines changed

projects/Test/Integration/TestFloodPublishing.cs

+21-15
Original file line numberDiff line numberDiff line change
@@ -92,31 +92,37 @@ public async Task TestUnthrottledFloodPublishing()
9292
return Task.CompletedTask;
9393
};
9494

95-
var publishTasks = new List<Task>();
9695
var stopwatch = Stopwatch.StartNew();
97-
int i = 0;
9896
int publishCount = 0;
9997
try
10098
{
101-
for (i = 0; i < 65535 * 64; i++)
99+
var tasks = new List<Task>();
100+
for (int j = 0; j < 64; j++)
102101
{
103-
if (i % 65536 == 0)
102+
tasks.Add(Task.Run(async () =>
104103
{
105-
if (stopwatch.Elapsed > FiveSeconds)
104+
var publishTasks = new List<Task>();
105+
for (int i = 0; i < 65536 * 2; i++)
106106
{
107-
break;
108-
}
109-
}
107+
if (stopwatch.Elapsed > FiveSeconds)
108+
{
109+
await Task.WhenAll(publishTasks).WaitAsync(ShortSpan);
110+
publishTasks.Clear();
111+
break;
112+
}
110113

111-
publishCount++;
112-
publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask());
114+
Interlocked.Increment(ref publishCount);
115+
publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask());
113116

114-
if (i % 500 == 0)
115-
{
116-
await Task.WhenAll(publishTasks).WaitAsync(ShortSpan);
117-
publishTasks.Clear();
118-
}
117+
if (i % 500 == 0)
118+
{
119+
await Task.WhenAll(publishTasks).WaitAsync(ShortSpan);
120+
publishTasks.Clear();
121+
}
122+
}
123+
}));
119124
}
125+
await Task.WhenAll(tasks).WaitAsync(WaitSpan);
120126
}
121127
finally
122128
{

0 commit comments

Comments
 (0)