Take steps to avoid threadpool starvation #11275
base: main
Changes from 2 commits
@@ -10,15 +10,14 @@
 using System.IO.Pipes;
 using System.Diagnostics;
 using System.Linq;
+using System.Threading;
 using System.Threading.Tasks;
 #if FEATURE_PIPE_SECURITY
 using System.Security.Principal;
 #endif

 #if FEATURE_APM
 using Microsoft.Build.Eventing;
-#else
-using System.Threading;
 #endif
 using Microsoft.Build.Internal;
 using Microsoft.Build.Shared;
@@ -576,7 +575,7 @@ private enum ExitPacketState
             /// <summary>
             /// A queue used for enqueuing packets to write to the stream asynchronously.
             /// </summary>
-            private BlockingCollection<INodePacket> _packetWriteQueue = new BlockingCollection<INodePacket>();
+            private ConcurrentQueue<INodePacket> _packetWriteQueue = new ConcurrentQueue<INodePacket>();

             /// <summary>
             /// A task representing the last packet write, so we can chain packet writes one after another.
@@ -711,7 +710,7 @@ public void SendData(INodePacket packet)
                 {
                     _exitPacketState = ExitPacketState.ExitPacketQueued;
                 }
-                _packetWriteQueue.Add(packet);
+                _packetWriteQueue.Enqueue(packet);
                 DrainPacketQueue();
             }
@@ -733,65 +732,63 @@ private void DrainPacketQueue()
                 {
Isn't it time to consider having a separate thread for the drain? With the "new" await, it should not even require a dedicated thread (unless I'm grossly misunderstanding something). Building on the channel idea from the previous comment:

Aside from that, the only place that ever does the draining is this one; it is also the only place that ever places packets into the queue. Since we're already touching this code, I would like to take it further if possible.
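To make the channel idea concrete, here is a rough, hypothetical sketch (not the reviewer's actual snippet and not MSBuild code; ChannelPacketWriter and WriteToPipeAsync are invented names) of a single long-lived drain loop fed by a System.Threading.Channels channel. Producers only post packets, one reader owns the stream, and no thread is held while the channel is empty:

    using System.Threading.Channels;
    using System.Threading.Tasks;

    internal sealed class ChannelPacketWriter
    {
        private readonly Channel<byte[]> _channel = Channel.CreateUnbounded<byte[]>(
            new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });

        private readonly Task _drainLoop;

        public ChannelPacketWriter()
        {
            // One long-lived drain loop instead of chaining a continuation per send.
            _drainLoop = Task.Run(DrainLoopAsync);
        }

        // Producers only post the payload; the drain never runs on their thread.
        public void Send(byte[] payload) => _channel.Writer.TryWrite(payload);

        // Completing the writer lets the drain loop exit once the backlog is flushed.
        public Task CompleteAsync()
        {
            _channel.Writer.TryComplete();
            return _drainLoop;
        }

        private async Task DrainLoopAsync()
        {
            // WaitToReadAsync parks the loop without holding a thread while idle and
            // returns false only after the writer has been completed and drained.
            while (await _channel.Reader.WaitToReadAsync())
            {
                while (_channel.Reader.TryRead(out byte[] payload))
                {
                    await WriteToPipeAsync(payload); // stand-in for the real pipe WriteAsync
                }
            }
        }

        private static Task WriteToPipeAsync(byte[] payload) => Task.CompletedTask; // placeholder
    }

Whether this beats the ConcurrentQueue-plus-ContinueWith approach in the diff would need measurement; the sketch is only meant to show the shape of the alternative.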
                     // average latency between the moment this runs and when the delegate starts
                     // running is about 100-200 microseconds (unless there's thread pool saturation)
-                    _packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(_ =>
-                    {
-                        while (_packetWriteQueue.TryTake(out var packet))
-                        {
-                            SendDataCore(packet);
-                        }
-                    }, TaskScheduler.Default);
-                }
-            }
-
-            /// <summary>
-            /// Actually writes and sends the packet. This can't be called in parallel
-            /// because it reuses the _writeBufferMemoryStream, and this is why we use
-            /// the _packetWriteDrainTask to serially chain invocations one after another.
-            /// </summary>
-            /// <param name="packet">The packet to send.</param>
-            private void SendDataCore(INodePacket packet)
-            {
-                MemoryStream writeStream = _writeBufferMemoryStream;
-
-                // clear the buffer but keep the underlying capacity to avoid reallocations
-                writeStream.SetLength(0);
-
-                ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
-                try
-                {
-                    writeStream.WriteByte((byte)packet.Type);
-
-                    // Pad for the packet length
-                    WriteInt32(writeStream, 0);
-                    packet.Translate(writeTranslator);
-
-                    int writeStreamLength = (int)writeStream.Position;
-
-                    // Now plug in the real packet length
-                    writeStream.Position = 1;
-                    WriteInt32(writeStream, writeStreamLength - 5);
-
-                    byte[] writeStreamBuffer = writeStream.GetBuffer();
-
-                    for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
-                    {
-                        int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
-                        _serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
-                    }
-
-                    if (IsExitPacket(packet))
-                    {
-                        _exitPacketState = ExitPacketState.ExitPacketSent;
-                    }
-                }
-                catch (IOException e)
-                {
-                    // Do nothing here because any exception will be caught by the async read handler
-                    CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in SendData: {0}", e);
-                }
-                catch (ObjectDisposedException) // This happens if a child dies unexpectedly
-                {
-                    // Do nothing here because any exception will be caught by the async read handler
-                }
-            }
+                    _packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(
+                        SendDataCoreAsync,
+                        this,
+                        TaskScheduler.Default).Unwrap();
+
+                    static async Task SendDataCoreAsync(Task _, object state)
+                    {
+                        NodeContext context = (NodeContext)state;
+                        while (context._packetWriteQueue.TryDequeue(out var packet))
+                        {
+                            MemoryStream writeStream = context._writeBufferMemoryStream;
+
+                            // clear the buffer but keep the underlying capacity to avoid reallocations
+                            writeStream.SetLength(0);
+
+                            ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
+                            try
+                            {
+                                writeStream.WriteByte((byte)packet.Type);
+
+                                // Pad for the packet length
+                                WriteInt32(writeStream, 0);
+                                packet.Translate(writeTranslator);
+
+                                int writeStreamLength = (int)writeStream.Position;
+
+                                // Now plug in the real packet length
+                                writeStream.Position = 1;
+                                WriteInt32(writeStream, writeStreamLength - 5);
+
+                                byte[] writeStreamBuffer = writeStream.GetBuffer();
+
+                                for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
+                                {
+                                    int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
+#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
+                                    await context._serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite, CancellationToken.None);
+#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
+                                }
+
+                                if (IsExitPacket(packet))
+                                {
+                                    context._exitPacketState = ExitPacketState.ExitPacketSent;
+                                }
+                            }
+                            catch (IOException e)
+                            {
+                                // Do nothing here because any exception will be caught by the async read handler
+                                CommunicationsUtilities.Trace(context._nodeId, "EXCEPTION in SendData: {0}", e);
+                            }
+                            catch (ObjectDisposedException) // This happens if a child dies unexpectedly
+                            {
+                                // Do nothing here because any exception will be caught by the async read handler
+                            }
+                        }
+                    }
+                }
+            }

             private static bool IsExitPacket(INodePacket packet)
@@ -802,7 +799,7 @@ private static bool IsExitPacket(INodePacket packet)
             /// <summary>
             /// Avoid having a BinaryWriter just to write a 4-byte int
             /// </summary>
-            private void WriteInt32(MemoryStream stream, int value)
+            private static void WriteInt32(MemoryStream stream, int value)
             {
                 stream.WriteByte((byte)value);
                 stream.WriteByte((byte)(value >> 8));
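The essence of the change above: SendData only enqueues the packet and then chains a drain continuation onto the previous drain task, with Unwrap() flattening the Task<Task> produced by the async delegate, so packet writes stay strictly serialized (they share _writeBufferMemoryStream) while the pipe write itself is awaited instead of performed synchronously on a thread pool thread. A condensed, self-contained sketch of the same pattern, with a hypothetical SerializedWriter type standing in for NodeContext (not the MSBuild code itself):

    using System.Collections.Concurrent;
    using System.IO;
    using System.Threading.Tasks;

    internal sealed class SerializedWriter
    {
        private readonly ConcurrentQueue<byte[]> _queue = new ConcurrentQueue<byte[]>();
        private readonly object _gate = new object();
        private readonly Stream _stream;
        private Task _drainTask = Task.CompletedTask;

        public SerializedWriter(Stream stream) => _stream = stream;

        public void Send(byte[] payload)
        {
            // Producers never write to the stream directly; they enqueue and
            // schedule a drain that is chained after the previous drain.
            _queue.Enqueue(payload);
            lock (_gate)
            {
                // Unwrap() turns the Task<Task> returned by the async delegate
                // into a flat Task, keeping the chain linear.
                _drainTask = _drainTask.ContinueWith(
                    DrainAsync,
                    this,
                    TaskScheduler.Default).Unwrap();
            }

            static async Task DrainAsync(Task _, object state)
            {
                var self = (SerializedWriter)state;
                while (self._queue.TryDequeue(out byte[] payload))
                {
                    // The await releases the thread during the I/O instead of
                    // blocking a thread pool thread on a synchronous Write.
                    await self._stream.WriteAsync(payload, 0, payload.Length);
                }
            }
        }
    }

Send returns as soon as the packet is queued; the lock only protects the swap of _drainTask, and overlapping Send calls collapse into a single drain pass because the loop keeps dequeuing until the queue is empty.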
Is there any advantage to a ConcurrentQueue as opposed to using a Channel?

I'm not saying that we should use a Channel; when I was researching this piece of code I stumbled onto it, and it piqued my interest. Hence the question.
I'm open to that option. I'm not familiar with the performance characteristics of Channel, so I can't comment on that.