Take steps to avoid threadpool starvation #11275

Open · wants to merge 3 commits into base: main · Changes from 2 commits
4 changes: 2 additions & 2 deletions src/Build/BackEnd/Components/Caching/ResultsCache.cs
@@ -317,7 +317,7 @@ private static bool CheckResults(BuildResult result, List<string> targets, HashS
bool returnValue = true;
foreach (string target in targets)
{
if (!result.HasResultsForTarget(target) || (result[target].ResultCode == TargetResultCode.Skipped && !skippedResultsAreOK))
if (!result.TryGetResultsForTarget(target, out TargetResult targetResult) || (targetResult.ResultCode == TargetResultCode.Skipped && !skippedResultsAreOK))
{
if (targetsMissingResults != null)
{
@@ -333,7 +333,7 @@ private static bool CheckResults(BuildResult result, List<string> targets, HashS
{
// If the result was a failure and we have not seen any skipped targets up to this point, then we conclude we do
// have results for this request, and they indicate failure.
if (result[target].ResultCode == TargetResultCode.Failure && (targetsMissingResults == null || targetsMissingResults.Count == 0))
if (targetResult.ResultCode == TargetResultCode.Failure && (targetsMissingResults == null || targetsMissingResults.Count == 0))
{
return true;
}
@@ -10,15 +10,14 @@
using System.IO.Pipes;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
#if FEATURE_PIPE_SECURITY
using System.Security.Principal;
#endif

#if FEATURE_APM
using Microsoft.Build.Eventing;
#else
using System.Threading;
#endif
using Microsoft.Build.Internal;
using Microsoft.Build.Shared;
@@ -576,7 +575,7 @@ private enum ExitPacketState
/// <summary>
/// A queue used for enqueuing packets to write to the stream asynchronously.
/// </summary>
private BlockingCollection<INodePacket> _packetWriteQueue = new BlockingCollection<INodePacket>();
private ConcurrentQueue<INodePacket> _packetWriteQueue = new ConcurrentQueue<INodePacket>();
Reviewer (Member) commented:

Is there any advantage to a ConcurrentQueue as opposed to using a Channel?
I'm not saying that we should use Channel; when I was researching this piece of code, I stumbled onto it and it piqued my interest. Hence the question.

Author (Contributor) replied:

I'm open to that option. I'm not familiar with the performance characteristics of Channel, so I can't comment on that.


/// <summary>
/// A task representing the last packet write, so we can chain packet writes one after another.
@@ -711,7 +710,7 @@ public void SendData(INodePacket packet)
{
_exitPacketState = ExitPacketState.ExitPacketQueued;
}
_packetWriteQueue.Add(packet);
_packetWriteQueue.Enqueue(packet);
DrainPacketQueue();
}

@@ -733,65 +732,63 @@ private void DrainPacketQueue()
{
Reviewer (Member) commented:

Isn't it time to consider having a separate thread for the drain? With the "new" await, it should not even require a dedicated thread (unless I'm grossly misunderstanding something).

Building on the Channel idea from the previous comment: we could set up a multi-writer, single-reader channel, then have one task drain it, e.g. while (await channelReader.WaitToReadAsync()) { doTheDraining(); }
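
A minimal sketch of that multi-writer, single-reader approach inside NodeContext could look like the following. This is only an illustration of the suggestion above, not part of this PR; the _packetChannel field and the SendDataCoreAsync(packet) helper are hypothetical names.

    using System.Threading.Channels;
    using System.Threading.Tasks;

    // Hypothetical sketch of the Channel-based drain; not the PR's implementation.
    private readonly Channel<INodePacket> _packetChannel =
        Channel.CreateUnbounded<INodePacket>(new UnboundedChannelOptions
        {
            SingleReader = true,   // only the drain loop reads
            SingleWriter = false   // SendData may be called from multiple threads
        });

    public void SendData(INodePacket packet)
    {
        // TryWrite always succeeds for an unbounded channel.
        _packetChannel.Writer.TryWrite(packet);
    }

    // One long-lived drain loop replaces the chained ContinueWith continuations.
    private async Task DrainChannelAsync()
    {
        while (await _packetChannel.Reader.WaitToReadAsync())
        {
            while (_packetChannel.Reader.TryRead(out INodePacket packet))
            {
                // Serialize and write one packet to the pipe (hypothetical helper).
                await SendDataCoreAsync(packet);
            }
        }
    }

WaitToReadAsync only completes when a packet is available (or the writer is completed), so the idle loop does not occupy a thread-pool thread.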

Reviewer (Member) added:

Aside from that, the only place that ever does the draining is this one:

 public void SendData(INodePacket packet)
 {
     if (IsExitPacket(packet))
     {
         _exitPacketState = ExitPacketState.ExitPacketQueued;
     }
     _packetWriteQueue.Add(packet);
     DrainPacketQueue();
 }

It is also the only place that ever places packets into the queue.

Since we're already touching this place, I would like to take it further if possible.

// average latency between the moment this runs and when the delegate starts
// running is about 100-200 microseconds (unless there's thread pool saturation)
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(_ =>
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(
SendDataCoreAsync,
this,
TaskScheduler.Default).Unwrap();

static async Task SendDataCoreAsync(Task _, object state)
{
while (_packetWriteQueue.TryTake(out var packet))
NodeContext context = (NodeContext)state;
while (context._packetWriteQueue.TryDequeue(out var packet))
{
SendDataCore(packet);
}
}, TaskScheduler.Default);
}
}
MemoryStream writeStream = context._writeBufferMemoryStream;

/// <summary>
/// Actually writes and sends the packet. This can't be called in parallel
/// because it reuses the _writeBufferMemoryStream, and this is why we use
/// the _packetWriteDrainTask to serially chain invocations one after another.
/// </summary>
/// <param name="packet">The packet to send.</param>
private void SendDataCore(INodePacket packet)
{
MemoryStream writeStream = _writeBufferMemoryStream;
// clear the buffer but keep the underlying capacity to avoid reallocations
writeStream.SetLength(0);

// clear the buffer but keep the underlying capacity to avoid reallocations
writeStream.SetLength(0);
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
try
{
writeStream.WriteByte((byte)packet.Type);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
try
{
writeStream.WriteByte((byte)packet.Type);
// Pad for the packet length
WriteInt32(writeStream, 0);
packet.Translate(writeTranslator);

// Pad for the packet length
WriteInt32(writeStream, 0);
packet.Translate(writeTranslator);
int writeStreamLength = (int)writeStream.Position;

int writeStreamLength = (int)writeStream.Position;
// Now plug in the real packet length
writeStream.Position = 1;
WriteInt32(writeStream, writeStreamLength - 5);

// Now plug in the real packet length
writeStream.Position = 1;
WriteInt32(writeStream, writeStreamLength - 5);
byte[] writeStreamBuffer = writeStream.GetBuffer();

byte[] writeStreamBuffer = writeStream.GetBuffer();
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
await context._serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite, CancellationToken.None);
#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
}

for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
{
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
_serverToClientStream.Write(writeStreamBuffer, i, lengthToWrite);
}
if (IsExitPacket(packet))
{
_exitPacketState = ExitPacketState.ExitPacketSent;
if (IsExitPacket(packet))
{
context._exitPacketState = ExitPacketState.ExitPacketSent;
}
}
catch (IOException e)
{
// Do nothing here because any exception will be caught by the async read handler
CommunicationsUtilities.Trace(context._nodeId, "EXCEPTION in SendData: {0}", e);
}
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
{
// Do nothing here because any exception will be caught by the async read handler
}
}
}
}
catch (IOException e)
{
// Do nothing here because any exception will be caught by the async read handler
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in SendData: {0}", e);
}
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
{
// Do nothing here because any exception will be caught by the async read handler
}
}

private static bool IsExitPacket(INodePacket packet)
@@ -802,7 +799,7 @@
/// <summary>
/// Avoid having a BinaryWriter just to write a 4-byte int
/// </summary>
private void WriteInt32(MemoryStream stream, int value)
private static void WriteInt32(MemoryStream stream, int value)
{
stream.WriteByte((byte)value);
stream.WriteByte((byte)(value >> 8));
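
For readers piecing together the interleaved hunk above, the body of DrainPacketQueue after this change reads roughly as follows. This is a non-authoritative reconstruction from the diff; exact placement of braces and comments may differ slightly.

    // average latency between the moment this runs and when the delegate starts
    // running is about 100-200 microseconds (unless there's thread pool saturation)
    _packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(
        SendDataCoreAsync,
        this,
        TaskScheduler.Default).Unwrap();

    static async Task SendDataCoreAsync(Task _, object state)
    {
        NodeContext context = (NodeContext)state;
        while (context._packetWriteQueue.TryDequeue(out var packet))
        {
            MemoryStream writeStream = context._writeBufferMemoryStream;

            // clear the buffer but keep the underlying capacity to avoid reallocations
            writeStream.SetLength(0);
            ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
            try
            {
                writeStream.WriteByte((byte)packet.Type);

                // Pad for the packet length
                WriteInt32(writeStream, 0);
                packet.Translate(writeTranslator);

                int writeStreamLength = (int)writeStream.Position;

                // Now plug in the real packet length
                writeStream.Position = 1;
                WriteInt32(writeStream, writeStreamLength - 5);

                byte[] writeStreamBuffer = writeStream.GetBuffer();

                for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
                {
                    int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
                    await context._serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite, CancellationToken.None);
                }

                if (IsExitPacket(packet))
                {
                    context._exitPacketState = ExitPacketState.ExitPacketSent;
                }
            }
            catch (IOException e)
            {
                // Do nothing here because any exception will be caught by the async read handler
                CommunicationsUtilities.Trace(context._nodeId, "EXCEPTION in SendData: {0}", e);
            }
            catch (ObjectDisposedException) // This happens if a child dies unexpectedly
            {
                // Do nothing here because any exception will be caught by the async read handler
            }
        }
    }

The key change is that the per-packet write now uses WriteAsync instead of a blocking Write, and the continuation is unwrapped so that each subsequent drain is still chained strictly after the previous one completes.
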
10 changes: 3 additions & 7 deletions src/Build/BackEnd/Components/RequestBuilder/TargetBuilder.cs
@@ -546,9 +546,8 @@ await PushTargets(errorTargets, currentTargetEntry, currentTargetEntry.Lookup, t
/// <returns>True to skip the target, false otherwise.</returns>
private bool CheckSkipTarget(ref bool stopProcessingStack, TargetEntry currentTargetEntry)
{
if (_buildResult.HasResultsForTarget(currentTargetEntry.Name))
if (_buildResult.TryGetResultsForTarget(currentTargetEntry.Name, out TargetResult targetResult))
{
TargetResult targetResult = _buildResult[currentTargetEntry.Name] as TargetResult;
ErrorUtilities.VerifyThrowInternalNull(targetResult, "targetResult");

if (targetResult.ResultCode != TargetResultCode.Skipped)
@@ -665,12 +664,9 @@ private async Task<bool> PushTargets(IList<TargetSpecification> targets, TargetE
{
// Don't build any Before or After targets for which we already have results. Unlike other targets,
// we don't explicitly log a skipped-with-results message because it is not interesting.
if (_buildResult.HasResultsForTarget(targetSpecification.TargetName))
if (_buildResult.TryGetResultsForTarget(targetSpecification.TargetName, out TargetResult targetResult) && targetResult.ResultCode != TargetResultCode.Skipped)
{
if (_buildResult[targetSpecification.TargetName].ResultCode != TargetResultCode.Skipped)
{
continue;
}
continue;
}
}

7 changes: 2 additions & 5 deletions src/Build/BackEnd/Components/Scheduler/Scheduler.cs
@@ -1019,11 +1019,8 @@ private bool IsTraversalRequest(BuildRequest request)
private void AssignUnscheduledRequestsWithConfigurationCountLevelling(List<ScheduleResponse> responses, HashSet<int> idleNodes)
{
// Assign requests but try to keep the same number of configurations on each node
List<int> nodesByConfigurationCountAscending = new List<int>(_availableNodes.Keys);
nodesByConfigurationCountAscending.Sort(delegate (int left, int right)
{
return Comparer<int>.Default.Compare(_schedulingData.GetConfigurationsCountByNode(left, true /* excludeTraversals */, _configCache), _schedulingData.GetConfigurationsCountByNode(right, true /* excludeTraversals */, _configCache));
});
// Use OrderBy to sort since it caches the per-node configuration count lookups, which reduces the number of times we have to acquire the lock.
IEnumerable<int> nodesByConfigurationCountAscending = _availableNodes.Keys.OrderBy(x => _schedulingData.GetConfigurationsCountByNode(x, excludeTraversals: true, _configCache));

// Assign projects to nodes, preferring to assign work to nodes with the fewest configurations first.
foreach (int nodeId in nodesByConfigurationCountAscending)
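
As a side note on why the OrderBy form takes the lock less often: Enumerable.OrderBy evaluates its key selector once per element before sorting, while a comparison delegate passed to List<T>.Sort re-evaluates the key on every comparison. A small self-contained illustration (hypothetical demo code, not MSBuild code):

    using System;
    using System.Collections.Generic;
    using System.Linq;

    class OrderByVsSortDemo
    {
        static int keyCalls;

        // Stands in for GetConfigurationsCountByNode, which acquires a lock on each call.
        static int ExpensiveKey(int node)
        {
            keyCalls++;
            return node % 3;
        }

        static void Main()
        {
            int[] nodes = { 1, 2, 3, 4, 5, 6 };

            keyCalls = 0;
            List<int> byOrderBy = nodes.OrderBy(ExpensiveKey).ToList();
            Console.WriteLine($"OrderBy key evaluations: {keyCalls}"); // 6: once per node

            keyCalls = 0;
            List<int> list = new List<int>(nodes);
            list.Sort((left, right) => ExpensiveKey(left).CompareTo(ExpensiveKey(right)));
            Console.WriteLine($"Sort key evaluations: {keyCalls}");    // typically well above 6: twice per comparison
        }
    }
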
11 changes: 11 additions & 0 deletions src/Build/BackEnd/Shared/BuildResult.cs
@@ -10,6 +10,7 @@
using Microsoft.Build.Shared;
using Microsoft.Build.Shared.FileSystem;
using Microsoft.Build.Framework;
using System.Diagnostics.CodeAnalysis;

namespace Microsoft.Build.Execution
{
@@ -601,6 +602,16 @@ public bool HasResultsForTarget(string target)
return _resultsByTarget?.ContainsKey(target) ?? false;
}

public bool TryGetResultsForTarget(string target, [NotNullWhen(true)] out TargetResult? value)
{
if (_resultsByTarget is null)
{
value = default;
return false;
}

return _resultsByTarget.TryGetValue(target, out value);
}
#region INodePacket Members

/// <summary>
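
A usage note on the new API: the [NotNullWhen(true)] annotation means nullable-aware callers can dereference the out value without an extra null check once the method returns true. A hypothetical caller (not part of this diff):

    if (buildResult.TryGetResultsForTarget("Build", out TargetResult? targetResult))
    {
        // The compiler knows targetResult is non-null here because of [NotNullWhen(true)].
        Console.WriteLine(targetResult.ResultCode);
    }
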
75 changes: 73 additions & 2 deletions src/Shared/BufferedReadStream.cs
@@ -3,6 +3,12 @@

using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;

#if NET451_OR_GREATER || NETCOREAPP
using System.Threading.Tasks;
#endif

#nullable disable

@@ -11,14 +17,14 @@ namespace Microsoft.Build.BackEnd
internal class BufferedReadStream : Stream
{
private const int BUFFER_SIZE = 1024;
private Stream _innerStream;
private NamedPipeServerStream _innerStream;
private byte[] _buffer;

// The number of bytes in the buffer that have been read from the underlying stream but not read by consumers of this stream
private int _currentlyBufferedByteCount;
private int _currentIndexInBuffer;

public BufferedReadStream(Stream innerStream)
public BufferedReadStream(NamedPipeServerStream innerStream)
{
_innerStream = innerStream;
_buffer = new byte[BUFFER_SIZE];
@@ -120,6 +126,71 @@ public override int Read(byte[] buffer, int offset, int count)
}
}

#if NET451_OR_GREATER || NETCOREAPP
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (count > BUFFER_SIZE)
{
// Trying to read more data than the buffer can hold
int alreadyCopied = 0;
if (_currentlyBufferedByteCount > 0)
{
Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, _currentlyBufferedByteCount);
alreadyCopied = _currentlyBufferedByteCount;
_currentIndexInBuffer = 0;
_currentlyBufferedByteCount = 0;
}
#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
int innerReadCount = await _innerStream.ReadAsync(buffer, offset + alreadyCopied, count - alreadyCopied, cancellationToken);
#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
return innerReadCount + alreadyCopied;
}
else if (count <= _currentlyBufferedByteCount)
{
// Enough data buffered to satisfy read request
Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, count);
_currentIndexInBuffer += count;
_currentlyBufferedByteCount -= count;
return count;
}
else
{
// Need to read more data
int alreadyCopied = 0;
if (_currentlyBufferedByteCount > 0)
{
Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, _currentlyBufferedByteCount);
alreadyCopied = _currentlyBufferedByteCount;
_currentIndexInBuffer = 0;
_currentlyBufferedByteCount = 0;
}

#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
int innerReadCount = await _innerStream.ReadAsync(_buffer, 0, BUFFER_SIZE, cancellationToken);
#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
_currentIndexInBuffer = 0;
_currentlyBufferedByteCount = innerReadCount;

int remainingCopyCount;

if (alreadyCopied + innerReadCount >= count)
{
remainingCopyCount = count - alreadyCopied;
}
else
{
remainingCopyCount = innerReadCount;
}

Array.Copy(_buffer, 0, buffer, offset + alreadyCopied, remainingCopyCount);
_currentIndexInBuffer += remainingCopyCount;
_currentlyBufferedByteCount -= remainingCopyCount;

return alreadyCopied + remainingCopyCount;
}
}
#endif

public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
30 changes: 12 additions & 18 deletions src/Shared/CommunicationsUtilities.cs
@@ -539,26 +539,20 @@ internal static int ReadIntForHandshake(this PipeStream stream, byte? byteToAcce
else
#endif
{
// Legacy approach with an early-abort for connection attempts from ancient MSBuild.exes
for (int i = 0; i < bytes.Length; i++)
int bytesRead = stream.Read(bytes, 0, bytes.Length);
if (bytesRead != bytes.Length)
{
int read = stream.ReadByte();

if (read == -1)
{
// We've unexpectedly reached end of stream.
// We are now in a bad state, disconnect on our end
throw new IOException(String.Format(CultureInfo.InvariantCulture, "Unexpected end of stream while reading for handshake"));
}

bytes[i] = Convert.ToByte(read);
// We've unexpectedly reached end of stream.
// We are now in a bad state, disconnect on our end
throw new IOException(String.Format(CultureInfo.InvariantCulture, "Unexpected end of stream while reading for handshake"));
}

if (i == 0 && byteToAccept != null && byteToAccept != bytes[0])
{
stream.WriteIntForHandshake(0x0F0F0F0F);
stream.WriteIntForHandshake(0x0F0F0F0F);
throw new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "Client: rejected old host. Received byte {0} instead of {1}.", bytes[0], byteToAccept));
}
// Legacy approach with an early-abort for connection attempts from ancient MSBuild.exes
if (byteToAccept != null && byteToAccept != bytes[0])
{
stream.WriteIntForHandshake(0x0F0F0F0F);
stream.WriteIntForHandshake(0x0F0F0F0F);
throw new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "Client: rejected old host. Received byte {0} instead of {1}.", bytes[0], byteToAccept));
}
}

Expand Down