Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,7 @@ public SocketError ReceiveFrom(Memory<byte> buffer, ref SocketFlags flags, byte[
{
Buffer = buffer,
Flags = flags,
SetReceivedFlags = true,
SocketAddress = socketAddress,
SocketAddressLen = socketAddressLen,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,86 +48,6 @@ from addr in Loopbacks
from b in new[] { false, true }
select new object[] { addr[0], b };

[ActiveIssue("https://github.com/dotnet/runtime/issues/1712")]
[OuterLoop]
[Theory]
[MemberData(nameof(LoopbackWithBool))]
public async Task SendToRecvFrom_Datagram_UDP(IPAddress loopbackAddress, bool useClone)
{
IPAddress leftAddress = loopbackAddress, rightAddress = loopbackAddress;

const int DatagramSize = 256;
const int DatagramsToSend = 256;
const int AckTimeout = 10000;
const int TestTimeout = 30000;

using var origLeft = new Socket(leftAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
using var origRight = new Socket(rightAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
origLeft.BindToAnonymousPort(leftAddress);
origRight.BindToAnonymousPort(rightAddress);

using var left = useClone ? new Socket(origLeft.SafeHandle) : origLeft;
using var right = useClone ? new Socket(origRight.SafeHandle) : origRight;

var leftEndpoint = (IPEndPoint)left.LocalEndPoint;
var rightEndpoint = (IPEndPoint)right.LocalEndPoint;

var receiverAck = new SemaphoreSlim(0);
var senderAck = new SemaphoreSlim(0);

_output.WriteLine($"{DateTime.Now}: Sending data from {rightEndpoint} to {leftEndpoint}");

var receivedChecksums = new uint?[DatagramsToSend];
Task leftThread = Task.Run(async () =>
{
EndPoint remote = leftEndpoint.Create(leftEndpoint.Serialize());
var recvBuffer = new byte[DatagramSize];
for (int i = 0; i < DatagramsToSend; i++)
{
SocketReceiveFromResult result = await ReceiveFromAsync(
left, new ArraySegment<byte>(recvBuffer), remote);
Assert.Equal(DatagramSize, result.ReceivedBytes);
Assert.Equal(rightEndpoint, result.RemoteEndPoint);

int datagramId = recvBuffer[0];
Assert.Null(receivedChecksums[datagramId]);
receivedChecksums[datagramId] = Fletcher32.Checksum(recvBuffer, 0, result.ReceivedBytes);

receiverAck.Release();
bool gotAck = await senderAck.WaitAsync(TestTimeout);
Assert.True(gotAck, $"{DateTime.Now}: Timeout waiting {TestTimeout} for senderAck in iteration {i}");
}
});

var sentChecksums = new uint[DatagramsToSend];
using (right)
{
var random = new Random();
var sendBuffer = new byte[DatagramSize];
for (int i = 0; i < DatagramsToSend; i++)
{
random.NextBytes(sendBuffer);
sendBuffer[0] = (byte)i;

int sent = await SendToAsync(right, new ArraySegment<byte>(sendBuffer), leftEndpoint);

bool gotAck = await receiverAck.WaitAsync(AckTimeout);
Assert.True(gotAck, $"{DateTime.Now}: Timeout waiting {AckTimeout} for receiverAck in iteration {i} after sending {sent}. Receiver is in {leftThread.Status}");
senderAck.Release();

Assert.Equal(DatagramSize, sent);
sentChecksums[i] = Fletcher32.Checksum(sendBuffer, 0, sent);
}
}

await leftThread;
for (int i = 0; i < DatagramsToSend; i++)
{
Assert.NotNull(receivedChecksums[i]);
Assert.Equal(sentChecksums[i], (uint)receivedChecksums[i]);
}
}

[OuterLoop]
[Theory]
[MemberData(nameof(LoopbacksAndBuffers))]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace System.Net.Sockets.Tests
{
[Collection(nameof(NoParallelTests))]
public abstract class SendReceiveNonParallel<T> : SocketTestHelperBase<T> where T : SocketHelperBase, new()
{
public SendReceiveNonParallel(ITestOutputHelper output) : base(output) { }

public static IEnumerable<object[]> LoopbackWithBool =>
from addr in Loopbacks
from b in new[] { false, true }
select new object[] { addr[0], b };

[OuterLoop("Serial execution of all variants takes long")]
[Theory]
[MemberData(nameof(LoopbackWithBool))]
public async Task SendToRecvFrom_Datagram_UDP(IPAddress loopbackAddress, bool useClone)
{
IPAddress leftAddress = loopbackAddress, rightAddress = loopbackAddress;

const int DatagramSize = 256;
const int DatagramsToSend = 256;
const int ReceiverAckTimeout = 5000;
const int SenderAckTimeout = 10000;

using var origLeft = new Socket(leftAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
using var origRight = new Socket(rightAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
origLeft.BindToAnonymousPort(leftAddress);
origRight.BindToAnonymousPort(rightAddress);

using var left = useClone ? new Socket(origLeft.SafeHandle) : origLeft;
using var right = useClone ? new Socket(origRight.SafeHandle) : origRight;

// Force non-blocking mode in ...SyncForceNonBlocking variants of the test:
ConfigureNonBlocking(left);
ConfigureNonBlocking(right);

var leftEndpoint = (IPEndPoint)left.LocalEndPoint;
var rightEndpoint = (IPEndPoint)right.LocalEndPoint;

var receiverAck = new SemaphoreSlim(0);
var senderAck = new SemaphoreSlim(0);

_output.WriteLine($"{DateTime.Now}: Sending data from {rightEndpoint} to {leftEndpoint}");

var receivedChecksums = new uint?[DatagramsToSend];
Task leftThread = Task.Run(async () =>
{
EndPoint remote = leftEndpoint.Create(leftEndpoint.Serialize());
var recvBuffer = new byte[DatagramSize];
for (int i = 0; i < DatagramsToSend; i++)
{
SocketReceiveFromResult result = await ReceiveFromAsync(
left, new ArraySegment<byte>(recvBuffer), remote);
Assert.Equal(DatagramSize, result.ReceivedBytes);
Assert.Equal(rightEndpoint, result.RemoteEndPoint);

int datagramId = recvBuffer[0];
Assert.Null(receivedChecksums[datagramId]);
receivedChecksums[datagramId] = Fletcher32.Checksum(recvBuffer, 0, result.ReceivedBytes);

receiverAck.Release();
bool gotAck = await senderAck.WaitAsync(SenderAckTimeout);
Assert.True(gotAck, $"{DateTime.Now}: Timeout waiting {SenderAckTimeout} for senderAck in iteration {i}");
}
});

var sentChecksums = new uint[DatagramsToSend];
using (right)
{
var random = new Random();
var sendBuffer = new byte[DatagramSize];
for (int i = 0; i < DatagramsToSend; i++)
{
random.NextBytes(sendBuffer);
sendBuffer[0] = (byte)i;

int sent = await SendToAsync(right, new ArraySegment<byte>(sendBuffer), leftEndpoint);

bool gotAck = await receiverAck.WaitAsync(ReceiverAckTimeout);
Assert.True(gotAck, $"{DateTime.Now}: Timeout waiting {ReceiverAckTimeout} for receiverAck in iteration {i} after sending {sent}. Receiver is in {leftThread.Status}");
senderAck.Release();

Assert.Equal(DatagramSize, sent);
sentChecksums[i] = Fletcher32.Checksum(sendBuffer, 0, sent);
}
}

await leftThread;
for (int i = 0; i < DatagramsToSend; i++)
{
Assert.NotNull(receivedChecksums[i]);
Assert.Equal(sentChecksums[i], (uint)receivedChecksums[i]);
}
}
}

public sealed class SendReceiveNonParallel_Sync : SendReceiveNonParallel<SocketHelperArraySync>
{
public SendReceiveNonParallel_Sync(ITestOutputHelper output) : base(output) { }
}

public sealed class SendReceiveNonParallel_SyncForceNonBlocking : SendReceiveNonParallel<SocketHelperSyncForceNonBlocking>
{
public SendReceiveNonParallel_SyncForceNonBlocking(ITestOutputHelper output) : base(output) { }
}

public sealed class SendReceiveNonParallel_Apm : SendReceiveNonParallel<SocketHelperApm>
{
public SendReceiveNonParallel_Apm(ITestOutputHelper output) : base(output) { }
}

public sealed class SendReceiveNonParallel_Task : SendReceiveNonParallel<SocketHelperTask>
{
public SendReceiveNonParallel_Task(ITestOutputHelper output) : base(output) { }
}

public sealed class SendReceiveNonParallel_Eap : SendReceiveNonParallel<SocketHelperEap>
{
public SendReceiveNonParallel_Eap(ITestOutputHelper output) : base(output) { }
}

public sealed class SendReceiveNonParallel_SpanSync : SendReceiveNonParallel<SocketHelperSpanSync>
{
public SendReceiveNonParallel_SpanSync(ITestOutputHelper output) : base(output) { }
}

public sealed class SendReceiveNonParallel_SpanSyncForceNonBlocking : SendReceiveNonParallel<SocketHelperSpanSyncForceNonBlocking>
{
public SendReceiveNonParallel_SpanSyncForceNonBlocking(ITestOutputHelper output) : base(output) { }
}

public sealed class SendReceiveNonParallel_MemoryArrayTask : SendReceiveNonParallel<SocketHelperMemoryArrayTask>
{
public SendReceiveNonParallel_MemoryArrayTask(ITestOutputHelper output) : base(output) { }
}

public sealed class SendReceiveNonParallel_MemoryNativeTask : SendReceiveNonParallel<SocketHelperMemoryNativeTask>
{
public SendReceiveNonParallel_MemoryNativeTask(ITestOutputHelper output) : base(output) { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract Task<SocketReceiveFromResult> ReceiveFromAsync(
public virtual bool SupportsAcceptIntoExistingSocket => true;
public virtual bool SupportsAcceptReceive => false;
public virtual void Listen(Socket s, int backlog) { s.Listen(backlog); }
public virtual void ConfigureNonBlocking(Socket s) { }
}

public class SocketHelperArraySync : SocketHelperBase
Expand Down Expand Up @@ -91,6 +92,7 @@ public override void Listen(Socket s, int backlog)
s.Listen(backlog);
s.ForceNonBlocking(true);
}
public override void ConfigureNonBlocking(Socket s) => s.ForceNonBlocking(true);
}

public sealed class SocketHelperApm : SocketHelperBase
Expand Down Expand Up @@ -345,6 +347,7 @@ public Task<SocketReceiveFromResult> ReceiveFromAsync(
public bool SupportsAcceptIntoExistingSocket => _socketHelper.SupportsAcceptIntoExistingSocket;
public bool SupportsAcceptReceive => _socketHelper.SupportsAcceptReceive;
public void Listen(Socket s, int backlog) => _socketHelper.Listen(s, backlog);
public void ConfigureNonBlocking(Socket s) => _socketHelper.ConfigureNonBlocking(s);
}

public class SocketHelperSpanSync : SocketHelperArraySync
Expand All @@ -364,6 +367,7 @@ public override Task<Socket> AcceptAsync(Socket s) =>
Task.Run(() => { s.ForceNonBlocking(true); Socket accepted = s.Accept(); accepted.ForceNonBlocking(true); return accepted; });
public override Task ConnectAsync(Socket s, EndPoint endPoint) =>
Task.Run(() => { s.ForceNonBlocking(true); s.Connect(endPoint); });
public override void ConfigureNonBlocking(Socket s) => s.ForceNonBlocking(true);
}

public sealed class SocketHelperMemoryArrayTask : SocketHelperTask
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
<Compile Include="SendPacketsElementTest.cs" />
<Compile Include="SendFile.cs" />
<Compile Include="OSSupport.cs" />
<Compile Include="SendReceive.cs" />
<Compile Include="SendReceive\SendReceive.cs" />
<Compile Include="SendReceive\SendReceiveNonParallel.cs" />
<Compile Include="SendTo.cs" />
<Compile Include="SocketDuplicationTests.cs" />
<Compile Include="SocketTestHelper.cs" />
Expand Down