From 893433f9be7960330c9f5841383b23ed5b7247b7 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Fri, 4 Sep 2020 13:36:26 -0700 Subject: [PATCH 01/21] initial commit --- .../IpcTests/Sql/DataFrameTests.cs | 4 +- .../Utils/VersionBasedFacts.cs | 13 +++- .../CommandExecutorTests.cs | 32 +++++++--- .../Command/CommandExecutor.cs | 1 - .../Command/SqlCommandExecutor.cs | 62 ++++++++++++++++--- src/csharp/Microsoft.Spark/Broadcast.cs | 1 + .../Microsoft.Spark/Microsoft.Spark.csproj | 2 +- src/csharp/Microsoft.Spark/Utils/UdfUtils.cs | 6 +- 8 files changed, 97 insertions(+), 24 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs index 19105442a..40ea4561e 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs @@ -290,7 +290,7 @@ public void TestDataFrameVectorUdf() } } - [Fact] + [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] public void TestGroupedMapUdf() { DataFrame df = _spark @@ -369,7 +369,7 @@ private static RecordBatch ArrowBasedCountCharacters(RecordBatch records) } - [Fact] + [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] public void TestDataFrameGroupedMapUdf() { DataFrame df = _spark diff --git a/src/csharp/Microsoft.Spark.E2ETest/Utils/VersionBasedFacts.cs b/src/csharp/Microsoft.Spark.E2ETest/Utils/VersionBasedFacts.cs index fa2be4a6b..1d59a84af 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/Utils/VersionBasedFacts.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/Utils/VersionBasedFacts.cs @@ -13,7 +13,18 @@ public SkipIfSparkVersionIsLessThan(string version) { if (SparkSettings.Version < new Version(version)) { - Skip = $"Ignore on Spark version ({SparkSettings.Version}) <= {version}"; + Skip = $"Ignore on Spark version ({SparkSettings.Version}) < {version}"; + } + } + } + + public sealed class SkipIfSparkVersionIsGreaterOrEqualTo : FactAttribute + { + public SkipIfSparkVersionIsGreaterOrEqualTo(string version) + { + if (SparkSettings.Version >= new Version(version)) + { + Skip = $"Ignore on Spark version ({SparkSettings.Version}) >= {version}"; } } } diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs index 8978e321e..0a1970664 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs @@ -246,6 +246,8 @@ public async Task TestArrowSqlCommandExecutorWithSingleCommand() Commands = new[] { command } }; + IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; + using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -254,7 +256,7 @@ public async Task TestArrowSqlCommandExecutorWithSingleCommand() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -323,6 +325,8 @@ public async Task TestDataFrameSqlCommandExecutorWithSingleCommand() Commands = new[] { command } }; + IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; + using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -331,7 +335,7 @@ 
public async Task TestDataFrameSqlCommandExecutorWithSingleCommand() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -417,6 +421,8 @@ public async Task TestArrowSqlCommandExecutorWithMultiCommands() Commands = new[] { command1, command2 } }; + IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; + using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -427,7 +433,7 @@ public async Task TestArrowSqlCommandExecutorWithMultiCommands() .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -511,6 +517,8 @@ public async Task TestDataFrameSqlCommandExecutorWithMultiCommands() Commands = new[] { command1, command2 } }; + IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; + using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -521,7 +529,7 @@ public async Task TestDataFrameSqlCommandExecutorWithMultiCommands() .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -601,13 +609,15 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() Commands = new[] { command } }; + IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; + using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); // Write test data to the input stream. Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); // The .NET ArrowStreamWriter doesn't currently support writing just a // schema with no batches - but Java does. We use Reflection to simulate @@ -683,13 +693,15 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() Commands = new[] { command } }; + IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; + using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); // Write test data to the input stream. Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); // The .NET ArrowStreamWriter doesn't currently support writing just a // schema with no batches - but Java does. 
We use Reflection to simulate @@ -788,6 +800,8 @@ Int64Array ConvertInt64s(Int64Array int64s) Commands = new[] { command } }; + IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; + using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -797,7 +811,7 @@ Int64Array ConvertInt64s(Int64Array int64s) .Field(b => b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -891,6 +905,8 @@ ArrowStringDataFrameColumn ConvertStrings(ArrowStringDataFrameColumn strings) Commands = new[] { command } }; + IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; + using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -900,7 +916,7 @@ ArrowStringDataFrameColumn ConvertStrings(ArrowStringDataFrameColumn strings) .Field(b => b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, diff --git a/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs index c981842f6..cdf520322 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs @@ -59,7 +59,6 @@ internal CommandExecutorStat Execute( outputStream, commandPayload.EvalType, commandPayload.Commands.Cast().ToArray()); - } } } diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index f6fffd31e..d5b2f7283 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -12,6 +12,7 @@ using Apache.Arrow.Ipc; using Apache.Arrow.Types; using Microsoft.Data.Analysis; +using Microsoft.Spark.Interop; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; using Microsoft.Spark.Utils; @@ -298,6 +299,33 @@ public object Run(int splitId, object input) internal abstract class ArrowBasedCommandExecutor : SqlCommandExecutor { + private static IpcOptions s_arrowIpcOptions; + + internal static IpcOptions ArrowIpcOptions + { + get + { + if (s_arrowIpcOptions == null) + { + Version version = SparkEnvironment.SparkVersion; + + s_arrowIpcOptions = new IpcOptions + { + WriteLegacyIpcFormat = (version.Major, version.Minor) switch + { + (2, 3) => true, + (2, 4) => true, + (3, 0) => false, + _ => throw new NotSupportedException( + $"Spark {SparkEnvironment.SparkVersion} not supported.") + } + }; + } + + return s_arrowIpcOptions; + } + } + protected IEnumerable GetInputIterator(Stream inputStream) { using (var reader = new ArrowStreamReader(inputStream, leaveOpen: true)) @@ -387,7 +415,7 @@ private CommandExecutorStat ExecuteArrowSqlCommand( Debug.Assert(resultSchema == null); resultSchema = BuildSchema(results); - writer = new ArrowStreamWriter(outputStream, resultSchema, leaveOpen: true); + writer = new ArrowStreamWriter(outputStream, resultSchema, true, ArrowIpcOptions); } var recordBatch = new 
RecordBatch(resultSchema, results, numEntries); @@ -396,7 +424,7 @@ private CommandExecutorStat ExecuteArrowSqlCommand( writer.WriteRecordBatchAsync(recordBatch).GetAwaiter().GetResult(); } - SerDe.Write(outputStream, 0); + writer.WriteEndAsync().GetAwaiter().GetResult(); if (writer != null) { @@ -437,7 +465,7 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true); + writer = new ArrowStreamWriter(outputStream, result.Schema, true, ArrowIpcOptions); } // TODO: Remove sync-over-async once WriteRecordBatch exists. @@ -445,7 +473,7 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( } } - SerDe.Write(outputStream, 0); + writer.WriteEndAsync().GetAwaiter().GetResult(); if (writer != null) { @@ -677,6 +705,19 @@ public DataFrameColumn[] Run(ReadOnlyMemory input) internal class ArrowOrDataFrameGroupedMapCommandExecutor : ArrowOrDataFrameSqlCommandExecutor { + private readonly Lazy> _transformRecord = + new Lazy>(() => + { + Version version = SparkEnvironment.SparkVersion; + return (version.Major, version.Minor) switch + { + (2, 3) => (RecordBatch r) => r, + (2, 4) => (RecordBatch r) => r, + _ => throw new NotSupportedException( + $"Spark {SparkEnvironment.SparkVersion} not supported.") + }; + }); + protected internal override CommandExecutorStat ExecuteCore( Stream inputStream, Stream outputStream, @@ -726,21 +767,21 @@ private CommandExecutorStat ExecuteArrowGroupedMapCommand( ArrowStreamWriter writer = null; foreach (RecordBatch input in GetInputIterator(inputStream)) { - RecordBatch result = worker.Func(input); + RecordBatch result = _transformRecord.Value(worker.Func(input)); int numEntries = result.Length; stat.NumEntriesProcessed += numEntries; if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true); + writer = new ArrowStreamWriter(outputStream, result.Schema, true, ArrowIpcOptions); } // TODO: Remove sync-over-async once WriteRecordBatch exists. writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } - SerDe.Write(outputStream, 0); + writer.WriteEndAsync().GetAwaiter().GetResult(); if (writer != null) { @@ -770,13 +811,14 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( FxDataFrame resultDataFrame = worker.Func(dataFrame); IEnumerable recordBatches = resultDataFrame.ToArrowRecordBatches(); - foreach (RecordBatch result in recordBatches) + foreach (RecordBatch record in recordBatches) { + RecordBatch result = _transformRecord.Value(record); stat.NumEntriesProcessed += result.Length; if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true); + writer = new ArrowStreamWriter(outputStream, result.Schema, true, ArrowIpcOptions); } // TODO: Remove sync-over-async once WriteRecordBatch exists. 
@@ -784,7 +826,7 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( } } - SerDe.Write(outputStream, 0); + writer.WriteEndAsync().GetAwaiter().GetResult(); if (writer != null) { diff --git a/src/csharp/Microsoft.Spark/Broadcast.cs b/src/csharp/Microsoft.Spark/Broadcast.cs index 2791ec546..99025b7c2 100644 --- a/src/csharp/Microsoft.Spark/Broadcast.cs +++ b/src/csharp/Microsoft.Spark/Broadcast.cs @@ -128,6 +128,7 @@ private JvmObjectReference CreateBroadcast(SparkContext sc, T value) CreateBroadcast_V2_3_1_AndBelow(javaSparkContext, value), (2, 3) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value), (2, 4) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value), + (3, 0) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value), _ => throw new NotSupportedException($"Spark {version} not supported.") }; } diff --git a/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj b/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj index 2cddc5627..0205701f2 100644 --- a/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj +++ b/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj @@ -29,7 +29,7 @@ - + diff --git a/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs b/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs index ccb5e5209..66c96a2a2 100644 --- a/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs +++ b/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs @@ -72,7 +72,11 @@ internal enum PythonEvalType SQL_SCALAR_PANDAS_UDF = 200, SQL_GROUPED_MAP_PANDAS_UDF = 201, SQL_GROUPED_AGG_PANDAS_UDF = 202, - SQL_WINDOW_AGG_PANDAS_UDF = 203 + SQL_WINDOW_AGG_PANDAS_UDF = 203, + + SQL_SCALAR_PANDAS_ITER_UDF = 204, + SQL_MAP_PANDAS_ITER_UDF = 205, + SQL_COGROUPED_MAP_PANDAS_UDF = 206 } /// From d1d8b8c26ebdd2d6be87b88b63940ff48d323f32 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Fri, 4 Sep 2020 23:06:53 -0700 Subject: [PATCH 02/21] fix tests --- .../CommandExecutorTests.cs | 69 ++++++++++++------- .../Command/SqlCommandExecutor.cs | 48 +++++++++---- 2 files changed, 79 insertions(+), 38 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs index 0a1970664..e649a7199 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs @@ -25,6 +25,19 @@ namespace Microsoft.Spark.Worker.UnitTest { public class CommandExecutorTests { + IpcOptions _ipcOptions; + + public CommandExecutorTests() + { + _ipcOptions = new IpcOptions + { + WriteLegacyIpcFormat = false + }; + + ArrowBasedCommandExecutor.ArrowIpcOptions = _ipcOptions; + ArrowOrDataFrameGroupedMapCommandExecutor.RecordBatchFunc = (RecordBatch r) => r; + } + [Fact] public void TestPicklingSqlCommandExecutorWithSingleCommand() { @@ -246,8 +259,6 @@ public async Task TestArrowSqlCommandExecutorWithSingleCommand() Commands = new[] { command } }; - IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; - using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -256,7 +267,7 @@ public async Task TestArrowSqlCommandExecutorWithSingleCommand() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -297,6 +308,9 @@ await 
arrowWriter.WriteRecordBatchAsync( Assert.Equal($"udf: {i}", array.GetString(i)); } + int continuationToken = SerDe.ReadInt32(outputStream); + Assert.Equal(-1, continuationToken); + int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -325,8 +339,6 @@ public async Task TestDataFrameSqlCommandExecutorWithSingleCommand() Commands = new[] { command } }; - IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; - using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -335,7 +347,7 @@ public async Task TestDataFrameSqlCommandExecutorWithSingleCommand() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -376,6 +388,9 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal($"udf: {i}", array.GetString(i)); } + int continuationToken = SerDe.ReadInt32(outputStream); + Assert.Equal(-1, continuationToken); + int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -421,8 +436,6 @@ public async Task TestArrowSqlCommandExecutorWithMultiCommands() Commands = new[] { command1, command2 } }; - IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; - using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -433,7 +446,7 @@ public async Task TestArrowSqlCommandExecutorWithMultiCommands() .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -477,6 +490,9 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(i * i, array2.Values[i]); } + int continuationToken = SerDe.ReadInt32(outputStream); + Assert.Equal(-1, continuationToken); + int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -517,8 +533,6 @@ public async Task TestDataFrameSqlCommandExecutorWithMultiCommands() Commands = new[] { command1, command2 } }; - IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; - using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -529,7 +543,7 @@ public async Task TestDataFrameSqlCommandExecutorWithMultiCommands() .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -573,6 +587,9 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(i * i, array2.Values[i]); } + int continuationToken = SerDe.ReadInt32(outputStream); + Assert.Equal(-1, continuationToken); + int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -609,15 +626,13 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() Commands = new[] { command } }; - IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; - using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); // Write test 
data to the input stream. Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); // The .NET ArrowStreamWriter doesn't currently support writing just a // schema with no batches - but Java does. We use Reflection to simulate @@ -660,6 +675,9 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() var array = (StringArray)outputBatch.Arrays.ElementAt(0); Assert.Equal(0, array.Length); + int continuationToken = SerDe.ReadInt32(outputStream); + Assert.Equal(-1, continuationToken); + int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -693,15 +711,13 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() Commands = new[] { command } }; - IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; - using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); // Write test data to the input stream. Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); // The .NET ArrowStreamWriter doesn't currently support writing just a // schema with no batches - but Java does. We use Reflection to simulate @@ -744,6 +760,9 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() var array = (StringArray)outputBatch.Arrays.ElementAt(0); Assert.Equal(0, array.Length); + int continuationToken = SerDe.ReadInt32(outputStream); + Assert.Equal(-1, continuationToken); + int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -800,8 +819,6 @@ Int64Array ConvertInt64s(Int64Array int64s) Commands = new[] { command } }; - IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; - using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -811,7 +828,7 @@ Int64Array ConvertInt64s(Int64Array int64s) .Field(b => b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -862,6 +879,9 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(100 + i, longArray.Values[i]); } + int continuationToken = SerDe.ReadInt32(outputStream); ; + Assert.Equal(-1, continuationToken); + int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -905,8 +925,6 @@ ArrowStringDataFrameColumn ConvertStrings(ArrowStringDataFrameColumn strings) Commands = new[] { command } }; - IpcOptions ipcOptions = ArrowBasedCommandExecutor.ArrowIpcOptions; - using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); int numRows = 10; @@ -916,7 +934,7 @@ ArrowStringDataFrameColumn ConvertStrings(ArrowStringDataFrameColumn strings) .Field(b => b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ 
-967,6 +985,9 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(100 + i, doubleArray.Values[i]); } + int continuationToken = SerDe.ReadInt32(outputStream); + Assert.Equal(-1, continuationToken); + int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index d5b2f7283..c8e2c6f17 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -313,8 +313,7 @@ internal static IpcOptions ArrowIpcOptions { WriteLegacyIpcFormat = (version.Major, version.Minor) switch { - (2, 3) => true, - (2, 4) => true, + (2, _) => true, (3, 0) => false, _ => throw new NotSupportedException( $"Spark {SparkEnvironment.SparkVersion} not supported.") @@ -324,6 +323,11 @@ internal static IpcOptions ArrowIpcOptions return s_arrowIpcOptions; } + set + { + // For Tests + s_arrowIpcOptions = value; + } } protected IEnumerable GetInputIterator(Stream inputStream) @@ -705,18 +709,34 @@ public DataFrameColumn[] Run(ReadOnlyMemory input) internal class ArrowOrDataFrameGroupedMapCommandExecutor : ArrowOrDataFrameSqlCommandExecutor { - private readonly Lazy> _transformRecord = - new Lazy>(() => + private static Func s_recordBatchFunc; + + // Transforms the RecordBatch to something that is compatible with the + // current version of Spark. + internal static Func RecordBatchFunc + { + get { - Version version = SparkEnvironment.SparkVersion; - return (version.Major, version.Minor) switch + if (s_recordBatchFunc == null) { - (2, 3) => (RecordBatch r) => r, - (2, 4) => (RecordBatch r) => r, - _ => throw new NotSupportedException( - $"Spark {SparkEnvironment.SparkVersion} not supported.") - }; - }); + Version version = SparkEnvironment.SparkVersion; + + s_recordBatchFunc = (version.Major, version.Minor) switch + { + (2, _) => (RecordBatch r) => r, + _ => throw new NotSupportedException( + $"Spark {SparkEnvironment.SparkVersion} not supported.") + }; + } + + return s_recordBatchFunc; + } + set + { + // For Tests + s_recordBatchFunc = value; + } + } protected internal override CommandExecutorStat ExecuteCore( Stream inputStream, @@ -767,7 +787,7 @@ private CommandExecutorStat ExecuteArrowGroupedMapCommand( ArrowStreamWriter writer = null; foreach (RecordBatch input in GetInputIterator(inputStream)) { - RecordBatch result = _transformRecord.Value(worker.Func(input)); + RecordBatch result = RecordBatchFunc(worker.Func(input)); int numEntries = result.Length; stat.NumEntriesProcessed += numEntries; @@ -813,7 +833,7 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( foreach (RecordBatch record in recordBatches) { - RecordBatch result = _transformRecord.Value(record); + RecordBatch result = RecordBatchFunc(record); stat.NumEntriesProcessed += result.Length; if (writer == null) From 57898462c06bbb606b43e548bb6b5cb597cfca5f Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Sat, 5 Sep 2020 11:32:25 -0700 Subject: [PATCH 03/21] trigger test. 
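
The Arrow IPC format now depends only on the Spark major version: every Spark 2.x
release needs the legacy IPC format, while Spark 3.x uses the current format, so the
(Major, Minor) tuple switches collapse to a switch on Major. A minimal sketch of the
resulting selection (SparkEnvironment.SparkVersion is a System.Version, as used in
the earlier commits):

    WriteLegacyIpcFormat = SparkEnvironment.SparkVersion.Major switch
    {
        2 => true,   // Spark 2.x readers expect the legacy Arrow IPC format
        3 => false,  // Spark 3.x reads the current Arrow IPC format
        _ => throw new NotSupportedException(
            $"Spark {SparkEnvironment.SparkVersion} not supported.")
    };

RecordBatchFunc is reduced the same way, keeping the identity transform for
Spark 2.x only.
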
--- .../Command/SqlCommandExecutor.cs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index c8e2c6f17..6ddb05787 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -307,14 +307,12 @@ internal static IpcOptions ArrowIpcOptions { if (s_arrowIpcOptions == null) { - Version version = SparkEnvironment.SparkVersion; - s_arrowIpcOptions = new IpcOptions { - WriteLegacyIpcFormat = (version.Major, version.Minor) switch + WriteLegacyIpcFormat = SparkEnvironment.SparkVersion.Major switch { - (2, _) => true, - (3, 0) => false, + 2 => true, + 3 => false, _ => throw new NotSupportedException( $"Spark {SparkEnvironment.SparkVersion} not supported.") } @@ -719,11 +717,9 @@ internal static Func RecordBatchFunc { if (s_recordBatchFunc == null) { - Version version = SparkEnvironment.SparkVersion; - - s_recordBatchFunc = (version.Major, version.Minor) switch + s_recordBatchFunc = SparkEnvironment.SparkVersion.Major switch { - (2, _) => (RecordBatch r) => r, + 2 => (RecordBatch r) => r, _ => throw new NotSupportedException( $"Spark {SparkEnvironment.SparkVersion} not supported.") }; From 1ac30ef7393f43f4b21e7a5426ddeb72fb667acf Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Sun, 6 Sep 2020 02:27:52 -0700 Subject: [PATCH 04/21] fix tests. --- .../Command/SqlCommandExecutor.cs | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 6ddb05787..0e8d86bfd 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -426,12 +426,12 @@ private CommandExecutorStat ExecuteArrowSqlCommand( writer.WriteRecordBatchAsync(recordBatch).GetAwaiter().GetResult(); } - writer.WriteEndAsync().GetAwaiter().GetResult(); - - if (writer != null) + if (!ArrowIpcOptions.WriteLegacyIpcFormat) { - writer.Dispose(); + SerDe.Write(outputStream, -1); } + SerDe.Write(outputStream, 0); + writer?.Dispose(); return stat; } @@ -475,12 +475,12 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( } } - writer.WriteEndAsync().GetAwaiter().GetResult(); - - if (writer != null) + if (!ArrowIpcOptions.WriteLegacyIpcFormat) { - writer.Dispose(); + SerDe.Write(outputStream, -1); } + SerDe.Write(outputStream, 0); + writer?.Dispose(); return stat; } @@ -733,7 +733,6 @@ internal static Func RecordBatchFunc s_recordBatchFunc = value; } } - protected internal override CommandExecutorStat ExecuteCore( Stream inputStream, Stream outputStream, @@ -797,12 +796,12 @@ private CommandExecutorStat ExecuteArrowGroupedMapCommand( writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } - writer.WriteEndAsync().GetAwaiter().GetResult(); - - if (writer != null) + if (!ArrowIpcOptions.WriteLegacyIpcFormat) { - writer.Dispose(); + SerDe.Write(outputStream, -1); } + SerDe.Write(outputStream, 0); + writer?.Dispose(); return stat; } @@ -842,12 +841,12 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( } } - writer.WriteEndAsync().GetAwaiter().GetResult(); - - if (writer != null) + if (!ArrowIpcOptions.WriteLegacyIpcFormat) { - writer.Dispose(); + SerDe.Write(outputStream, -1); } + SerDe.Write(outputStream, 0); + writer?.Dispose(); return 
stat; } From 1ec33f32371c0b265d2b69caedfda56a25eb530d Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Sun, 6 Sep 2020 14:27:08 -0700 Subject: [PATCH 05/21] mitigate xUnit testing deadlock. --- .../IpcTests/Sql/DataFrameTests.cs | 1 + .../Command/CommandExecutor.cs | 2 +- .../Command/SqlCommandExecutor.cs | 77 +++++++------------ .../Microsoft.Spark/Interop/Ipc/SerDe.cs | 22 ++++++ 4 files changed, 53 insertions(+), 49 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs index 1b3493371..6352666f2 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs @@ -19,6 +19,7 @@ using FxDataFrame = Microsoft.Data.Analysis.DataFrame; using Int32Type = Apache.Arrow.Types.Int32Type; +[assembly: CollectionBehavior(DisableTestParallelization = true)] namespace Microsoft.Spark.E2ETest.IpcTests { [Collection("Spark E2E Tests")] diff --git a/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs index cdf520322..a63177ab5 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs @@ -58,7 +58,7 @@ internal CommandExecutorStat Execute( inputStream, outputStream, commandPayload.EvalType, - commandPayload.Commands.Cast().ToArray()); + commandPayload.Commands.Cast().ToArray()).Result; } } } diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 0e8d86bfd..54ba013a6 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -8,6 +8,7 @@ using System.Diagnostics; using System.IO; using System.Linq; +using System.Threading.Tasks; using Apache.Arrow; using Apache.Arrow.Ipc; using Apache.Arrow.Types; @@ -36,7 +37,7 @@ internal abstract class SqlCommandExecutor /// Evaluation type for the current commands /// Contains the commands to execute /// Statistics captured during the Execute() run - internal static CommandExecutorStat Execute( + internal static async Task Execute( Stream inputStream, Stream outputStream, UdfUtils.PythonEvalType evalType, @@ -72,10 +73,10 @@ internal static CommandExecutorStat Execute( throw new NotSupportedException($"{evalType} is not supported."); } - return executor.ExecuteCore(inputStream, outputStream, commands); + return await executor.ExecuteCore(inputStream, outputStream, commands); } - protected internal abstract CommandExecutorStat ExecuteCore( + protected internal abstract Task ExecuteCore( Stream inputStream, Stream outputStream, SqlCommand[] commands); @@ -93,7 +94,7 @@ internal class PicklingSqlCommandExecutor : SqlCommandExecutor [ThreadStatic] private static byte[] s_outputBuffer; - protected internal override CommandExecutorStat ExecuteCore( + protected internal override async Task ExecuteCore( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -154,7 +155,7 @@ protected internal override CommandExecutorStat ExecuteCore( // The initial (estimated) buffer size for pickling rows is set to the size of // input pickled rows because the number of rows are the same for both input // and output. 
- WriteOutput(outputStream, outputRows, messageLength); + await WriteOutputAsync(outputStream, outputRows, messageLength); stat.NumEntriesProcessed += inputRows.Length; outputRows.Clear(); } @@ -164,7 +165,7 @@ protected internal override CommandExecutorStat ExecuteCore( } /// - /// Writes the given message to the stream. + /// Asynchronously writes the given message to the stream. /// /// Stream to write to /// Rows to write to @@ -172,12 +173,12 @@ protected internal override CommandExecutorStat ExecuteCore( /// Estimated max size of the serialized output. /// If it's not big enough, pickler increases the buffer. /// - private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) + private async Task WriteOutputAsync(Stream stream, IEnumerable rows, int sizeHint) { if (s_outputBuffer == null) s_outputBuffer = new byte[sizeHint]; - Pickler pickler = s_pickler ?? (s_pickler = new Pickler(false)); + Pickler pickler = s_pickler ??= new Pickler(false); pickler.dumps(rows, ref s_outputBuffer, out int bytesWritten); if (bytesWritten <= 0) @@ -185,8 +186,8 @@ private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) throw new Exception($"Serialized output size must be positive. Was {bytesWritten}."); } - SerDe.Write(stream, bytesWritten); - SerDe.Write(stream, s_outputBuffer, bytesWritten); + await SerDe.WriteAsync(stream, bytesWritten); + await SerDe.WriteAsync(stream, s_outputBuffer, bytesWritten); } /// @@ -361,7 +362,7 @@ protected IEnumerable GetInputIterator(Stream inputStream) internal class ArrowOrDataFrameSqlCommandExecutor : ArrowBasedCommandExecutor { - protected internal override CommandExecutorStat ExecuteCore( + protected internal override async Task ExecuteCore( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -387,12 +388,12 @@ protected internal override CommandExecutorStat ExecuteCore( } if (useDataFrameCommandExecutor) { - return ExecuteDataFrameSqlCommand(inputStream, outputStream, commands); + return await ExecuteDataFrameSqlCommand(inputStream, outputStream, commands); } - return ExecuteArrowSqlCommand(inputStream, outputStream, commands); + return await ExecuteArrowSqlCommand(inputStream, outputStream, commands); } - private CommandExecutorStat ExecuteArrowSqlCommand( + private async Task ExecuteArrowSqlCommand( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -422,21 +423,16 @@ private CommandExecutorStat ExecuteArrowSqlCommand( var recordBatch = new RecordBatch(resultSchema, results, numEntries); - // TODO: Remove sync-over-async once WriteRecordBatch exists. - writer.WriteRecordBatchAsync(recordBatch).GetAwaiter().GetResult(); + await writer.WriteRecordBatchAsync(recordBatch); } - if (!ArrowIpcOptions.WriteLegacyIpcFormat) - { - SerDe.Write(outputStream, -1); - } - SerDe.Write(outputStream, 0); + await writer.WriteEndAsync(); writer?.Dispose(); return stat; } - private CommandExecutorStat ExecuteDataFrameSqlCommand( + private async Task ExecuteDataFrameSqlCommand( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -470,16 +466,11 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( writer = new ArrowStreamWriter(outputStream, result.Schema, true, ArrowIpcOptions); } - // TODO: Remove sync-over-async once WriteRecordBatch exists. 
- writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); + await writer.WriteRecordBatchAsync(result); } } - if (!ArrowIpcOptions.WriteLegacyIpcFormat) - { - SerDe.Write(outputStream, -1); - } - SerDe.Write(outputStream, 0); + await writer.WriteEndAsync(); writer?.Dispose(); return stat; @@ -733,7 +724,7 @@ internal static Func RecordBatchFunc s_recordBatchFunc = value; } } - protected internal override CommandExecutorStat ExecuteCore( + protected internal override async Task ExecuteCore( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -761,12 +752,12 @@ protected internal override CommandExecutorStat ExecuteCore( } if (useDataFrameGroupedMapCommandExecutor) { - return ExecuteDataFrameGroupedMapCommand(inputStream, outputStream, commands); + return await ExecuteDataFrameGroupedMapCommand(inputStream, outputStream, commands); } - return ExecuteArrowGroupedMapCommand(inputStream, outputStream, commands); + return await ExecuteArrowGroupedMapCommand(inputStream, outputStream, commands); } - private CommandExecutorStat ExecuteArrowGroupedMapCommand( + private async Task ExecuteArrowGroupedMapCommand( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -792,21 +783,16 @@ private CommandExecutorStat ExecuteArrowGroupedMapCommand( writer = new ArrowStreamWriter(outputStream, result.Schema, true, ArrowIpcOptions); } - // TODO: Remove sync-over-async once WriteRecordBatch exists. - writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); + await writer.WriteRecordBatchAsync(result); } - if (!ArrowIpcOptions.WriteLegacyIpcFormat) - { - SerDe.Write(outputStream, -1); - } - SerDe.Write(outputStream, 0); + await writer.WriteEndAsync(); writer?.Dispose(); return stat; } - private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( + private async Task ExecuteDataFrameGroupedMapCommand( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -836,16 +822,11 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( writer = new ArrowStreamWriter(outputStream, result.Schema, true, ArrowIpcOptions); } - // TODO: Remove sync-over-async once WriteRecordBatch exists. - writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); + await writer.WriteRecordBatchAsync(result); } } - if (!ArrowIpcOptions.WriteLegacyIpcFormat) - { - SerDe.Write(outputStream, -1); - } - SerDe.Write(outputStream, 0); + await writer.WriteEndAsync(); writer?.Dispose(); return stat; diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs index c2c742e87..8dde21ff2 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs @@ -6,6 +6,7 @@ using System.Buffers.Binary; using System.IO; using System.Text; +using System.Threading.Tasks; namespace Microsoft.Spark.Interop.Ipc { @@ -282,6 +283,15 @@ public static void Write(Stream s, byte[] value) => public static void Write(Stream s, byte[] value, int count) => s.Write(value, 0, count); + /// + /// Asynchronously writes a byte array to a stream + /// + /// The stream to write + /// The byte array to write + /// The number of bytes in the array to write. + public static async Task WriteAsync(Stream s, byte[] value, int count) => + await s.WriteAsync(value, 0, count); + /// /// Writes a boolean to a stream /// @@ -302,6 +312,18 @@ public static void Write(Stream s, int value) Write(s, buffer, sizeof(int)); } + /// + /// Asynchronously writes an integer to a stream (big-endian). 
+ /// + /// The stream to write + /// The integer to write + public static async Task WriteAsync(Stream s, int value) + { + byte[] buffer = GetThreadLocalBuffer(sizeof(int)); + BinaryPrimitives.WriteInt32BigEndian(buffer, value); + await WriteAsync(s, buffer, sizeof(int)); + } + /// /// Writes a long integer to a stream (big-endian). /// From 37c37602a6af45bf18c35271df045c15a7f7b948 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Mon, 7 Sep 2020 02:17:50 -0700 Subject: [PATCH 06/21] pass spark version to executor. --- .../IpcTests/Sql/DataFrameTests.cs | 1 - .../CommandExecutorTests.cs | 33 ++-- .../Command/CommandExecutor.cs | 11 +- .../Command/SqlCommandExecutor.cs | 147 ++++++++---------- .../Microsoft.Spark.Worker/TaskRunner.cs | 2 +- .../Microsoft.Spark/Interop/Ipc/SerDe.cs | 22 --- 6 files changed, 89 insertions(+), 127 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs index 6352666f2..1b3493371 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs @@ -19,7 +19,6 @@ using FxDataFrame = Microsoft.Data.Analysis.DataFrame; using Int32Type = Apache.Arrow.Types.Int32Type; -[assembly: CollectionBehavior(DisableTestParallelization = true)] namespace Microsoft.Spark.E2ETest.IpcTests { [Collection("Spark E2E Tests")] diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs index e649a7199..450835035 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. 
+using System; using System.Collections; using System.Collections.Generic; using System.IO; @@ -25,17 +26,17 @@ namespace Microsoft.Spark.Worker.UnitTest { public class CommandExecutorTests { - IpcOptions _ipcOptions; + private readonly Version _version; + private readonly IpcOptions _ipcOptions; public CommandExecutorTests() { + // Spark 3.x and tests use ArrowStreamReader 0.15.1+ + _version = new Version(Versions.V3_0_0); _ipcOptions = new IpcOptions { WriteLegacyIpcFormat = false }; - - ArrowBasedCommandExecutor.ArrowIpcOptions = _ipcOptions; - ArrowOrDataFrameGroupedMapCommandExecutor.RecordBatchFunc = (RecordBatch r) => r; } [Fact] @@ -74,7 +75,7 @@ public void TestPicklingSqlCommandExecutorWithSingleCommand() SerDe.Write(inputStream, (int)SpecialLengths.END_OF_DATA_SECTION); inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -158,7 +159,7 @@ public void TestPicklingSqlCommandExecutorWithMultiCommands() SerDe.Write(inputStream, (int)SpecialLengths.END_OF_DATA_SECTION); inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -221,7 +222,7 @@ public void TestPicklingSqlCommandExecutorWithEmptyInput() SerDe.Write(inputStream, (int)SpecialLengths.END_OF_DATA_SECTION); inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -282,7 +283,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -362,7 +363,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -463,7 +464,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -560,7 +561,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -649,7 +650,7 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -734,7 +735,7 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -847,7 +848,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -953,7 +954,7 @@ await 
arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, @@ -1039,7 +1040,7 @@ public void TestRDDCommandExecutor() inputStream.Seek(0, SeekOrigin.Begin); // Execute the command. - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(_version).Execute( inputStream, outputStream, 0, diff --git a/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs index a63177ab5..cc0f2bee3 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System; using System.IO; using System.Linq; @@ -24,6 +25,13 @@ internal sealed class CommandExecutorStat /// internal sealed class CommandExecutor { + private readonly Version _version; + + internal CommandExecutor(Version version) + { + _version = version; + } + /// /// Executes the commands on the input data read from input stream /// and writes results to the output stream. @@ -55,10 +63,11 @@ internal CommandExecutorStat Execute( } return SqlCommandExecutor.Execute( + _version, inputStream, outputStream, commandPayload.EvalType, - commandPayload.Commands.Cast().ToArray()).Result; + commandPayload.Commands.Cast().ToArray()); } } } diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 54ba013a6..01cfcdb54 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -8,7 +8,6 @@ using System.Diagnostics; using System.IO; using System.Linq; -using System.Threading.Tasks; using Apache.Arrow; using Apache.Arrow.Ipc; using Apache.Arrow.Types; @@ -32,12 +31,14 @@ internal abstract class SqlCommandExecutor /// Executes the commands on the input data read from input stream /// and writes results to the output stream. /// + /// Spark version. 
/// Input stream to read data from /// Output stream to write results to /// Evaluation type for the current commands /// Contains the commands to execute /// Statistics captured during the Execute() run - internal static async Task Execute( + internal static CommandExecutorStat Execute( + Version version, Stream inputStream, Stream outputStream, UdfUtils.PythonEvalType evalType, @@ -62,21 +63,21 @@ internal static async Task Execute( } else if (evalType == UdfUtils.PythonEvalType.SQL_SCALAR_PANDAS_UDF) { - executor = new ArrowOrDataFrameSqlCommandExecutor(); + executor = new ArrowOrDataFrameSqlCommandExecutor(version); } else if (evalType == UdfUtils.PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF) { - executor = new ArrowOrDataFrameGroupedMapCommandExecutor(); + executor = new ArrowOrDataFrameGroupedMapCommandExecutor(version); } else { throw new NotSupportedException($"{evalType} is not supported."); } - return await executor.ExecuteCore(inputStream, outputStream, commands); + return executor.ExecuteCore(inputStream, outputStream, commands); } - protected internal abstract Task ExecuteCore( + protected internal abstract CommandExecutorStat ExecuteCore( Stream inputStream, Stream outputStream, SqlCommand[] commands); @@ -94,7 +95,7 @@ internal class PicklingSqlCommandExecutor : SqlCommandExecutor [ThreadStatic] private static byte[] s_outputBuffer; - protected internal override async Task ExecuteCore( + protected internal override CommandExecutorStat ExecuteCore( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -155,7 +156,7 @@ protected internal override async Task ExecuteCore( // The initial (estimated) buffer size for pickling rows is set to the size of // input pickled rows because the number of rows are the same for both input // and output. - await WriteOutputAsync(outputStream, outputRows, messageLength); + WriteOutput(outputStream, outputRows, messageLength); stat.NumEntriesProcessed += inputRows.Length; outputRows.Clear(); } @@ -165,7 +166,7 @@ protected internal override async Task ExecuteCore( } /// - /// Asynchronously writes the given message to the stream. + /// Writes the given message to the stream. /// /// Stream to write to /// Rows to write to @@ -173,7 +174,7 @@ protected internal override async Task ExecuteCore( /// Estimated max size of the serialized output. /// If it's not big enough, pickler increases the buffer. /// - private async Task WriteOutputAsync(Stream stream, IEnumerable rows, int sizeHint) + private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) { if (s_outputBuffer == null) s_outputBuffer = new byte[sizeHint]; @@ -186,8 +187,8 @@ private async Task WriteOutputAsync(Stream stream, IEnumerable rows, int throw new Exception($"Serialized output size must be positive. 
Was {bytesWritten}."); } - await SerDe.WriteAsync(stream, bytesWritten); - await SerDe.WriteAsync(stream, s_outputBuffer, bytesWritten); + SerDe.Write(stream, bytesWritten); + SerDe.Write(stream, s_outputBuffer, bytesWritten); } /// @@ -300,34 +301,20 @@ public object Run(int splitId, object input) internal abstract class ArrowBasedCommandExecutor : SqlCommandExecutor { - private static IpcOptions s_arrowIpcOptions; + protected Version _version; - internal static IpcOptions ArrowIpcOptions - { - get + protected IpcOptions ArrowIpcOptions() => + new IpcOptions { - if (s_arrowIpcOptions == null) + //WriteLegacyIpcFormat = true + WriteLegacyIpcFormat = _version.Major switch { - s_arrowIpcOptions = new IpcOptions - { - WriteLegacyIpcFormat = SparkEnvironment.SparkVersion.Major switch - { - 2 => true, - 3 => false, - _ => throw new NotSupportedException( - $"Spark {SparkEnvironment.SparkVersion} not supported.") - } - }; + 2 => true, + 3 => false, + _ => throw new NotSupportedException( + $"Spark {SparkEnvironment.SparkVersion} not supported.") } - - return s_arrowIpcOptions; - } - set - { - // For Tests - s_arrowIpcOptions = value; - } - } + }; protected IEnumerable GetInputIterator(Stream inputStream) { @@ -357,12 +344,17 @@ protected IEnumerable GetInputIterator(Stream inputStream) yield return new RecordBatch(reader.Schema, arrays, 0); } } - } + } } internal class ArrowOrDataFrameSqlCommandExecutor : ArrowBasedCommandExecutor { - protected internal override async Task ExecuteCore( + internal ArrowOrDataFrameSqlCommandExecutor(Version version) + { + _version = version; + } + + protected internal override CommandExecutorStat ExecuteCore( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -388,12 +380,12 @@ protected internal override async Task ExecuteCore( } if (useDataFrameCommandExecutor) { - return await ExecuteDataFrameSqlCommand(inputStream, outputStream, commands); + return ExecuteDataFrameSqlCommand(inputStream, outputStream, commands); } - return await ExecuteArrowSqlCommand(inputStream, outputStream, commands); + return ExecuteArrowSqlCommand(inputStream, outputStream, commands); } - private async Task ExecuteArrowSqlCommand( + private CommandExecutorStat ExecuteArrowSqlCommand( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -403,6 +395,7 @@ private async Task ExecuteArrowSqlCommand( SerDe.Write(outputStream, (int)SpecialLengths.START_ARROW_STREAM); + IpcOptions ipcOptions = ArrowIpcOptions(); ArrowStreamWriter writer = null; Schema resultSchema = null; foreach (ReadOnlyMemory input in GetArrowInputIterator(inputStream)) @@ -418,21 +411,21 @@ private async Task ExecuteArrowSqlCommand( Debug.Assert(resultSchema == null); resultSchema = BuildSchema(results); - writer = new ArrowStreamWriter(outputStream, resultSchema, true, ArrowIpcOptions); + writer = new ArrowStreamWriter(outputStream, resultSchema, true, ipcOptions); } var recordBatch = new RecordBatch(resultSchema, results, numEntries); - await writer.WriteRecordBatchAsync(recordBatch); + writer.WriteRecordBatchAsync(recordBatch).GetAwaiter().GetResult(); } - await writer.WriteEndAsync(); + writer.WriteEndAsync().GetAwaiter().GetResult(); writer?.Dispose(); return stat; } - private async Task ExecuteDataFrameSqlCommand( + private CommandExecutorStat ExecuteDataFrameSqlCommand( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -442,6 +435,7 @@ private async Task ExecuteDataFrameSqlCommand( SerDe.Write(outputStream, (int)SpecialLengths.START_ARROW_STREAM); + IpcOptions ipcOptions 
= ArrowIpcOptions(); ArrowStreamWriter writer = null; foreach (RecordBatch input in GetInputIterator(inputStream)) { @@ -463,14 +457,14 @@ private async Task ExecuteDataFrameSqlCommand( if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, true, ArrowIpcOptions); + writer = new ArrowStreamWriter(outputStream, result.Schema, true, ipcOptions); } - await writer.WriteRecordBatchAsync(result); + writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } } - await writer.WriteEndAsync(); + writer.WriteEndAsync().GetAwaiter().GetResult(); writer?.Dispose(); return stat; @@ -698,33 +692,12 @@ public DataFrameColumn[] Run(ReadOnlyMemory input) internal class ArrowOrDataFrameGroupedMapCommandExecutor : ArrowOrDataFrameSqlCommandExecutor { - private static Func s_recordBatchFunc; - - // Transforms the RecordBatch to something that is compatible with the - // current version of Spark. - internal static Func RecordBatchFunc + internal ArrowOrDataFrameGroupedMapCommandExecutor(Version version) + : base(version) { - get - { - if (s_recordBatchFunc == null) - { - s_recordBatchFunc = SparkEnvironment.SparkVersion.Major switch - { - 2 => (RecordBatch r) => r, - _ => throw new NotSupportedException( - $"Spark {SparkEnvironment.SparkVersion} not supported.") - }; - } - - return s_recordBatchFunc; - } - set - { - // For Tests - s_recordBatchFunc = value; - } } - protected internal override async Task ExecuteCore( + + protected internal override CommandExecutorStat ExecuteCore( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -752,12 +725,12 @@ protected internal override async Task ExecuteCore( } if (useDataFrameGroupedMapCommandExecutor) { - return await ExecuteDataFrameGroupedMapCommand(inputStream, outputStream, commands); + return ExecuteDataFrameGroupedMapCommand(inputStream, outputStream, commands); } - return await ExecuteArrowGroupedMapCommand(inputStream, outputStream, commands); - } + return ExecuteArrowGroupedMapCommand(inputStream, outputStream, commands); + } - private async Task ExecuteArrowGroupedMapCommand( + private CommandExecutorStat ExecuteArrowGroupedMapCommand( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -770,29 +743,30 @@ private async Task ExecuteArrowGroupedMapCommand( SerDe.Write(outputStream, (int)SpecialLengths.START_ARROW_STREAM); + IpcOptions ipcOptions = ArrowIpcOptions(); ArrowStreamWriter writer = null; foreach (RecordBatch input in GetInputIterator(inputStream)) { - RecordBatch result = RecordBatchFunc(worker.Func(input)); + RecordBatch result = worker.Func(input); int numEntries = result.Length; stat.NumEntriesProcessed += numEntries; if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, true, ArrowIpcOptions); + writer = new ArrowStreamWriter(outputStream, result.Schema, true, ipcOptions); } - await writer.WriteRecordBatchAsync(result); + writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } - await writer.WriteEndAsync(); + writer.WriteEndAsync().GetAwaiter().GetResult(); writer?.Dispose(); return stat; } - private async Task ExecuteDataFrameGroupedMapCommand( + private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( Stream inputStream, Stream outputStream, SqlCommand[] commands) @@ -805,6 +779,7 @@ private async Task ExecuteDataFrameGroupedMapCommand( SerDe.Write(outputStream, (int)SpecialLengths.START_ARROW_STREAM); + IpcOptions ipcOptions = ArrowIpcOptions(); ArrowStreamWriter writer = null; foreach (RecordBatch input in 
GetInputIterator(inputStream)) { @@ -814,19 +789,19 @@ private async Task ExecuteDataFrameGroupedMapCommand( foreach (RecordBatch record in recordBatches) { - RecordBatch result = RecordBatchFunc(record); + RecordBatch result = record; stat.NumEntriesProcessed += result.Length; if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, true, ArrowIpcOptions); + writer = new ArrowStreamWriter(outputStream, result.Schema, true, ipcOptions); } - await writer.WriteRecordBatchAsync(result); + writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } } - await writer.WriteEndAsync(); + writer.WriteEndAsync().GetAwaiter().GetResult(); writer?.Dispose(); return stat; diff --git a/src/csharp/Microsoft.Spark.Worker/TaskRunner.cs b/src/csharp/Microsoft.Spark.Worker/TaskRunner.cs index 2c15c2bf4..aab61b6cf 100644 --- a/src/csharp/Microsoft.Spark.Worker/TaskRunner.cs +++ b/src/csharp/Microsoft.Spark.Worker/TaskRunner.cs @@ -150,7 +150,7 @@ private Payload ProcessStream( DateTime initTime = DateTime.UtcNow; - CommandExecutorStat commandExecutorStat = new CommandExecutor().Execute( + CommandExecutorStat commandExecutorStat = new CommandExecutor(version).Execute( inputStream, outputStream, payload.SplitIndex, diff --git a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs index 8dde21ff2..c2c742e87 100644 --- a/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs +++ b/src/csharp/Microsoft.Spark/Interop/Ipc/SerDe.cs @@ -6,7 +6,6 @@ using System.Buffers.Binary; using System.IO; using System.Text; -using System.Threading.Tasks; namespace Microsoft.Spark.Interop.Ipc { @@ -283,15 +282,6 @@ public static void Write(Stream s, byte[] value) => public static void Write(Stream s, byte[] value, int count) => s.Write(value, 0, count); - /// - /// Asynchronously writes a byte array to a stream - /// - /// The stream to write - /// The byte array to write - /// The number of bytes in the array to write. - public static async Task WriteAsync(Stream s, byte[] value, int count) => - await s.WriteAsync(value, 0, count); - /// /// Writes a boolean to a stream /// @@ -312,18 +302,6 @@ public static void Write(Stream s, int value) Write(s, buffer, sizeof(int)); } - /// - /// Asynchronously writes an integer to a stream (big-endian). - /// - /// The stream to write - /// The integer to write - public static async Task WriteAsync(Stream s, int value) - { - byte[] buffer = GetThreadLocalBuffer(sizeof(int)); - BinaryPrimitives.WriteInt32BigEndian(buffer, value); - await WriteAsync(s, buffer, sizeof(int)); - } - /// /// Writes a long integer to a stream (big-endian). 
/// From 0253e8cc5a360dc2c7b0e67cbe74c32b70e6a570 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Mon, 7 Sep 2020 14:49:49 -0700 Subject: [PATCH 07/21] remove comment --- src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 01cfcdb54..17c5ba391 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -306,7 +306,6 @@ internal abstract class ArrowBasedCommandExecutor : SqlCommandExecutor protected IpcOptions ArrowIpcOptions() => new IpcOptions { - //WriteLegacyIpcFormat = true WriteLegacyIpcFormat = _version.Major switch { 2 => true, From 5c1ce47ef667604e2f20dde876ebc3e856aaefcd Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Mon, 7 Sep 2020 14:50:59 -0700 Subject: [PATCH 08/21] remove whitespace --- .../Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 17c5ba391..b5915f783 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -343,7 +343,7 @@ protected IEnumerable GetInputIterator(Stream inputStream) yield return new RecordBatch(reader.Schema, arrays, 0); } } - } + } } internal class ArrowOrDataFrameSqlCommandExecutor : ArrowBasedCommandExecutor @@ -727,7 +727,7 @@ protected internal override CommandExecutorStat ExecuteCore( return ExecuteDataFrameGroupedMapCommand(inputStream, outputStream, commands); } return ExecuteArrowGroupedMapCommand(inputStream, outputStream, commands); - } + } private CommandExecutorStat ExecuteArrowGroupedMapCommand( Stream inputStream, From f39d3ee819eae208a5c2cd837f110454ed9492fb Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Mon, 7 Sep 2020 21:53:21 -0700 Subject: [PATCH 09/21] enable tests for spark 3.0 --- .../Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs | 6 +++--- .../Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs index ef6ea71b3..0c03303dc 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs @@ -35,7 +35,7 @@ public BroadcastTests(SparkFixture fixture) /// /// Test Broadcast support by using multiple broadcast variables in a UDF. /// - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestMultipleBroadcastWithoutEncryption() { var obj1 = new TestBroadcastVariable(1, "first"); @@ -56,7 +56,7 @@ public void TestMultipleBroadcastWithoutEncryption() /// Test Broadcast.Destroy() that destroys all data and metadata related to the broadcast /// variable and makes it inaccessible from workers. /// - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestDestroy() { var obj1 = new TestBroadcastVariable(5, "destroy"); @@ -97,7 +97,7 @@ public void TestDestroy() /// Test Broadcast.Unpersist() deletes cached copies of the broadcast on the executors. If /// the broadcast is used after unpersist is called, it is re-sent to the executors. 
/// - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestUnpersist() { var obj = new TestBroadcastVariable(1, "unpersist"); diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs index e830a7e42..423f026ca 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs @@ -156,7 +156,7 @@ public void TestUDF() } } - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestVectorUdf() { Func udf1Func = @@ -224,7 +224,7 @@ public void TestVectorUdf() } } - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestDataFrameVectorUdf() { Func udf1Func = @@ -368,7 +368,6 @@ private static RecordBatch ArrowBasedCountCharacters(RecordBatch records) returnLength); } - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] public void TestDataFrameGroupedMapUdf() { From dcf8196e5c7832a8783b2399c611785548413739 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Mon, 7 Sep 2020 21:58:45 -0700 Subject: [PATCH 10/21] fix exception message. --- .../Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index b5915f783..947a967f8 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -310,8 +310,7 @@ protected IpcOptions ArrowIpcOptions() => { 2 => true, 3 => false, - _ => throw new NotSupportedException( - $"Spark {SparkEnvironment.SparkVersion} not supported.") + _ => throw new NotSupportedException($"Spark {_version} not supported.") } }; From e11b17ce3a7d23360798d025dc4accfab7b7078e Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Mon, 7 Sep 2020 21:59:57 -0700 Subject: [PATCH 11/21] clean up usings. --- src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs index 0c03303dc..511f5a122 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs @@ -1,6 +1,5 @@ using System; using System.Linq; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.Sql; using Xunit; using static Microsoft.Spark.Sql.Functions; From e92a7e65f16d888eb9ff556d3949274e86a0a66d Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Tue, 8 Sep 2020 13:18:38 -0700 Subject: [PATCH 12/21] skip tests in backward compat pipeline. 
--- azure-pipelines.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index d4ee2238c..ce4c9aec3 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -16,6 +16,8 @@ variables: backwardCompatibleRelease: '0.9.0' TestsToFilterOut: "(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameGroupedMapUdf)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameVectorUdf)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestDestroy)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestMultipleBroadcastWithoutEncryption)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestUnpersist)&\ From fd79786b6d5b1ec8b2853e2712431916837f8d1d Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Tue, 8 Sep 2020 18:57:07 -0700 Subject: [PATCH 13/21] PR comments. --- azure-pipelines.yml | 6 +- .../CommandExecutorTests.cs | 192 ++++++++++-------- .../Command/SqlCommandExecutor.cs | 107 +++++----- 3 files changed, 167 insertions(+), 138 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index ce4c9aec3..757cbda33 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -16,8 +16,6 @@ variables: backwardCompatibleRelease: '0.9.0' TestsToFilterOut: "(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameGroupedMapUdf)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameVectorUdf)&\ - (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\ - (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestDestroy)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestMultipleBroadcastWithoutEncryption)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestUnpersist)&\ @@ -565,7 +563,9 @@ stages: inputs: command: test projects: '**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)' env: SPARK_HOME: $(Build.BinariesDirectory)\spark-3.0.0-bin-hadoop2.7 diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs index 450835035..166f233b6 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs @@ -26,22 +26,12 @@ namespace Microsoft.Spark.Worker.UnitTest { public class CommandExecutorTests { - private readonly Version _version; - private readonly IpcOptions _ipcOptions; - - public CommandExecutorTests() - { - // Spark 3.x and tests use ArrowStreamReader 0.15.1+ - _version = new Version(Versions.V3_0_0); - _ipcOptions = new IpcOptions - { - WriteLegacyIpcFormat = false - }; - } - - [Fact] - public void TestPicklingSqlCommandExecutorWithSingleCommand() + [Theory] + [InlineData(Versions.V2_4_2)] + [InlineData(Versions.V3_0_0)] + 
public void TestPicklingSqlCommandExecutorWithSingleCommand(string sparkVersionStr) { + Version sparkVersion = new Version(sparkVersionStr); var udfWrapper = new Sql.PicklingUdfWrapper( (str) => "udf: " + ((str is null) ? "NULL" : str)); var command = new SqlCommand() @@ -75,7 +65,7 @@ public void TestPicklingSqlCommandExecutorWithSingleCommand() SerDe.Write(inputStream, (int)SpecialLengths.END_OF_DATA_SECTION); inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -112,9 +102,12 @@ public void TestPicklingSqlCommandExecutorWithSingleCommand() Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public void TestPicklingSqlCommandExecutorWithMultiCommands() + [Theory] + [InlineData(Versions.V2_4_2)] + [InlineData(Versions.V3_0_0)] + public void TestPicklingSqlCommandExecutorWithMultiCommands(string sparkVersionStr) { + Version sparkVersion = new Version(sparkVersionStr); var udfWrapper1 = new Sql.PicklingUdfWrapper((str) => $"udf: {str}"); var udfWrapper2 = new Sql.PicklingUdfWrapper( (arg1, arg2) => arg1 * arg2); @@ -159,7 +152,7 @@ public void TestPicklingSqlCommandExecutorWithMultiCommands() SerDe.Write(inputStream, (int)SpecialLengths.END_OF_DATA_SECTION); inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -196,9 +189,12 @@ public void TestPicklingSqlCommandExecutorWithMultiCommands() Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public void TestPicklingSqlCommandExecutorWithEmptyInput() + [Theory] + [InlineData(Versions.V2_4_2)] + [InlineData(Versions.V3_0_0)] + public void TestPicklingSqlCommandExecutorWithEmptyInput(string sparkVersionStr) { + Version sparkVersion = new Version(sparkVersionStr); var udfWrapper = new Sql.PicklingUdfWrapper((str) => $"udf: {str}"); var command = new SqlCommand() { @@ -222,7 +218,7 @@ public void TestPicklingSqlCommandExecutorWithEmptyInput() SerDe.Write(inputStream, (int)SpecialLengths.END_OF_DATA_SECTION); inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -236,8 +232,11 @@ public void TestPicklingSqlCommandExecutorWithEmptyInput() Assert.Equal(0, outputStream.Length); } - [Fact] - public async Task TestArrowSqlCommandExecutorWithSingleCommand() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + public async Task TestArrowSqlCommandExecutorWithSingleCommand( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper = new Sql.ArrowUdfWrapper( (strings) => (StringArray)ToArrowArray( @@ -268,7 +267,7 @@ public async Task TestArrowSqlCommandExecutorWithSingleCommand() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -283,7 +282,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + 
CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -309,9 +308,6 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal($"udf: {i}", array.GetString(i)); } - int continuationToken = SerDe.ReadInt32(outputStream); - Assert.Equal(-1, continuationToken); - int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -319,8 +315,12 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestDataFrameSqlCommandExecutorWithSingleCommand() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestDataFrameSqlCommandExecutorWithSingleCommand( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper = new Sql.DataFrameUdfWrapper( (strings) => strings.Apply(cur => $"udf: {cur}")); @@ -348,7 +348,7 @@ public async Task TestDataFrameSqlCommandExecutorWithSingleCommand() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -363,7 +363,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -389,9 +389,6 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal($"udf: {i}", array.GetString(i)); } - int continuationToken = SerDe.ReadInt32(outputStream); - Assert.Equal(-1, continuationToken); - int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -399,8 +396,12 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestArrowSqlCommandExecutorWithMultiCommands() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestArrowSqlCommandExecutorWithMultiCommands( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper1 = new Sql.ArrowUdfWrapper( (strings) => (StringArray)ToArrowArray( @@ -447,7 +448,7 @@ public async Task TestArrowSqlCommandExecutorWithMultiCommands() .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -464,7 +465,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -491,9 +492,6 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(i * i, array2.Values[i]); } - int continuationToken = SerDe.ReadInt32(outputStream); - Assert.Equal(-1, continuationToken); - int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -501,8 +499,12 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestDataFrameSqlCommandExecutorWithMultiCommands() + [Theory] + 
[MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestDataFrameSqlCommandExecutorWithMultiCommands( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper1 = new Sql.DataFrameUdfWrapper( (strings) => strings.Apply(cur => $"udf: {cur}")); @@ -544,7 +546,7 @@ public async Task TestDataFrameSqlCommandExecutorWithMultiCommands() .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -561,7 +563,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -588,9 +590,6 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(i * i, array2.Values[i]); } - int continuationToken = SerDe.ReadInt32(outputStream); - Assert.Equal(-1, continuationToken); - int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -603,8 +602,12 @@ await arrowWriter.WriteRecordBatchAsync( /// Schema, and no record batches, that CommandExecutor writes the /// appropriate response back. /// - [Fact] - public void TestArrowSqlCommandExecutorWithEmptyInput() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public void TestArrowSqlCommandExecutorWithEmptyInput( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper = new Sql.ArrowUdfWrapper( (strings) => (StringArray)ToArrowArray( @@ -633,7 +636,7 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); // The .NET ArrowStreamWriter doesn't currently support writing just a // schema with no batches - but Java does. We use Reflection to simulate @@ -650,7 +653,7 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -676,9 +679,6 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() var array = (StringArray)outputBatch.Arrays.ElementAt(0); Assert.Equal(0, array.Length); - int continuationToken = SerDe.ReadInt32(outputStream); - Assert.Equal(-1, continuationToken); - int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -691,8 +691,12 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() /// Schema, and no record batches, that CommandExecutor writes the /// appropriate response back. 
/// - [Fact] - public void TestDataFrameSqlCommandExecutorWithEmptyInput() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public void TestDataFrameSqlCommandExecutorWithEmptyInput( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper = new Sql.DataFrameUdfWrapper( (strings) => strings.Apply(cur=> $"udf: {cur}")); @@ -718,7 +722,7 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); // The .NET ArrowStreamWriter doesn't currently support writing just a // schema with no batches - but Java does. We use Reflection to simulate @@ -735,7 +739,7 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -761,9 +765,6 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() var array = (StringArray)outputBatch.Arrays.ElementAt(0); Assert.Equal(0, array.Length); - int continuationToken = SerDe.ReadInt32(outputStream); - Assert.Equal(-1, continuationToken); - int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -771,8 +772,12 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestArrowGroupedMapCommandExecutor() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestArrowGroupedMapCommandExecutor( + Version sparkVersion, + IpcOptions ipcOptions) { StringArray ConvertStrings(StringArray strings) { @@ -829,7 +834,7 @@ Int64Array ConvertInt64s(Int64Array int64s) .Field(b => b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -848,7 +853,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -880,9 +885,6 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(100 + i, longArray.Values[i]); } - int continuationToken = SerDe.ReadInt32(outputStream); ; - Assert.Equal(-1, continuationToken); - int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -890,8 +892,12 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestDataFrameGroupedMapCommandExecutor() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestDataFrameGroupedMapCommandExecutor( + Version sparkVersion, + IpcOptions ipcOptions) { ArrowStringDataFrameColumn ConvertStrings(ArrowStringDataFrameColumn strings) { @@ -935,7 +941,7 @@ ArrowStringDataFrameColumn ConvertStrings(ArrowStringDataFrameColumn strings) .Field(b => 
b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, _ipcOptions); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -954,7 +960,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -986,9 +992,6 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(100 + i, doubleArray.Values[i]); } - int continuationToken = SerDe.ReadInt32(outputStream); - Assert.Equal(-1, continuationToken); - int end = SerDe.ReadInt32(outputStream); Assert.Equal(0, end); @@ -996,9 +999,12 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public void TestRDDCommandExecutor() + [Theory] + [InlineData(Versions.V2_4_2)] + [InlineData(Versions.V3_0_0)] + public void TestRDDCommandExecutor(string sparkVersionStr) { + Version sparkVersion = new Version(sparkVersionStr); static int mapUdf(int a) => a + 3; var command = new RDDCommand() { @@ -1040,7 +1046,7 @@ public void TestRDDCommandExecutor() inputStream.Seek(0, SeekOrigin.Begin); // Execute the command. - CommandExecutorStat stat = new CommandExecutor(_version).Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -1065,4 +1071,28 @@ public void TestRDDCommandExecutor() Assert.Equal(outputStream.Length, outputStream.Position); } } + + public class CommandExecutorData + { + public static IEnumerable Data => + new List + { + new object[] + { + new Version(Versions.V2_4_2), + new IpcOptions + { + WriteLegacyIpcFormat = true + } + }, + new object[] + { + new Version(Versions.V3_0_0), + new IpcOptions + { + WriteLegacyIpcFormat = true + } + } + }; + } } diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 947a967f8..779c3911b 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -12,7 +12,6 @@ using Apache.Arrow.Ipc; using Apache.Arrow.Types; using Microsoft.Data.Analysis; -using Microsoft.Spark.Interop; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; using Microsoft.Spark.Utils; @@ -31,7 +30,7 @@ internal abstract class SqlCommandExecutor /// Executes the commands on the input data read from input stream /// and writes results to the output stream. /// - /// Spark version. 
+ /// Spark version /// Input stream to read data from /// Output stream to write results to /// Evaluation type for the current commands @@ -316,31 +315,29 @@ protected IpcOptions ArrowIpcOptions() => protected IEnumerable GetInputIterator(Stream inputStream) { - using (var reader = new ArrowStreamReader(inputStream, leaveOpen: true)) + using var reader = new ArrowStreamReader(inputStream, leaveOpen: true); + RecordBatch batch; + bool returnedResult = false; + while ((batch = reader.ReadNextRecordBatch()) != null) { - RecordBatch batch; - bool returnedResult = false; - while ((batch = reader.ReadNextRecordBatch()) != null) - { - yield return batch; - returnedResult = true; - } - - if (!returnedResult) - { - // When no input batches were received, return an empty RecordBatch - // in order to create and write back the result schema. + yield return batch; + returnedResult = true; + } - int columnCount = reader.Schema.Fields.Count; - var arrays = new IArrowArray[columnCount]; - for (int i = 0; i < columnCount; ++i) - { - IArrowType type = reader.Schema.GetFieldByIndex(i).DataType; - arrays[i] = ArrowArrayHelpers.CreateEmptyArray(type); - } + if (!returnedResult) + { + // When no input batches were received, return an empty RecordBatch + // in order to create and write back the result schema. - yield return new RecordBatch(reader.Schema, arrays, 0); + int columnCount = reader.Schema.Fields.Count; + var arrays = new IArrowArray[columnCount]; + for (int i = 0; i < columnCount; ++i) + { + IArrowType type = reader.Schema.GetFieldByIndex(i).DataType; + arrays[i] = ArrowArrayHelpers.CreateEmptyArray(type); } + + yield return new RecordBatch(reader.Schema, arrays, 0); } } } @@ -414,10 +411,11 @@ private CommandExecutorStat ExecuteArrowSqlCommand( var recordBatch = new RecordBatch(resultSchema, results, numEntries); + // TODO: Remove sync-over-async once WriteRecordBatch exists. writer.WriteRecordBatchAsync(recordBatch).GetAwaiter().GetResult(); } - writer.WriteEndAsync().GetAwaiter().GetResult(); + SerDe.Write(outputStream, 0); writer?.Dispose(); return stat; @@ -458,11 +456,12 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( writer = new ArrowStreamWriter(outputStream, result.Schema, true, ipcOptions); } + // TODO: Remove sync-over-async once WriteRecordBatch exists. writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } } - writer.WriteEndAsync().GetAwaiter().GetResult(); + SerDe.Write(outputStream, 0); writer?.Dispose(); return stat; @@ -479,39 +478,37 @@ private IEnumerable> GetArrowInputIterator(Stream in int columnCount = 0; try { - using (var reader = new ArrowStreamReader(inputStream, leaveOpen: true)) + using var reader = new ArrowStreamReader(inputStream, leaveOpen: true); + RecordBatch batch; + while ((batch = reader.ReadNextRecordBatch()) != null) { - RecordBatch batch; - while ((batch = reader.ReadNextRecordBatch()) != null) + columnCount = batch.ColumnCount; + if (arrays == null) { - columnCount = batch.ColumnCount; - if (arrays == null) - { - // Note that every batch in a stream has the same schema. - arrays = ArrayPool.Shared.Rent(columnCount); - } - - for (int i = 0; i < columnCount; ++i) - { - arrays[i] = batch.Column(i); - } - - yield return new ReadOnlyMemory(arrays, 0, columnCount); + // Note that every batch in a stream has the same schema. 
+ arrays = ArrayPool.Shared.Rent(columnCount); } - if (arrays == null) + for (int i = 0; i < columnCount; ++i) { - // When no input batches were received, return empty IArrowArrays - // in order to create and write back the result schema. - columnCount = reader.Schema.Fields.Count; - arrays = ArrayPool.Shared.Rent(columnCount); + arrays[i] = batch.Column(i); + } - for (int i = 0; i < columnCount; ++i) - { - arrays[i] = null; - } - yield return new ReadOnlyMemory(arrays, 0, columnCount); + yield return new ReadOnlyMemory(arrays, 0, columnCount); + } + + if (arrays == null) + { + // When no input batches were received, return empty IArrowArrays + // in order to create and write back the result schema. + columnCount = reader.Schema.Fields.Count; + arrays = ArrayPool.Shared.Rent(columnCount); + + for (int i = 0; i < columnCount; ++i) + { + arrays[i] = null; } + yield return new ReadOnlyMemory(arrays, 0, columnCount); } } finally @@ -755,10 +752,11 @@ private CommandExecutorStat ExecuteArrowGroupedMapCommand( writer = new ArrowStreamWriter(outputStream, result.Schema, true, ipcOptions); } + // TODO: Remove sync-over-async once WriteRecordBatch exists. writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } - - writer.WriteEndAsync().GetAwaiter().GetResult(); + + SerDe.Write(outputStream, 0); writer?.Dispose(); return stat; @@ -795,11 +793,12 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( writer = new ArrowStreamWriter(outputStream, result.Schema, true, ipcOptions); } + // TODO: Remove sync-over-async once WriteRecordBatch exists. writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } } - writer.WriteEndAsync().GetAwaiter().GetResult(); + SerDe.Write(outputStream, 0); writer?.Dispose(); return stat; From d421cc02810c6b8367f22a9e8351e99df8e054a1 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Tue, 8 Sep 2020 20:14:29 -0700 Subject: [PATCH 14/21] double quotes. 
--- azure-pipelines.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 757cbda33..907de8c0a 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -163,7 +163,7 @@ stages: ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) HADOOP_HOME: $(Build.BinariesDirectory)\hadoop - DOTNET_WORKER_DIR: $(CurrentDotnetWorkerDir) + DOTNET_WORKER_DIR: $(CurrentDotnetWorkerDir) steps: - task: DownloadBuildArtifacts@0 @@ -431,7 +431,7 @@ stages: ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) HADOOP_HOME: $(Build.BinariesDirectory)\hadoop - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) + DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) steps: - task: DownloadBuildArtifacts@0 @@ -563,9 +563,9 @@ stages: inputs: command: test projects: '**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)&\ + arguments: "--configuration $(buildConfiguration) --filter $(TestsToFilterOut)&\ (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\ - (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)' + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)" env: SPARK_HOME: $(Build.BinariesDirectory)\spark-3.0.0-bin-hadoop2.7 From f4e986301cf2410db32bab0fca8feaeaeb3f3fdb Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Tue, 8 Sep 2020 20:54:53 -0700 Subject: [PATCH 15/21] add comment. --- azure-pipelines.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 907de8c0a..987e7728c 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -558,6 +558,9 @@ stages: env: SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.6-bin-hadoop2.7 + # Spark 3.0.0 uses Arrow 0.15.1, which contains a new Arrow spec. This breaks backward + # compatibility when using Microsoft.Spark.Worker with incompatible versions of Arrow. + # Skip Arrow tests until the backward compatibility Worker version is updated. - task: DotNetCoreCLI@2 displayName: 'E2E tests for Spark 3.0.0' inputs: @@ -568,4 +571,3 @@ stages: (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)" env: SPARK_HOME: $(Build.BinariesDirectory)\spark-3.0.0-bin-hadoop2.7 - From ee7782dc25fa067190ee2996e39459bd60006fda Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Tue, 8 Sep 2020 21:01:31 -0700 Subject: [PATCH 16/21] fix flag. 
--- .../Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs index 166f233b6..b3deb8fdd 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs @@ -1090,7 +1090,7 @@ public class CommandExecutorData new Version(Versions.V3_0_0), new IpcOptions { - WriteLegacyIpcFormat = true + WriteLegacyIpcFormat = false } } }; From 905fd0f4daf88d8bad696d957bedde71cffad2cb Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Wed, 9 Sep 2020 10:06:50 -0700 Subject: [PATCH 17/21] write end of stream per arrow spec. --- .../CommandExecutorTests.cs | 36 ++++++++++--------- .../Command/SqlCommandExecutor.cs | 20 ++++++++--- 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs index b3deb8fdd..33443730c 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs @@ -308,8 +308,7 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal($"udf: {i}", array.GetString(i)); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); @@ -389,8 +388,7 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal($"udf: {i}", array.GetString(i)); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); @@ -492,8 +490,7 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(i * i, array2.Values[i]); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); @@ -590,8 +587,7 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(i * i, array2.Values[i]); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); @@ -679,8 +675,7 @@ public void TestArrowSqlCommandExecutorWithEmptyInput( var array = (StringArray)outputBatch.Arrays.ElementAt(0); Assert.Equal(0, array.Length); - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); @@ -765,8 +760,7 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput( var array = (StringArray)outputBatch.Arrays.ElementAt(0); Assert.Equal(0, array.Length); - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. 
Assert.Equal(outputStream.Length, outputStream.Position); @@ -885,8 +879,7 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(100 + i, longArray.Values[i]); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); @@ -992,8 +985,7 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(100 + i, doubleArray.Values[i]); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); @@ -1070,6 +1062,18 @@ public void TestRDDCommandExecutor(string sparkVersionStr) // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); } + + private void CheckEOS(Stream stream, IpcOptions ipcOptions) + { + if (!ipcOptions.WriteLegacyIpcFormat) + { + int continuationToken = SerDe.ReadInt32(stream); + Assert.Equal(-1, continuationToken); + } + + int end = SerDe.ReadInt32(stream); + Assert.Equal(0, end); + } } public class CommandExecutorData diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index 779c3911b..aecf0eee6 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -340,6 +340,16 @@ protected IEnumerable GetInputIterator(Stream inputStream) yield return new RecordBatch(reader.Schema, arrays, 0); } } + + protected void WriteEnd(Stream stream, IpcOptions ipcOptions) + { + if (!ipcOptions.WriteLegacyIpcFormat) + { + SerDe.Write(stream, -1); + } + + SerDe.Write(stream, 0); + } } internal class ArrowOrDataFrameSqlCommandExecutor : ArrowBasedCommandExecutor @@ -415,7 +425,7 @@ private CommandExecutorStat ExecuteArrowSqlCommand( writer.WriteRecordBatchAsync(recordBatch).GetAwaiter().GetResult(); } - SerDe.Write(outputStream, 0); + WriteEnd(outputStream, ipcOptions); writer?.Dispose(); return stat; @@ -461,7 +471,7 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( } } - SerDe.Write(outputStream, 0); + WriteEnd(outputStream, ipcOptions); writer?.Dispose(); return stat; @@ -755,8 +765,8 @@ private CommandExecutorStat ExecuteArrowGroupedMapCommand( // TODO: Remove sync-over-async once WriteRecordBatch exists. writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } - - SerDe.Write(outputStream, 0); + + WriteEnd(outputStream, ipcOptions); writer?.Dispose(); return stat; @@ -798,7 +808,7 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( } } - SerDe.Write(outputStream, 0); + WriteEnd(outputStream, ipcOptions); writer?.Dispose(); return stat; From 2b37a45c38e49bf900be8ee37040d8411a8d9b96 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Fri, 11 Sep 2020 13:35:09 -0700 Subject: [PATCH 18/21] PR comments. 
--- .../CommandExecutorTests.cs | 64 +++++++++++-------- .../Command/SqlCommandExecutor.cs | 12 ++-- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs index 33443730c..22ec4ca09 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs @@ -27,11 +27,13 @@ namespace Microsoft.Spark.Worker.UnitTest public class CommandExecutorTests { [Theory] - [InlineData(Versions.V2_4_2)] - [InlineData(Versions.V3_0_0)] - public void TestPicklingSqlCommandExecutorWithSingleCommand(string sparkVersionStr) + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + public void TestPicklingSqlCommandExecutorWithSingleCommand( + Version sparkVersion, +#pragma warning disable xUnit1026 // Theory methods should use all of their parameters + IpcOptions ipcOptions) +#pragma warning restore xUnit1026 // Theory methods should use all of their parameters { - Version sparkVersion = new Version(sparkVersionStr); var udfWrapper = new Sql.PicklingUdfWrapper( (str) => "udf: " + ((str is null) ? "NULL" : str)); var command = new SqlCommand() @@ -103,11 +105,13 @@ public void TestPicklingSqlCommandExecutorWithSingleCommand(string sparkVersionS } [Theory] - [InlineData(Versions.V2_4_2)] - [InlineData(Versions.V3_0_0)] - public void TestPicklingSqlCommandExecutorWithMultiCommands(string sparkVersionStr) + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + public void TestPicklingSqlCommandExecutorWithMultiCommands( + Version sparkVersion, +#pragma warning disable xUnit1026 // Theory methods should use all of their parameters + IpcOptions ipcOptions) +#pragma warning restore xUnit1026 // Theory methods should use all of their parameters { - Version sparkVersion = new Version(sparkVersionStr); var udfWrapper1 = new Sql.PicklingUdfWrapper((str) => $"udf: {str}"); var udfWrapper2 = new Sql.PicklingUdfWrapper( (arg1, arg2) => arg1 * arg2); @@ -190,11 +194,13 @@ public void TestPicklingSqlCommandExecutorWithMultiCommands(string sparkVersionS } [Theory] - [InlineData(Versions.V2_4_2)] - [InlineData(Versions.V3_0_0)] - public void TestPicklingSqlCommandExecutorWithEmptyInput(string sparkVersionStr) + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + public void TestPicklingSqlCommandExecutorWithEmptyInput( + Version sparkVersion, +#pragma warning disable xUnit1026 // Theory methods should use all of their parameters + IpcOptions ipcOptions) +#pragma warning restore xUnit1026 // Theory methods should use all of their parameters { - Version sparkVersion = new Version(sparkVersionStr); var udfWrapper = new Sql.PicklingUdfWrapper((str) => $"udf: {str}"); var command = new SqlCommand() { @@ -267,7 +273,8 @@ public async Task TestArrowSqlCommandExecutorWithSingleCommand( Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -347,7 +354,8 @@ public async Task TestDataFrameSqlCommandExecutorWithSingleCommand( Schema schema = new Schema.Builder() .Field(b => 
b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -446,7 +454,8 @@ public async Task TestArrowSqlCommandExecutorWithMultiCommands( .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -543,7 +552,8 @@ public async Task TestDataFrameSqlCommandExecutorWithMultiCommands( .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -632,7 +642,8 @@ public void TestArrowSqlCommandExecutorWithEmptyInput( Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); // The .NET ArrowStreamWriter doesn't currently support writing just a // schema with no batches - but Java does. We use Reflection to simulate @@ -824,11 +835,12 @@ Int64Array ConvertInt64s(Int64Array int64s) int numRows = 10; // Write test data to the input stream. - var schema = new Schema.Builder() + Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -930,11 +942,12 @@ ArrowStringDataFrameColumn ConvertStrings(ArrowStringDataFrameColumn strings) int numRows = 10; // Write test data to the input stream. 
- var schema = new Schema.Builder() + Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -992,11 +1005,11 @@ await arrowWriter.WriteRecordBatchAsync( } [Theory] - [InlineData(Versions.V2_4_2)] - [InlineData(Versions.V3_0_0)] - public void TestRDDCommandExecutor(string sparkVersionStr) + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] +#pragma warning disable xUnit1026 // Theory methods should use all of their parameters + public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions) +#pragma warning restore xUnit1026 // Theory methods should use all of their parameters { - Version sparkVersion = new Version(sparkVersionStr); static int mapUdf(int a) => a + 3; var command = new RDDCommand() { @@ -1078,6 +1091,7 @@ private void CheckEOS(Stream stream, IpcOptions ipcOptions) public class CommandExecutorData { + // CommandExecutor only changes its behavior between major versions. public static IEnumerable Data => new List { diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index aecf0eee6..f27c0a44d 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -416,7 +416,8 @@ private CommandExecutorStat ExecuteArrowSqlCommand( Debug.Assert(resultSchema == null); resultSchema = BuildSchema(results); - writer = new ArrowStreamWriter(outputStream, resultSchema, true, ipcOptions); + writer = + new ArrowStreamWriter(outputStream, resultSchema, leaveOpen: true, ipcOptions); } var recordBatch = new RecordBatch(resultSchema, results, numEntries); @@ -463,7 +464,8 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, true, ipcOptions); + writer = + new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true, ipcOptions); } // TODO: Remove sync-over-async once WriteRecordBatch exists. @@ -759,7 +761,8 @@ private CommandExecutorStat ExecuteArrowGroupedMapCommand( if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, true, ipcOptions); + writer = + new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true, ipcOptions); } // TODO: Remove sync-over-async once WriteRecordBatch exists. @@ -800,7 +803,8 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, true, ipcOptions); + writer = + new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true, ipcOptions); } // TODO: Remove sync-over-async once WriteRecordBatch exists. 
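
The patches up to this point converge on one idea: the worker derives the Arrow IPC wire format from the Spark version handed to CommandExecutor, rather than from process-wide SparkEnvironment state. A minimal sketch of that selection, written against the same types the diffs use (System.Version, Apache.Arrow.Ipc.IpcOptions); it illustrates the mapping only and is not the exact production method:

    using System;
    using Apache.Arrow.Ipc;

    internal static class ArrowIpcFormat
    {
        // Spark 2.x workers expect the legacy (pre-Arrow-0.15) stream framing,
        // Spark 3.x expects the current spec; anything else is rejected early.
        internal static IpcOptions ForSparkVersion(Version version) =>
            new IpcOptions
            {
                WriteLegacyIpcFormat = version.Major switch
                {
                    2 => true,
                    3 => false,
                    _ => throw new NotSupportedException(
                        $"Spark {version} not supported.")
                }
            };
    }

Keeping the choice keyed on the version argument, instead of a cached static value, is what lets the unit tests above drive both Spark 2.4.x and 3.0.0 through the same executor code path.
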
From 247a1ab9f8691a33f076a2189b919fb80871fd70 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Fri, 11 Sep 2020 14:42:55 -0700 Subject: [PATCH 19/21] Remove backward compatibility for 3.0 --- azure-pipelines.yml | 14 -------------- .../Command/SqlCommandExecutor.cs | 3 +-- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 987e7728c..f91021b0c 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -557,17 +557,3 @@ stages: arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' env: SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.6-bin-hadoop2.7 - - # Spark 3.0.0 uses Arrow 0.15.1, which contains a new Arrow spec. This breaks backward - # compatibility when using Microsoft.Spark.Worker with incompatible versions of Arrow. - # Skip Arrow tests until the backward compatibility Worker version is updated. - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 3.0.0' - inputs: - command: test - projects: '**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: "--configuration $(buildConfiguration) --filter $(TestsToFilterOut)&\ - (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\ - (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)" - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-3.0.0-bin-hadoop2.7 diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index f27c0a44d..930e440be 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -796,9 +796,8 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( FxDataFrame resultDataFrame = worker.Func(dataFrame); IEnumerable recordBatches = resultDataFrame.ToArrowRecordBatches(); - foreach (RecordBatch record in recordBatches) + foreach (RecordBatch result in recordBatches) { - RecordBatch result = record; stat.NumEntriesProcessed += result.Length; if (writer == null) From f55b79ff338ae3a0b0e3e7d1bf49f38f9099ae69 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Fri, 11 Sep 2020 14:51:47 -0700 Subject: [PATCH 20/21] readd 3.0 backward compat tests. --- azure-pipelines.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index f91021b0c..bdab06d6d 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -557,3 +557,17 @@ stages: arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' env: SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.6-bin-hadoop2.7 + + # Spark 3.0.0 uses Arrow 0.15.1, which contains a new Arrow spec. This breaks backward + # compatibility when using Microsoft.Spark.Worker with incompatible versions of Arrow. + # Skip Arrow tests until the backward compatibility Worker version is updated. 
+ - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 3.0.0' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: "--configuration $(buildConfiguration) --filter $(TestsToFilterOut)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)" + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-3.0.0-bin-hadoop2.7 From ce37879851c72a4a7076554d97b6ffb73eb799e8 Mon Sep 17 00:00:00 2001 From: Steve Suh Date: Fri, 11 Sep 2020 14:52:58 -0700 Subject: [PATCH 21/21] remove whitespace/tabs. --- azure-pipelines.yml | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index bdab06d6d..987e7728c 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -558,16 +558,16 @@ stages: env: SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.6-bin-hadoop2.7 - # Spark 3.0.0 uses Arrow 0.15.1, which contains a new Arrow spec. This breaks backward - # compatibility when using Microsoft.Spark.Worker with incompatible versions of Arrow. - # Skip Arrow tests until the backward compatibility Worker version is updated. - - task: DotNetCoreCLI@2 - displayName: 'E2E tests for Spark 3.0.0' - inputs: - command: test - projects: '**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: "--configuration $(buildConfiguration) --filter $(TestsToFilterOut)&\ - (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\ - (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)" - env: - SPARK_HOME: $(Build.BinariesDirectory)\spark-3.0.0-bin-hadoop2.7 + # Spark 3.0.0 uses Arrow 0.15.1, which contains a new Arrow spec. This breaks backward + # compatibility when using Microsoft.Spark.Worker with incompatible versions of Arrow. + # Skip Arrow tests until the backward compatibility Worker version is updated. + - task: DotNetCoreCLI@2 + displayName: 'E2E tests for Spark 3.0.0' + inputs: + command: test + projects: '**/Microsoft.Spark*.E2ETest/*.csproj' + arguments: "--configuration $(buildConfiguration) --filter $(TestsToFilterOut)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)" + env: + SPARK_HOME: $(Build.BinariesDirectory)\spark-3.0.0-bin-hadoop2.7
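
For context on the compatibility note in the pipeline change above: the piece of the Arrow 0.15.1 spec change these patches handle directly is the end-of-stream marker, covered by the WriteEnd helper in SqlCommandExecutor and the CheckEOS helper in the tests. A rough sketch of the difference, reusing the SerDe helpers shown in the diffs; it mirrors the patch logic and is not a definition of the Arrow format itself:

    using System.IO;
    using Microsoft.Spark.Interop.Ipc;

    internal static class ArrowStreamEnd
    {
        // Written by the worker after the last record batch of a result stream.
        internal static void Write(Stream stream, bool writeLegacyIpcFormat)
        {
            if (!writeLegacyIpcFormat)
            {
                // Current spec: a 0xFFFFFFFF continuation marker comes first...
                SerDe.Write(stream, -1);
            }

            // ...followed by a zero length. The legacy format writes only the zero.
            SerDe.Write(stream, 0);
        }
    }

Because the previously released Microsoft.Spark.Worker referenced by backwardCompatibleRelease was built against the older Arrow library, the backward-compatibility stage above keeps the Arrow-based UDF tests filtered out for Spark 3.0.0 until that worker is updated.
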