diff --git a/azure-pipelines.yml b/azure-pipelines.yml index d4ee2238c..987e7728c 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -163,7 +163,7 @@ stages: ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) HADOOP_HOME: $(Build.BinariesDirectory)\hadoop - DOTNET_WORKER_DIR: $(CurrentDotnetWorkerDir) + DOTNET_WORKER_DIR: $(CurrentDotnetWorkerDir) steps: - task: DownloadBuildArtifacts@0 @@ -431,7 +431,7 @@ stages: ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}: _OfficialBuildIdArgs: /p:OfficialBuildId=$(BUILD.BUILDNUMBER) HADOOP_HOME: $(Build.BinariesDirectory)\hadoop - DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) + DOTNET_WORKER_DIR: $(BackwardCompatibleDotnetWorkerDir) steps: - task: DownloadBuildArtifacts@0 @@ -558,12 +558,16 @@ stages: env: SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.6-bin-hadoop2.7 + # Spark 3.0.0 uses Arrow 0.15.1, which contains a new Arrow spec. This breaks backward + # compatibility when using Microsoft.Spark.Worker with incompatible versions of Arrow. + # Skip Arrow tests until the backward compatibility Worker version is updated. - task: DotNetCoreCLI@2 displayName: 'E2E tests for Spark 3.0.0' inputs: command: test projects: '**/Microsoft.Spark*.E2ETest/*.csproj' - arguments: '--configuration $(buildConfiguration) --filter $(TestsToFilterOut)' + arguments: "--configuration $(buildConfiguration) --filter $(TestsToFilterOut)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\ + (FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)" env: SPARK_HOME: $(Build.BinariesDirectory)\spark-3.0.0-bin-hadoop2.7 - diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs index ef6ea71b3..511f5a122 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs @@ -1,6 +1,5 @@ using System; using System.Linq; -using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.Sql; using Xunit; using static Microsoft.Spark.Sql.Functions; @@ -35,7 +34,7 @@ public BroadcastTests(SparkFixture fixture) /// /// Test Broadcast support by using multiple broadcast variables in a UDF. /// - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestMultipleBroadcastWithoutEncryption() { var obj1 = new TestBroadcastVariable(1, "first"); @@ -56,7 +55,7 @@ public void TestMultipleBroadcastWithoutEncryption() /// Test Broadcast.Destroy() that destroys all data and metadata related to the broadcast /// variable and makes it inaccessible from workers. /// - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestDestroy() { var obj1 = new TestBroadcastVariable(5, "destroy"); @@ -97,7 +96,7 @@ public void TestDestroy() /// Test Broadcast.Unpersist() deletes cached copies of the broadcast on the executors. If /// the broadcast is used after unpersist is called, it is re-sent to the executors. 
/// - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestUnpersist() { var obj = new TestBroadcastVariable(1, "unpersist"); diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs index e830a7e42..423f026ca 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs @@ -156,7 +156,7 @@ public void TestUDF() } } - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestVectorUdf() { Func udf1Func = @@ -224,7 +224,7 @@ public void TestVectorUdf() } } - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] + [Fact] public void TestDataFrameVectorUdf() { Func udf1Func = @@ -368,7 +368,6 @@ private static RecordBatch ArrowBasedCountCharacters(RecordBatch records) returnLength); } - [SkipIfSparkVersionIsGreaterOrEqualTo(Versions.V3_0_0)] public void TestDataFrameGroupedMapUdf() { diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs index 8978e321e..22ec4ca09 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System; using System.Collections; using System.Collections.Generic; using System.IO; @@ -25,8 +26,13 @@ namespace Microsoft.Spark.Worker.UnitTest { public class CommandExecutorTests { - [Fact] - public void TestPicklingSqlCommandExecutorWithSingleCommand() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + public void TestPicklingSqlCommandExecutorWithSingleCommand( + Version sparkVersion, +#pragma warning disable xUnit1026 // Theory methods should use all of their parameters + IpcOptions ipcOptions) +#pragma warning restore xUnit1026 // Theory methods should use all of their parameters { var udfWrapper = new Sql.PicklingUdfWrapper( (str) => "udf: " + ((str is null) ? 
"NULL" : str)); @@ -61,7 +67,7 @@ public void TestPicklingSqlCommandExecutorWithSingleCommand() SerDe.Write(inputStream, (int)SpecialLengths.END_OF_DATA_SECTION); inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -98,8 +104,13 @@ public void TestPicklingSqlCommandExecutorWithSingleCommand() Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public void TestPicklingSqlCommandExecutorWithMultiCommands() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + public void TestPicklingSqlCommandExecutorWithMultiCommands( + Version sparkVersion, +#pragma warning disable xUnit1026 // Theory methods should use all of their parameters + IpcOptions ipcOptions) +#pragma warning restore xUnit1026 // Theory methods should use all of their parameters { var udfWrapper1 = new Sql.PicklingUdfWrapper((str) => $"udf: {str}"); var udfWrapper2 = new Sql.PicklingUdfWrapper( @@ -145,7 +156,7 @@ public void TestPicklingSqlCommandExecutorWithMultiCommands() SerDe.Write(inputStream, (int)SpecialLengths.END_OF_DATA_SECTION); inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -182,8 +193,13 @@ public void TestPicklingSqlCommandExecutorWithMultiCommands() Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public void TestPicklingSqlCommandExecutorWithEmptyInput() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + public void TestPicklingSqlCommandExecutorWithEmptyInput( + Version sparkVersion, +#pragma warning disable xUnit1026 // Theory methods should use all of their parameters + IpcOptions ipcOptions) +#pragma warning restore xUnit1026 // Theory methods should use all of their parameters { var udfWrapper = new Sql.PicklingUdfWrapper((str) => $"udf: {str}"); var command = new SqlCommand() @@ -208,7 +224,7 @@ public void TestPicklingSqlCommandExecutorWithEmptyInput() SerDe.Write(inputStream, (int)SpecialLengths.END_OF_DATA_SECTION); inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -222,8 +238,11 @@ public void TestPicklingSqlCommandExecutorWithEmptyInput() Assert.Equal(0, outputStream.Length); } - [Fact] - public async Task TestArrowSqlCommandExecutorWithSingleCommand() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + public async Task TestArrowSqlCommandExecutorWithSingleCommand( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper = new Sql.ArrowUdfWrapper( (strings) => (StringArray)ToArrowArray( @@ -254,7 +273,8 @@ public async Task TestArrowSqlCommandExecutorWithSingleCommand() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -269,7 +289,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new 
CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -295,15 +315,18 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal($"udf: {i}", array.GetString(i)); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestDataFrameSqlCommandExecutorWithSingleCommand() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestDataFrameSqlCommandExecutorWithSingleCommand( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper = new Sql.DataFrameUdfWrapper( (strings) => strings.Apply(cur => $"udf: {cur}")); @@ -331,7 +354,8 @@ public async Task TestDataFrameSqlCommandExecutorWithSingleCommand() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -346,7 +370,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -372,15 +396,18 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal($"udf: {i}", array.GetString(i)); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestArrowSqlCommandExecutorWithMultiCommands() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestArrowSqlCommandExecutorWithMultiCommands( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper1 = new Sql.ArrowUdfWrapper( (strings) => (StringArray)ToArrowArray( @@ -427,7 +454,8 @@ public async Task TestArrowSqlCommandExecutorWithMultiCommands() .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -444,7 +472,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -471,15 +499,18 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(i * i, array2.Values[i]); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. 
Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestDataFrameSqlCommandExecutorWithMultiCommands() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestDataFrameSqlCommandExecutorWithMultiCommands( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper1 = new Sql.DataFrameUdfWrapper( (strings) => strings.Apply(cur => $"udf: {cur}")); @@ -521,7 +552,8 @@ public async Task TestDataFrameSqlCommandExecutorWithMultiCommands() .Field(b => b.Name("arg2").DataType(Int32Type.Default)) .Field(b => b.Name("arg3").DataType(Int32Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -538,7 +570,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -565,8 +597,7 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(i * i, array2.Values[i]); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); @@ -577,8 +608,12 @@ await arrowWriter.WriteRecordBatchAsync( /// Schema, and no record batches, that CommandExecutor writes the /// appropriate response back. /// - [Fact] - public void TestArrowSqlCommandExecutorWithEmptyInput() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public void TestArrowSqlCommandExecutorWithEmptyInput( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper = new Sql.ArrowUdfWrapper( (strings) => (StringArray)ToArrowArray( @@ -607,7 +642,8 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); // The .NET ArrowStreamWriter doesn't currently support writing just a // schema with no batches - but Java does. We use Reflection to simulate @@ -624,7 +660,7 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -650,8 +686,7 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() var array = (StringArray)outputBatch.Arrays.ElementAt(0); Assert.Equal(0, array.Length); - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); @@ -662,8 +697,12 @@ public void TestArrowSqlCommandExecutorWithEmptyInput() /// Schema, and no record batches, that CommandExecutor writes the /// appropriate response back. 
/// - [Fact] - public void TestDataFrameSqlCommandExecutorWithEmptyInput() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public void TestDataFrameSqlCommandExecutorWithEmptyInput( + Version sparkVersion, + IpcOptions ipcOptions) { var udfWrapper = new Sql.DataFrameUdfWrapper( (strings) => strings.Apply(cur=> $"udf: {cur}")); @@ -689,7 +728,7 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = new ArrowStreamWriter(inputStream, schema, false, ipcOptions); // The .NET ArrowStreamWriter doesn't currently support writing just a // schema with no batches - but Java does. We use Reflection to simulate @@ -706,7 +745,7 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -732,15 +771,18 @@ public void TestDataFrameSqlCommandExecutorWithEmptyInput() var array = (StringArray)outputBatch.Arrays.ElementAt(0); Assert.Equal(0, array.Length); - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestArrowGroupedMapCommandExecutor() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestArrowGroupedMapCommandExecutor( + Version sparkVersion, + IpcOptions ipcOptions) { StringArray ConvertStrings(StringArray strings) { @@ -793,11 +835,12 @@ Int64Array ConvertInt64s(Int64Array int64s) int numRows = 10; // Write test data to the input stream. - var schema = new Schema.Builder() + Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -816,7 +859,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -848,15 +891,18 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(100 + i, longArray.Values[i]); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public async Task TestDataFrameGroupedMapCommandExecutor() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] + + public async Task TestDataFrameGroupedMapCommandExecutor( + Version sparkVersion, + IpcOptions ipcOptions) { ArrowStringDataFrameColumn ConvertStrings(ArrowStringDataFrameColumn strings) { @@ -896,11 +942,12 @@ ArrowStringDataFrameColumn ConvertStrings(ArrowStringDataFrameColumn strings) int numRows = 10; // Write test data to the input stream. 
- var schema = new Schema.Builder() + Schema schema = new Schema.Builder() .Field(b => b.Name("arg1").DataType(StringType.Default)) .Field(b => b.Name("arg2").DataType(Int64Type.Default)) .Build(); - var arrowWriter = new ArrowStreamWriter(inputStream, schema); + var arrowWriter = + new ArrowStreamWriter(inputStream, schema, leaveOpen: false, ipcOptions); await arrowWriter.WriteRecordBatchAsync( new RecordBatch( schema, @@ -919,7 +966,7 @@ await arrowWriter.WriteRecordBatchAsync( inputStream.Seek(0, SeekOrigin.Begin); - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -951,15 +998,17 @@ await arrowWriter.WriteRecordBatchAsync( Assert.Equal(100 + i, doubleArray.Values[i]); } - int end = SerDe.ReadInt32(outputStream); - Assert.Equal(0, end); + CheckEOS(outputStream, ipcOptions); // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); } - [Fact] - public void TestRDDCommandExecutor() + [Theory] + [MemberData(nameof(CommandExecutorData.Data), MemberType = typeof(CommandExecutorData))] +#pragma warning disable xUnit1026 // Theory methods should use all of their parameters + public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions) +#pragma warning restore xUnit1026 // Theory methods should use all of their parameters { static int mapUdf(int a) => a + 3; var command = new RDDCommand() @@ -1002,7 +1051,7 @@ public void TestRDDCommandExecutor() inputStream.Seek(0, SeekOrigin.Begin); // Execute the command. - CommandExecutorStat stat = new CommandExecutor().Execute( + CommandExecutorStat stat = new CommandExecutor(sparkVersion).Execute( inputStream, outputStream, 0, @@ -1026,5 +1075,42 @@ public void TestRDDCommandExecutor() // Validate all the data on the stream is read. Assert.Equal(outputStream.Length, outputStream.Position); } + + private void CheckEOS(Stream stream, IpcOptions ipcOptions) + { + if (!ipcOptions.WriteLegacyIpcFormat) + { + int continuationToken = SerDe.ReadInt32(stream); + Assert.Equal(-1, continuationToken); + } + + int end = SerDe.ReadInt32(stream); + Assert.Equal(0, end); + } + } + + public class CommandExecutorData + { + // CommandExecutor only changes its behavior between major versions. + public static IEnumerable Data => + new List + { + new object[] + { + new Version(Versions.V2_4_2), + new IpcOptions + { + WriteLegacyIpcFormat = true + } + }, + new object[] + { + new Version(Versions.V3_0_0), + new IpcOptions + { + WriteLegacyIpcFormat = false + } + } + }; } } diff --git a/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs index c981842f6..cc0f2bee3 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/CommandExecutor.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System; using System.IO; using System.Linq; @@ -24,6 +25,13 @@ internal sealed class CommandExecutorStat /// internal sealed class CommandExecutor { + private readonly Version _version; + + internal CommandExecutor(Version version) + { + _version = version; + } + /// /// Executes the commands on the input data read from input stream /// and writes results to the output stream. 
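
Editor's note on the CommandExecutorData comment above ("CommandExecutor only changes its behavior between major versions"): Spark 2.4.x bundles a pre-0.15 Arrow and therefore needs the legacy IPC stream framing, while Spark 3.0.0 bundles Arrow 0.15.1 and expects the new framing. A minimal standalone sketch of that version gate, mirroring the ArrowIpcOptions switch this patch adds to ArrowBasedCommandExecutor (the helper name here is illustrative, not part of the change):

```csharp
using System;
using Apache.Arrow.Ipc;

// Illustrative helper: only the major Spark version decides the Arrow IPC framing.
internal static class ArrowIpcOptionsSketch
{
    internal static IpcOptions ForSparkVersion(Version sparkVersion) =>
        new IpcOptions
        {
            WriteLegacyIpcFormat = sparkVersion.Major switch
            {
                2 => true,   // Spark 2.x: pre-0.15 Arrow, legacy framing
                3 => false,  // Spark 3.0: Arrow 0.15.1, new framing
                _ => throw new NotSupportedException(
                    $"Spark {sparkVersion} not supported.")
            }
        };
}

// e.g. ForSparkVersion(new Version("2.4.2")).WriteLegacyIpcFormat == true
// e.g. ForSparkVersion(new Version("3.0.0")).WriteLegacyIpcFormat == false
```
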
@@ -55,11 +63,11 @@ internal CommandExecutorStat Execute( } return SqlCommandExecutor.Execute( + _version, inputStream, outputStream, commandPayload.EvalType, commandPayload.Commands.Cast().ToArray()); - } } } diff --git a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs index f6fffd31e..930e440be 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/SqlCommandExecutor.cs @@ -30,12 +30,14 @@ internal abstract class SqlCommandExecutor /// Executes the commands on the input data read from input stream /// and writes results to the output stream. /// + /// Spark version /// Input stream to read data from /// Output stream to write results to /// Evaluation type for the current commands /// Contains the commands to execute /// Statistics captured during the Execute() run internal static CommandExecutorStat Execute( + Version version, Stream inputStream, Stream outputStream, UdfUtils.PythonEvalType evalType, @@ -60,11 +62,11 @@ internal static CommandExecutorStat Execute( } else if (evalType == UdfUtils.PythonEvalType.SQL_SCALAR_PANDAS_UDF) { - executor = new ArrowOrDataFrameSqlCommandExecutor(); + executor = new ArrowOrDataFrameSqlCommandExecutor(version); } else if (evalType == UdfUtils.PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF) { - executor = new ArrowOrDataFrameGroupedMapCommandExecutor(); + executor = new ArrowOrDataFrameGroupedMapCommandExecutor(version); } else { @@ -176,7 +178,7 @@ private void WriteOutput(Stream stream, IEnumerable rows, int sizeHint) if (s_outputBuffer == null) s_outputBuffer = new byte[sizeHint]; - Pickler pickler = s_pickler ?? (s_pickler = new Pickler(false)); + Pickler pickler = s_pickler ??= new Pickler(false); pickler.dumps(rows, ref s_outputBuffer, out int bytesWritten); if (bytesWritten <= 0) @@ -298,39 +300,65 @@ public object Run(int splitId, object input) internal abstract class ArrowBasedCommandExecutor : SqlCommandExecutor { - protected IEnumerable GetInputIterator(Stream inputStream) - { - using (var reader = new ArrowStreamReader(inputStream, leaveOpen: true)) + protected Version _version; + + protected IpcOptions ArrowIpcOptions() => + new IpcOptions { - RecordBatch batch; - bool returnedResult = false; - while ((batch = reader.ReadNextRecordBatch()) != null) + WriteLegacyIpcFormat = _version.Major switch { - yield return batch; - returnedResult = true; + 2 => true, + 3 => false, + _ => throw new NotSupportedException($"Spark {_version} not supported.") } + }; - if (!returnedResult) - { - // When no input batches were received, return an empty RecordBatch - // in order to create and write back the result schema. + protected IEnumerable GetInputIterator(Stream inputStream) + { + using var reader = new ArrowStreamReader(inputStream, leaveOpen: true); + RecordBatch batch; + bool returnedResult = false; + while ((batch = reader.ReadNextRecordBatch()) != null) + { + yield return batch; + returnedResult = true; + } - int columnCount = reader.Schema.Fields.Count; - var arrays = new IArrowArray[columnCount]; - for (int i = 0; i < columnCount; ++i) - { - IArrowType type = reader.Schema.GetFieldByIndex(i).DataType; - arrays[i] = ArrowArrayHelpers.CreateEmptyArray(type); - } + if (!returnedResult) + { + // When no input batches were received, return an empty RecordBatch + // in order to create and write back the result schema. 
- yield return new RecordBatch(reader.Schema, arrays, 0); + int columnCount = reader.Schema.Fields.Count; + var arrays = new IArrowArray[columnCount]; + for (int i = 0; i < columnCount; ++i) + { + IArrowType type = reader.Schema.GetFieldByIndex(i).DataType; + arrays[i] = ArrowArrayHelpers.CreateEmptyArray(type); } + + yield return new RecordBatch(reader.Schema, arrays, 0); + } + } + + protected void WriteEnd(Stream stream, IpcOptions ipcOptions) + { + if (!ipcOptions.WriteLegacyIpcFormat) + { + SerDe.Write(stream, -1); } + + SerDe.Write(stream, 0); } } internal class ArrowOrDataFrameSqlCommandExecutor : ArrowBasedCommandExecutor { + internal ArrowOrDataFrameSqlCommandExecutor(Version version) + { + _version = version; + } + protected internal override CommandExecutorStat ExecuteCore( Stream inputStream, Stream outputStream, @@ -372,6 +400,7 @@ private CommandExecutorStat ExecuteArrowSqlCommand( SerDe.Write(outputStream, (int)SpecialLengths.START_ARROW_STREAM); + IpcOptions ipcOptions = ArrowIpcOptions(); ArrowStreamWriter writer = null; Schema resultSchema = null; foreach (ReadOnlyMemory input in GetArrowInputIterator(inputStream)) @@ -387,7 +416,8 @@ private CommandExecutorStat ExecuteArrowSqlCommand( Debug.Assert(resultSchema == null); resultSchema = BuildSchema(results); - writer = new ArrowStreamWriter(outputStream, resultSchema, leaveOpen: true); + writer = + new ArrowStreamWriter(outputStream, resultSchema, leaveOpen: true, ipcOptions); } var recordBatch = new RecordBatch(resultSchema, results, numEntries); @@ -396,12 +426,8 @@ private CommandExecutorStat ExecuteArrowSqlCommand( writer.WriteRecordBatchAsync(recordBatch).GetAwaiter().GetResult(); } - SerDe.Write(outputStream, 0); - - if (writer != null) - { - writer.Dispose(); - } + WriteEnd(outputStream, ipcOptions); + writer?.Dispose(); return stat; } @@ -416,6 +442,7 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( SerDe.Write(outputStream, (int)SpecialLengths.START_ARROW_STREAM); + IpcOptions ipcOptions = ArrowIpcOptions(); ArrowStreamWriter writer = null; foreach (RecordBatch input in GetInputIterator(inputStream)) { @@ -437,7 +464,8 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true); + writer = + new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true, ipcOptions); } // TODO: Remove sync-over-async once WriteRecordBatch exists. @@ -445,12 +473,8 @@ private CommandExecutorStat ExecuteDataFrameSqlCommand( } } - SerDe.Write(outputStream, 0); - - if (writer != null) - { - writer.Dispose(); - } + WriteEnd(outputStream, ipcOptions); + writer?.Dispose(); return stat; } @@ -466,39 +490,37 @@ private IEnumerable> GetArrowInputIterator(Stream in int columnCount = 0; try { - using (var reader = new ArrowStreamReader(inputStream, leaveOpen: true)) + using var reader = new ArrowStreamReader(inputStream, leaveOpen: true); + RecordBatch batch; + while ((batch = reader.ReadNextRecordBatch()) != null) { - RecordBatch batch; - while ((batch = reader.ReadNextRecordBatch()) != null) + columnCount = batch.ColumnCount; + if (arrays == null) { - columnCount = batch.ColumnCount; - if (arrays == null) - { - // Note that every batch in a stream has the same schema. - arrays = ArrayPool.Shared.Rent(columnCount); - } - - for (int i = 0; i < columnCount; ++i) - { - arrays[i] = batch.Column(i); - } - - yield return new ReadOnlyMemory(arrays, 0, columnCount); + // Note that every batch in a stream has the same schema. 
+ arrays = ArrayPool.Shared.Rent(columnCount); } - if (arrays == null) + for (int i = 0; i < columnCount; ++i) { - // When no input batches were received, return empty IArrowArrays - // in order to create and write back the result schema. - columnCount = reader.Schema.Fields.Count; - arrays = ArrayPool.Shared.Rent(columnCount); + arrays[i] = batch.Column(i); + } - for (int i = 0; i < columnCount; ++i) - { - arrays[i] = null; - } - yield return new ReadOnlyMemory(arrays, 0, columnCount); + yield return new ReadOnlyMemory(arrays, 0, columnCount); + } + + if (arrays == null) + { + // When no input batches were received, return empty IArrowArrays + // in order to create and write back the result schema. + columnCount = reader.Schema.Fields.Count; + arrays = ArrayPool.Shared.Rent(columnCount); + + for (int i = 0; i < columnCount; ++i) + { + arrays[i] = null; } + yield return new ReadOnlyMemory(arrays, 0, columnCount); } } finally @@ -677,6 +699,11 @@ public DataFrameColumn[] Run(ReadOnlyMemory input) internal class ArrowOrDataFrameGroupedMapCommandExecutor : ArrowOrDataFrameSqlCommandExecutor { + internal ArrowOrDataFrameGroupedMapCommandExecutor(Version version) + : base(version) + { + } + protected internal override CommandExecutorStat ExecuteCore( Stream inputStream, Stream outputStream, @@ -723,6 +750,7 @@ private CommandExecutorStat ExecuteArrowGroupedMapCommand( SerDe.Write(outputStream, (int)SpecialLengths.START_ARROW_STREAM); + IpcOptions ipcOptions = ArrowIpcOptions(); ArrowStreamWriter writer = null; foreach (RecordBatch input in GetInputIterator(inputStream)) { @@ -733,19 +761,16 @@ private CommandExecutorStat ExecuteArrowGroupedMapCommand( if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true); + writer = + new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true, ipcOptions); } // TODO: Remove sync-over-async once WriteRecordBatch exists. writer.WriteRecordBatchAsync(result).GetAwaiter().GetResult(); } - SerDe.Write(outputStream, 0); - - if (writer != null) - { - writer.Dispose(); - } + WriteEnd(outputStream, ipcOptions); + writer?.Dispose(); return stat; } @@ -763,6 +788,7 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( SerDe.Write(outputStream, (int)SpecialLengths.START_ARROW_STREAM); + IpcOptions ipcOptions = ArrowIpcOptions(); ArrowStreamWriter writer = null; foreach (RecordBatch input in GetInputIterator(inputStream)) { @@ -776,7 +802,8 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( if (writer == null) { - writer = new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true); + writer = + new ArrowStreamWriter(outputStream, result.Schema, leaveOpen: true, ipcOptions); } // TODO: Remove sync-over-async once WriteRecordBatch exists. 
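
Editor's note on the WriteEnd/CheckEOS pairing used throughout these hunks: under the legacy format an Arrow stream terminates with a single 32-bit zero length, while the new format first writes a -1 continuation token and then the zero length. A self-contained round-trip sketch under that assumption (SerDe and IpcOptions are the types already used in this patch; SerDe is internal to Microsoft.Spark, so this only compiles where its internals are visible, and the helper itself is illustrative):

```csharp
using System.IO;
using Apache.Arrow.Ipc;
using Microsoft.Spark.Interop.Ipc;
using Xunit;

internal static class EosFramingSketch
{
    // Writes and re-reads the end-of-stream marker for a given IPC format,
    // mirroring SqlCommandExecutor.WriteEnd and the tests' CheckEOS.
    internal static void RoundTrip(IpcOptions ipcOptions)
    {
        using var stream = new MemoryStream();

        if (!ipcOptions.WriteLegacyIpcFormat)
        {
            SerDe.Write(stream, -1); // continuation token (new format only)
        }
        SerDe.Write(stream, 0);      // zero-length message terminates the stream

        stream.Seek(0, SeekOrigin.Begin);
        if (!ipcOptions.WriteLegacyIpcFormat)
        {
            Assert.Equal(-1, SerDe.ReadInt32(stream));
        }
        Assert.Equal(0, SerDe.ReadInt32(stream));
        Assert.Equal(stream.Length, stream.Position);
    }
}
```
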
@@ -784,12 +811,8 @@ private CommandExecutorStat ExecuteDataFrameGroupedMapCommand( } } - SerDe.Write(outputStream, 0); - - if (writer != null) - { - writer.Dispose(); - } + WriteEnd(outputStream, ipcOptions); + writer?.Dispose(); return stat; } diff --git a/src/csharp/Microsoft.Spark.Worker/TaskRunner.cs b/src/csharp/Microsoft.Spark.Worker/TaskRunner.cs index 2c15c2bf4..aab61b6cf 100644 --- a/src/csharp/Microsoft.Spark.Worker/TaskRunner.cs +++ b/src/csharp/Microsoft.Spark.Worker/TaskRunner.cs @@ -150,7 +150,7 @@ private Payload ProcessStream( DateTime initTime = DateTime.UtcNow; - CommandExecutorStat commandExecutorStat = new CommandExecutor().Execute( + CommandExecutorStat commandExecutorStat = new CommandExecutor(version).Execute( inputStream, outputStream, payload.SplitIndex, diff --git a/src/csharp/Microsoft.Spark/Broadcast.cs b/src/csharp/Microsoft.Spark/Broadcast.cs index 2791ec546..99025b7c2 100644 --- a/src/csharp/Microsoft.Spark/Broadcast.cs +++ b/src/csharp/Microsoft.Spark/Broadcast.cs @@ -128,6 +128,7 @@ private JvmObjectReference CreateBroadcast(SparkContext sc, T value) CreateBroadcast_V2_3_1_AndBelow(javaSparkContext, value), (2, 3) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value), (2, 4) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value), + (3, 0) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value), _ => throw new NotSupportedException($"Spark {version} not supported.") }; } diff --git a/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj b/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj index 2cddc5627..0205701f2 100644 --- a/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj +++ b/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj @@ -29,7 +29,7 @@ - + diff --git a/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs b/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs index ccb5e5209..66c96a2a2 100644 --- a/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs +++ b/src/csharp/Microsoft.Spark/Utils/UdfUtils.cs @@ -72,7 +72,11 @@ internal enum PythonEvalType SQL_SCALAR_PANDAS_UDF = 200, SQL_GROUPED_MAP_PANDAS_UDF = 201, SQL_GROUPED_AGG_PANDAS_UDF = 202, - SQL_WINDOW_AGG_PANDAS_UDF = 203 + SQL_WINDOW_AGG_PANDAS_UDF = 203, + + SQL_SCALAR_PANDAS_ITER_UDF = 204, + SQL_MAP_PANDAS_ITER_UDF = 205, + SQL_COGROUPED_MAP_PANDAS_UDF = 206 } ///
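
Editor's note on the driver-side changes at the end of this patch: the (3, 0) arm added to Broadcast.CreateBroadcast is what allows the re-enabled BroadcastTests above to run against Spark 3.0, since broadcast creation on 3.0 reuses the 2.3.2+/2.4.x JVM path. A minimal usage sketch under that assumption (column and method names below are illustrative, and the IEnumerable<int> CreateDataFrame overload is assumed to be available):

```csharp
using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

internal static class BroadcastOnSpark30Sketch
{
    private static void Run()
    {
        // Broadcast a small lookup table and use it from a UDF; with the
        // (3, 0) case above, Spark 3.0.0 takes the same code path as 2.3.2+.
        SparkSession spark = SparkSession.Builder().GetOrCreate();
        string[] lookup = { "zero", "one", "two" };
        Broadcast<string[]> bc = spark.SparkContext.Broadcast(lookup);

        Func<Column, Column> toWord = Udf<int, string>(i => bc.Value()[i]);

        DataFrame df = spark.CreateDataFrame(new[] { 0, 1, 2 });
        df.Select(toWord(df["_1"]).Alias("word")).Show();
    }
}
```
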