diff --git a/azure-pipelines-e2e-tests-template.yml b/azure-pipelines-e2e-tests-template.yml
index 17b9580e7..6f926f9d7 100644
--- a/azure-pipelines-e2e-tests-template.yml
+++ b/azure-pipelines-e2e-tests-template.yml
@@ -121,15 +121,38 @@ stages:
targetType: inline
script: |
echo "Download Hadoop utils for Windows."
- curl -k -L -o hadoop.zip https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip
+ $hadoopBinaryUrl = "https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip"
+ # The Spark 3.3.3 binary uses the Hadoop 3 dependency
+ if ("3.3.3" -contains "${{ test.version }}") {
+ $hadoopBinaryUrl = "https://github.com/SparkSnail/winutils/releases/download/hadoop-3.3.5/hadoop-3.3.5.zip"
+ }
+ curl -k -L -o hadoop.zip $hadoopBinaryUrl
Expand-Archive -Path hadoop.zip -Destination .
New-Item -ItemType Directory -Force -Path hadoop\bin
- cp hadoop-2.8.1\winutils.exe hadoop\bin
+ if ("3.3.3" -contains "${{ test.version }}") {
+ cp hadoop-3.3.5\winutils.exe hadoop\bin
+ # Hadoop 3.3 needs hadoop.dll added to the environment variables to avoid UnsatisfiedLinkError
+ cp hadoop-3.3.5\hadoop.dll hadoop\bin
+ cp hadoop-3.3.5\hadoop.dll C:\Windows\System32
+ [System.Environment]::SetEnvironmentVariable("PATH", $Env:Path + ";$(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop", [System.EnvironmentVariableTarget]::Machine)
+ } else {
+ cp hadoop-2.8.1\winutils.exe hadoop\bin
+ }
- pwsh: |
echo "Downloading Spark ${{ test.version }}"
- curl -k -L -o spark-${{ test.version }}.tgz https://archive.apache.org/dist/spark/spark-${{ test.version }}/spark-${{ test.version }}-bin-hadoop2.7.tgz
+ $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2.7"
+ # In spark 3.3.0, 3.3.1, 3.3.2, 3.3.4, the binary name with hadoop2 dependency has changed to spark-${{ test.version }}-bin-hadoop2.tgz
+ if ("3.3.0", "3.3.1", "3.3.2", "3.3.4" -contains "${{ test.version }}") {
+ $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2"
+ }
+ # Spark 3.3.3 does not provide a hadoop2 build, so we use the hadoop3 version
+ if ("3.3.3" -contains "${{ test.version }}") {
+ $sparkBinaryName = "spark-${{ test.version }}-bin-hadoop3"
+ }
+ curl -k -L -o spark-${{ test.version }}.tgz https://archive.apache.org/dist/spark/spark-${{ test.version }}/${sparkBinaryName}.tgz
tar xzvf spark-${{ test.version }}.tgz
+ move $sparkBinaryName spark-${{ test.version }}-bin-hadoop
displayName: 'Download Spark Distro ${{ test.version }}'
workingDirectory: $(Build.BinariesDirectory)
@@ -142,7 +165,7 @@ stages:
workingDirectory: $(Build.SourcesDirectory)$(PATH_SEPARATOR)dotnet-spark
env:
HADOOP_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop
- SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop2.7
+ SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop
DOTNET_WORKER_DIR: $(CURRENT_DOTNET_WORKER_DIR)
- pwsh: |
@@ -167,7 +190,7 @@ stages:
workingDirectory: $(Build.SourcesDirectory)$(PATH_SEPARATOR)dotnet-spark
env:
HADOOP_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop
- SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop2.7
+ SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop
DOTNET_WORKER_DIR: $(BACKWARD_COMPATIBLE_DOTNET_WORKER_DIR)
- checkout: forwardCompatibleRelease
@@ -189,5 +212,5 @@ stages:
workingDirectory: $(Build.SourcesDirectory)$(PATH_SEPARATOR)dotnet-spark-${{ parameters.forwardCompatibleRelease }}
env:
HADOOP_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)hadoop
- SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop2.7
+ SPARK_HOME: $(Build.BinariesDirectory)$(PATH_SEPARATOR)spark-${{ test.version }}-bin-hadoop
DOTNET_WORKER_DIR: $(CURRENT_DOTNET_WORKER_DIR)
diff --git a/azure-pipelines-pr.yml b/azure-pipelines-pr.yml
index b99408a55..d213cc574 100644
--- a/azure-pipelines-pr.yml
+++ b/azure-pipelines-pr.yml
@@ -37,6 +37,11 @@ variables:
backwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
forwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
+ backwardCompatibleTestOptions_Windows_3_3: "--filter FullyQualifiedName=NONE"
+ forwardCompatibleTestOptions_Windows_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
+ backwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
+ forwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
+
# Azure DevOps variables are transformed into environment variables, with these variables we
# avoid the first time experience and telemetry to speed up the build.
DOTNET_CLI_TELEMETRY_OPTOUT: 1
@@ -63,6 +68,11 @@ parameters:
- '3.2.1'
- '3.2.2'
- '3.2.3'
+ - '3.3.0'
+ - '3.3.1'
+ - '3.3.2'
+ - '3.3.3'
+ - '3.3.4'
# List of OS types to run E2E tests, run each test in both 'Windows' and 'Linux' environments
- name: listOfE2ETestsPoolTypes
type: object
diff --git a/src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaFixture.cs b/src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaFixture.cs
index 6ed6cbc1d..c893336f3 100644
--- a/src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaFixture.cs
+++ b/src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/DeltaFixture.cs
@@ -16,12 +16,17 @@ public class DeltaFixture
public DeltaFixture()
{
Version sparkVersion = SparkSettings.Version;
- string deltaVersion = (sparkVersion.Major, sparkVersion.Minor) switch
+ string deltaVersion = (sparkVersion.Major, sparkVersion.Minor, sparkVersion.Build) switch
{
- (2, _) => "delta-core_2.11:0.6.1",
- (3, 0) => "delta-core_2.12:0.8.0",
- (3, 1) => "delta-core_2.12:1.0.0",
- (3, 2) => "delta-core_2.12:1.1.0",
+ (2, _, _) => "delta-core_2.11:0.6.1",
+ (3, 0, _) => "delta-core_2.12:0.8.0",
+ (3, 1, _) => "delta-core_2.12:1.0.0",
+ (3, 2, _) => "delta-core_2.12:1.1.0",
+ (3, 3, 0) => "delta-core_2.12:2.1.0",
+ (3, 3, 1) => "delta-core_2.12:2.1.0",
+ (3, 3, 2) => "delta-core_2.12:2.3.0",
+ (3, 3, 3) => "delta-core_2.12:2.3.0",
+ (3, 3, 4) => "delta-core_2.12:2.3.0",
_ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
};
diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadWriter.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadWriter.cs
index d2a2fa70a..4798950c4 100644
--- a/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadWriter.cs
+++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/PayloadWriter.cs
@@ -112,6 +112,45 @@ public void Write(Stream stream, TaskContext taskContext)
}
}
+ /// <summary>
+ /// TaskContextWriter for version 3.3.*.
+ /// </summary>
+ internal sealed class TaskContextWriterV3_3_X : ITaskContextWriter
+ {
+ public void Write(Stream stream, TaskContext taskContext)
+ {
+ SerDe.Write(stream, taskContext.IsBarrier);
+ SerDe.Write(stream, taskContext.Port);
+ SerDe.Write(stream, taskContext.Secret);
+
+ SerDe.Write(stream, taskContext.StageId);
+ SerDe.Write(stream, taskContext.PartitionId);
+ SerDe.Write(stream, taskContext.AttemptNumber);
+ SerDe.Write(stream, taskContext.AttemptId);
+ // Add CPUs field for spark 3.3.x
+ SerDe.Write(stream, taskContext.CPUs);
+
+ SerDe.Write(stream, taskContext.Resources.Count());
+ foreach (TaskContext.Resource resource in taskContext.Resources)
+ {
+ SerDe.Write(stream, resource.Key);
+ SerDe.Write(stream, resource.Value);
+ SerDe.Write(stream, resource.Addresses.Count());
+ foreach (string address in resource.Addresses)
+ {
+ SerDe.Write(stream, address);
+ }
+ }
+
+ SerDe.Write(stream, taskContext.LocalProperties.Count);
+ foreach (KeyValuePair<string, string> kv in taskContext.LocalProperties)
+ {
+ SerDe.Write(stream, kv.Key);
+ SerDe.Write(stream, kv.Value);
+ }
+ }
+ }
+
///////////////////////////////////////////////////////////////////////////
// BroadcastVariable writer for different Spark versions.
///////////////////////////////////////////////////////////////////////////
@@ -311,6 +350,12 @@ internal PayloadWriter Create(Version version = null)
new TaskContextWriterV3_0_X(),
new BroadcastVariableWriterV2_4_X(),
new CommandWriterV2_4_X());
+ case Versions.V3_3_0:
+ return new PayloadWriter(
+ version,
+ new TaskContextWriterV3_3_X(),
+ new BroadcastVariableWriterV2_4_X(),
+ new CommandWriterV2_4_X());
default:
throw new NotSupportedException($"Spark {version} is not supported.");
}
diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs
index dfe255b27..b7a751317 100644
--- a/src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs
+++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs
@@ -19,6 +19,7 @@ public static IEnumerable