Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,51 @@
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
using Xunit.Abstractions;
using Xunit;
using System.Collections;

namespace Microsoft.Data.SqlClient.ManualTesting.Tests.SQL.JsonTest
{
public class JsonBulkCopyTestData : IEnumerable<object[]>
{
public IEnumerator<object[]> GetEnumerator()
{
yield return new object[] { CommandBehavior.Default, false, 300, 100 };
yield return new object[] { CommandBehavior.Default, true, 300, 100 };
yield return new object[] { CommandBehavior.SequentialAccess, false, 300, 100 };
yield return new object[] { CommandBehavior.SequentialAccess, true, 300, 100 };
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public class JsonBulkCopyTest
{
private readonly ITestOutputHelper _output;
private static readonly string _jsonFile = "randomRecords.json";
private static readonly string _outputFile = "serverRecords.json";
private static readonly string _generatedJsonFile = DataTestUtility.GenerateRandomCharacters("randomRecords");
private static readonly string _outputFile = DataTestUtility.GenerateRandomCharacters("serverResults");
private static readonly string _sourceTableName = DataTestUtility.GenerateObjectName();
private static readonly string _destinationTableName = DataTestUtility.GenerateObjectName();

public JsonBulkCopyTest(ITestOutputHelper output)
{
_output = output;
}

private void PopulateData(int noOfRecords)
private void PopulateData(int noOfRecords, int rows)
{
using (SqlConnection connection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
DataTestUtility.CreateTable(connection, "jsonTab", "(data json)");
DataTestUtility.CreateTable(connection, "jsonTabCopy", "(data json)");
GenerateJsonFile(50000, _jsonFile);
StreamJsonFileToServer(connection);
DataTestUtility.CreateTable(connection, _sourceTableName, "(data json)");
DataTestUtility.CreateTable(connection, _destinationTableName, "(data json)");
GenerateJsonFile(noOfRecords, _generatedJsonFile);
while (rows-- > 0)
{
StreamJsonFileToServer(connection);
}
}
}

Expand Down Expand Up @@ -59,7 +75,7 @@ private void GenerateJsonFile(int noOfRecords, string filename)

private void CompareJsonFiles()
{
using (var stream1 = File.OpenText(_jsonFile))
using (var stream1 = File.OpenText(_generatedJsonFile))
using (var stream2 = File.OpenText(_outputFile))
using (var reader1 = new JsonTextReader(stream1))
using (var reader2 = new JsonTextReader(stream2))
Expand All @@ -70,14 +86,14 @@ private void CompareJsonFiles()
}
}

private void PrintJsonDataToFile(SqlConnection connection)
private void PrintJsonDataToFileAndCompare(SqlConnection connection)
{
DeleteFile(_outputFile);
using (SqlCommand command = new SqlCommand("SELECT [data] FROM [jsonTabCopy]", connection))
try
{
using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SequentialAccess))
DeleteFile(_outputFile);
using (SqlCommand command = new SqlCommand("SELECT [data] FROM [" + _destinationTableName + "]", connection))
{
using (StreamWriter sw = new StreamWriter(_outputFile))
using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SequentialAccess))
{
while (reader.Read())
{
Expand All @@ -86,28 +102,36 @@ private void PrintJsonDataToFile(SqlConnection connection)

using (TextReader data = reader.GetTextReader(0))
{
do
using (StreamWriter sw = new StreamWriter(_outputFile))
{
charsRead = data.Read(buffer, 0, buffer.Length);
sw.Write(buffer, 0, charsRead);
do
{
charsRead = data.Read(buffer, 0, buffer.Length);
sw.Write(buffer, 0, charsRead);

} while (charsRead > 0);
} while (charsRead > 0);
}
}
_output.WriteLine("Output written to " + _outputFile);
CompareJsonFiles();
}
}
}
}
finally
{
DeleteFile(_outputFile);
}

}

private async Task PrintJsonDataToFileAsync(SqlConnection connection)
private async Task PrintJsonDataToFileAndCompareAsync(SqlConnection connection)
{
DeleteFile(_outputFile);
using (SqlCommand command = new SqlCommand("SELECT [data] FROM [jsonTab]", connection))
try
{
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
DeleteFile(_outputFile);
using (SqlCommand command = new SqlCommand("SELECT [data] FROM [" + _destinationTableName + "]", connection))
{
using (StreamWriter sw = new StreamWriter(_outputFile))
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
{
while (await reader.ReadAsync())
{
Expand All @@ -116,25 +140,32 @@ private async Task PrintJsonDataToFileAsync(SqlConnection connection)

using (TextReader data = reader.GetTextReader(0))
{
do
using (StreamWriter sw = new StreamWriter(_outputFile))
{
charsRead = await data.ReadAsync(buffer, 0, buffer.Length);
await sw.WriteAsync(buffer, 0, charsRead);
do
{
charsRead = await data.ReadAsync(buffer, 0, buffer.Length);
await sw.WriteAsync(buffer, 0, charsRead);

} while (charsRead > 0);
} while (charsRead > 0);
}
}
_output.WriteLine("Output written to file " + _outputFile);
CompareJsonFiles();
}
}
}
}
finally
{
DeleteFile(_outputFile);
}
}

private void StreamJsonFileToServer(SqlConnection connection)
{
using (SqlCommand cmd = new SqlCommand("INSERT INTO [jsonTab] (data) VALUES (@jsondata)", connection))
using (SqlCommand cmd = new SqlCommand("INSERT INTO [" + _sourceTableName + "] (data) VALUES (@jsondata)", connection))
{
using (StreamReader jsonFile = File.OpenText(_jsonFile))
using (StreamReader jsonFile = File.OpenText(_generatedJsonFile))
{
cmd.Parameters.Add("@jsondata", Microsoft.Data.SqlDbTypeExtensions.Json, -1).Value = jsonFile;
cmd.ExecuteNonQuery();
Expand All @@ -144,9 +175,9 @@ private void StreamJsonFileToServer(SqlConnection connection)

private async Task StreamJsonFileToServerAsync(SqlConnection connection)
{
using (SqlCommand cmd = new SqlCommand("INSERT INTO [jsonTab] (data) VALUES (@jsondata)", connection))
using (SqlCommand cmd = new SqlCommand("INSERT INTO [" + _sourceTableName + "] (data) VALUES (@jsondata)", connection))
{
using (StreamReader jsonFile = File.OpenText(_jsonFile))
using (StreamReader jsonFile = File.OpenText(_generatedJsonFile))
{
cmd.Parameters.Add("@jsondata", Microsoft.Data.SqlDbTypeExtensions.Json, -1).Value = jsonFile;
await cmd.ExecuteNonQueryAsync();
Expand All @@ -162,23 +193,23 @@ private void DeleteFile(string filename)
}
}

private void BulkCopyData(CommandBehavior cb, bool enableStraming)
private void BulkCopyData(CommandBehavior cb, bool enableStraming, int expectedTransferCount)
{
using (SqlConnection sourceConnection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
sourceConnection.Open();
SqlCommand commandRowCount = new SqlCommand("SELECT COUNT(*) FROM " + "dbo.jsonTabCopy;", sourceConnection);
SqlCommand commandRowCount = new SqlCommand("SELECT COUNT(*) FROM " + _destinationTableName, sourceConnection);
long countStart = System.Convert.ToInt32(commandRowCount.ExecuteScalar());
_output.WriteLine("Starting row count = {0}", countStart);
SqlCommand commandSourceData = new SqlCommand("SELECT data FROM dbo.jsonTab;", sourceConnection);
SqlCommand commandSourceData = new SqlCommand("SELECT data FROM " + _sourceTableName, sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader(cb);
using (SqlConnection destinationConnection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
destinationConnection.Open();
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnection))
{
bulkCopy.EnableStreaming = enableStraming;
bulkCopy.DestinationTableName = "dbo.jsonTabCopy";
bulkCopy.DestinationTableName = _destinationTableName;
try
{
bulkCopy.WriteToServer(reader);
Expand All @@ -195,27 +226,28 @@ private void BulkCopyData(CommandBehavior cb, bool enableStraming)
long countEnd = System.Convert.ToInt32(commandRowCount.ExecuteScalar());
_output.WriteLine("Ending row count = {0}", countEnd);
_output.WriteLine("{0} rows were added.", countEnd - countStart);
Assert.Equal(expectedTransferCount, countEnd - countStart);
}
}
}

private async Task BulkCopyDataAsync(CommandBehavior cb, bool enableStraming)
private async Task BulkCopyDataAsync(CommandBehavior cb, bool enableStraming, int expectedTransferCount)
{
using (SqlConnection sourceConnection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
await sourceConnection.OpenAsync();
SqlCommand commandRowCount = new SqlCommand("SELECT COUNT(*) FROM " + "dbo.jsonTabCopy;", sourceConnection);
SqlCommand commandRowCount = new SqlCommand("SELECT COUNT(*) FROM " + _destinationTableName, sourceConnection);
long countStart = System.Convert.ToInt32(await commandRowCount.ExecuteScalarAsync());
_output.WriteLine("Starting row count = {0}", countStart);
SqlCommand commandSourceData = new SqlCommand("SELECT data FROM dbo.jsonTab;", sourceConnection);
SqlCommand commandSourceData = new SqlCommand("SELECT data FROM " + _sourceTableName, sourceConnection);
SqlDataReader reader = await commandSourceData.ExecuteReaderAsync(cb);
using (SqlConnection destinationConnection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
await destinationConnection.OpenAsync();
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnection))
{
bulkCopy.EnableStreaming = enableStraming;
bulkCopy.DestinationTableName = "dbo.jsonTabCopy";
bulkCopy.DestinationTableName = _destinationTableName;
try
{
await bulkCopy.WriteToServerAsync(reader);
Expand All @@ -232,44 +264,37 @@ private async Task BulkCopyDataAsync(CommandBehavior cb, bool enableStraming)
long countEnd = System.Convert.ToInt32(await commandRowCount.ExecuteScalarAsync());
_output.WriteLine("Ending row count = {0}", countEnd);
_output.WriteLine("{0} rows were added.", countEnd - countStart);
Assert.Equal(expectedTransferCount, countEnd - countStart);
}
}
}

[InlineData(CommandBehavior.Default, false)]
[InlineData(CommandBehavior.Default, true)]
[InlineData(CommandBehavior.SequentialAccess, false)]
[InlineData(CommandBehavior.SequentialAccess, true)]
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsJsonSupported))]
Comment thread
apoorvdeshmukh marked this conversation as resolved.
public void TestJsonBulkCopy(CommandBehavior cb, bool enableStraming)
[ClassData(typeof(JsonBulkCopyTestData))]
public void TestJsonBulkCopy(CommandBehavior cb, bool enableStraming, int jsonArrayElements, int rows)
{
PopulateData(10000);
PopulateData(jsonArrayElements, rows);
using (SqlConnection connection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
BulkCopyData(cb, enableStraming);
BulkCopyData(cb, enableStraming, rows);
connection.Open();
PrintJsonDataToFile(connection);
CompareJsonFiles();
DeleteFile(_jsonFile);
PrintJsonDataToFileAndCompare(connection);
DeleteFile(_generatedJsonFile);
DeleteFile(_outputFile);
}
}

[InlineData(CommandBehavior.Default, false)]
[InlineData(CommandBehavior.Default, true)]
[InlineData(CommandBehavior.SequentialAccess, false)]
[InlineData(CommandBehavior.SequentialAccess, true)]
[ConditionalTheory(typeof(DataTestUtility), nameof(DataTestUtility.IsJsonSupported))]
public async Task TestJsonBulkCopyAsync(CommandBehavior cb, bool enableStraming)
[ClassData(typeof(JsonBulkCopyTestData))]
public async Task TestJsonBulkCopyAsync(CommandBehavior cb, bool enableStraming, int jsonArrayElements, int rows)
{
PopulateData(10000);
PopulateData(jsonArrayElements, rows);
using (SqlConnection connection = new SqlConnection(DataTestUtility.TCPConnectionString))
{
await BulkCopyDataAsync(cb, enableStraming);
await BulkCopyDataAsync(cb, enableStraming, rows);
await connection.OpenAsync();
await PrintJsonDataToFileAsync(connection);
CompareJsonFiles();
DeleteFile(_jsonFile);
await PrintJsonDataToFileAndCompareAsync(connection);
DeleteFile(_generatedJsonFile);
DeleteFile(_outputFile);
}
}
Expand Down
Loading