Skip to content

Commit

Permalink
ghost asynchronous send.
Browse files Browse the repository at this point in the history
  • Loading branch information
jiowchern committed Oct 12, 2024
1 parent cc337b8 commit ffb3949
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 47 deletions.
29 changes: 16 additions & 13 deletions Regulus.Network/PackageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@ public class PackageSender : IDisposable
private readonly IPool _Pool;


private Task _Sending;
private Task<int> _Sending;

public PackageSender(IStreamable stream, Regulus.Memorys.IPool pool)
{
_Sending = Task.CompletedTask;
_Sending = Task.FromResult(0);

_Stream = stream;
_Pool = pool;

//_Consumer.Add(this);
}

public void Push(Regulus.Memorys.Buffer buffer)
Expand All @@ -43,30 +41,35 @@ public void Push(Regulus.Memorys.Buffer buffer)
}

void IDisposable.Dispose()
{
//_Consumer.Remove(this);
{
}



private void _Push(Memorys.Buffer buffer)
{
_Sending = _Sending.ContinueWith(async t => {
await _SendBufferAsync(buffer);
});

if(_Sending.IsCompleted || _Sending.IsFaulted || _Sending.IsCanceled )
{
_Sending = _SendBufferAsync(buffer);
}
else
{
_Sending = _Sending.ContinueWith(t=> _SendBufferAsync(buffer)).Unwrap();
}
}
private async Task _SendBufferAsync(Regulus.Memorys.Buffer buffer)

private async Task<int> _SendBufferAsync(Regulus.Memorys.Buffer buffer)
{
var sendCount = 0;
do
{
var count = await _Stream.Send(buffer.Bytes.Array, buffer.Bytes.Offset + sendCount, buffer.Count - sendCount);
if (count == 0)
return;
return 0;
sendCount += count;

} while (sendCount < buffer.Count);
return sendCount;
}


Expand Down
48 changes: 14 additions & 34 deletions Regulus.Remote.Ghost/GhostSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class GhostSerializer : ServerExchangeable
private readonly IInternalSerializable _Serializable;
private readonly System.Collections.Concurrent.ConcurrentQueue<Regulus.Remote.Packages.ResponsePackage> _Receives;

private readonly System.Collections.Concurrent.ConcurrentQueue<Regulus.Remote.Packages.RequestPackage> _Sends;

private readonly System.Collections.Concurrent.ConcurrentBag<System.Exception> _Exceptions;
public event System.Action<System.Exception> ErrorEvent;
public GhostSerializer(Regulus.Network.PackageReader reader , PackageSender sender, IInternalSerializable serializable)
Expand All @@ -25,7 +25,7 @@ public GhostSerializer(Regulus.Network.PackageReader reader , PackageSender send
_Reader = reader;
_Sender = sender;
this._Serializable = serializable;
_Sends = new System.Collections.Concurrent.ConcurrentQueue<Regulus.Remote.Packages.RequestPackage>();

_Receives = new System.Collections.Concurrent.ConcurrentQueue<Regulus.Remote.Packages.ResponsePackage>();

_ResponseEvent += _Empty;
Expand All @@ -52,13 +52,13 @@ private void _Empty(ServerToClientOpCode arg1, Regulus.Memorys.Buffer arg2)
}

void Exchangeable<ClientToServerOpCode, ServerToClientOpCode>.Request(ClientToServerOpCode code, Regulus.Memorys.Buffer args)
{
_Sends.Enqueue(
new Regulus.Remote.Packages.RequestPackage()
{
Data = args.ToArray(),
Code = code
});
{
var buf = _Serializable.Serialize(new Regulus.Remote.Packages.RequestPackage()
{
Data = args.ToArray(),
Code = code
});
_Sender.Push(buf);
}

public void Start()
Expand All @@ -71,9 +71,9 @@ public void Stop()
{

_ReaderStop();
Regulus.Remote.Packages.RequestPackage val;

Regulus.Remote.Packages.ResponsePackage val2;
while (_Sends.TryDequeue(out val) || _Receives.TryDequeue(out val2))
while (_Receives.TryDequeue(out val2))
{

}
Expand All @@ -93,34 +93,14 @@ void _Update()
private void _Process()
{

Regulus.Remote.Packages.ResponsePackage receivePkg;
while(_Receives.TryDequeue(out receivePkg))

while(_Receives.TryDequeue(out var receivePkg))
{
_ResponseEvent(receivePkg.Code, receivePkg.Data.AsBuffer());
}

Regulus.Remote.Packages.RequestPackage[] sends = _SendsPop();
foreach (var send in sends)
{
var buf = _Serializable.Serialize(send);
_Sender.Push(buf);
}


}

private Regulus.Remote.Packages.RequestPackage[] _SendsPop()
{

List<Regulus.Remote.Packages.RequestPackage> pkgs = new List<Regulus.Remote.Packages.RequestPackage>();
Regulus.Remote.Packages.RequestPackage pkg;
while(_Sends.TryDequeue(out pkg))
{
pkgs.Add(pkg);

}
return pkgs.ToArray();
}


private async Task _ReaderStart()
{
Expand Down

0 comments on commit ffb3949

Please sign in to comment.