
Commit ed4adca: Add support for Spark 2.2

Author: Jaroslaw Osmanski
1 parent: 542eadd

65 files changed: +10034, -57 lines (large commit; some file paths and contents are hidden by default in this view)


build/build_and_run_tests.sh

+1

@@ -32,6 +32,7 @@ run_tests() {
     sbt clean
     sbt -DSPARK_VERSION=$1 test ds-it
 }
+run_tests 2.2.0
 run_tests 2.1.1
 run_tests 2.0.2

build/workflow-executor_build.sh

+1

@@ -29,3 +29,4 @@ build_workflowexecutor "2.0.1"
 build_workflowexecutor "2.0.2"
 build_workflowexecutor "2.1.0"
 build_workflowexecutor "2.1.1"
+build_workflowexecutor "2.2.0"

deployment/spark-docker/Dockerfile

+1 -1

@@ -25,7 +25,7 @@ ENV SPARK_PACKAGE spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION
 ENV SPARK_HOME /opt/spark-$SPARK_VERSION
 ENV PATH $PATH:$SPARK_HOME/bin
 RUN wget -q -O - \
-    "http://d3kbcqa49mib13.cloudfront.net/$SPARK_PACKAGE.tgz" \
+    "https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/$SPARK_PACKAGE.tgz" \
     | gunzip \
     | tar x -C /tmp/ \
     && mv /tmp/$SPARK_PACKAGE $SPARK_HOME \

project/Dependencies.scala

+1

@@ -33,6 +33,7 @@ object Version {

   val spark = sys.props.getOrElse("SPARK_VERSION", "2.1.1")
   val (scala, hadoop, akka, sprayRoutingLib) = spark match {
+    case "2.2.0" => ("2.11.8", "2.7", "2.4.9", "routing-shapeless2")
     case "2.1.0" | "2.1.1" => ("2.11.8", "2.7", "2.4.9", "routing-shapeless2")
     case "2.0.0" | "2.0.1" | "2.0.2" => ("2.11.8", "2.7", "2.4.9", "routing")
   }

remote_notebook/code/sparkr_kernel/kernel_init.R

+1 -1

@@ -16,7 +16,7 @@ assign("sc", get(".sparkRjsc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
 sparkSQLSession <- SparkR:::callJMethod(entryPoint, "getNewSparkSQLSession")

 sparkVersion <- SparkR:::callJMethod(sc, "version")
-if (sparkVersion %in% c("2.0.0", "2.0.1", "2.0.2", "2.1.0", "2.1.1")) {
+if (sparkVersion %in% c("2.0.0", "2.0.1", "2.0.2", "2.1.0", "2.1.1", "2.2.0", "2.2.1")) {
   assign(".sparkRsession", SparkR:::callJMethod(sparkSQLSession, "getSparkSession"), envir = SparkR:::.sparkREnv)
   assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
 } else {

seahorse-workflow-executor/build.sbt

+30 -2

@@ -22,23 +22,51 @@ lazy val settingsForNotPublished = CommonSettingsPlugin.assemblySettings ++

 lazy val sparkVersion = Version.spark

+
+
 lazy val sparkUtils = sparkVersion match {
   case "2.0.0" | "2.0.1" | "2.0.2" =>
     val sparkUtils2_0_x = project in file("sparkutils2.0.x") settings settingsForPublished
     sparkUtils2_0_x
   case "2.1.0" | "2.1.1" =>
     val sparkUtils2_1_0 = project in file("sparkutils2.1.x") settings settingsForPublished
     sparkUtils2_1_0
+  case "2.2.0" =>
+    val sparkUtils2_1_0 = project in file("sparkutils2.2.x") settings settingsForPublished
+    sparkUtils2_1_0
 }

-lazy val sparkUtils2_x = project in file(s"sparkutils2.x") dependsOn sparkUtils settings settingsForPublished
+lazy val sparkUtils2_x = project in file(s"sparkutils2.x") dependsOn (csvlib, sparkUtils) settings settingsForPublished
+
+lazy val csv2_2 = project in file(s"sparkutilsfeatures/csv2_2") settings settingsForPublished
+lazy val csv2_0 = project in file(s"sparkutilsfeatures/csv2_0") dependsOn sparkUtils settings settingsForPublished
+
+lazy val csvlib = sparkVersion match {
+  case "2.0.0" | "2.0.1" | "2.0.2" =>
+    csv2_0
+  case "2.1.0" | "2.1.1" =>
+    csv2_0
+  case "2.2.0" =>
+    csv2_2
+}
+
+lazy val readjsondataset = project in file(s"sparkutilsfeatures/readjsondataset") dependsOn sparkUtils2_x settings settingsForPublished
+lazy val readjsondataframe = project in file(s"sparkutilsfeatures/readjsondataframe") dependsOn sparkUtils2_x settings settingsForPublished
+
+lazy val readjson = sparkVersion match {
+  case "2.0.0" | "2.0.1" | "2.0.2" => readjsondataframe
+  case "2.1.0" | "2.1.1" => readjsondataframe
+  case "2.2.0" => readjsondataset
+}

 lazy val rootProject = project
   .in(file("."))
   .settings(name := "seahorse")
   .settings(PublishSettings.disablePublishing)
   .aggregate(
     api,
+    csvlib,
+    readjson,
     sparkUtils2_x,
     sparkUtils,
     commons,
@@ -54,7 +82,7 @@ lazy val api = project settings settingsForPublished

 lazy val commons = project dependsOn (api, sparkUtils2_x) settings settingsForPublished

-lazy val deeplang = project dependsOn (commons,
+lazy val deeplang = project dependsOn (commons, readjson, csvlib,
   commons % "test->test",
   graph,
   graph % "test->test",
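This build change keeps every Spark-version difference behind version-resolved project aliases (sparkUtils, csvlib, readjson), so deeplang and commons always depend on the same project names no matter which -DSPARK_VERSION is passed to sbt. For that swap to work, the two csv feature modules presumably expose identical object names and signatures in the same package; below is a hedged Scala sketch of that shared surface. The object and method names come from this commit (the write signature mirrors the Spark 2.2 variant added later in it); treating the 2.0/2.1 variant as matching it is an assumption.

// Illustrative sketch only: the surface both csv feature modules appear to share, so that
// DriverFiles (below) compiles unchanged whichever module the csvlib alias selects.
package org.apache.spark.sql.execution.datasources.csv

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataframeToDriverCsvFileWriter {
  def write(
      dataFrame: DataFrame,
      options: Map[String, String],
      dataSchema: StructType,
      pathWithoutScheme: String,
      sparkSession: SparkSession): Unit =
    ??? // per-version implementation lives in sparkutilsfeatures/csv2_0 or csv2_2
}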

seahorse-workflow-executor/build/build_and_run_tests.sh

+4 -4

@@ -15,8 +15,8 @@


 TASKS="clean scalastyle test:scalastyle test it:compile ds-it"
-#sbt -DSPARK_VERSION=2.0.0 $TASKS
-#sbt -DSPARK_VERSION=2.0.1 $TASKS
-#sbt -DSPARK_VERSION=2.1.0 $TASKS
+sbt -DSPARK_VERSION=2.0.0 $TASKS
+sbt -DSPARK_VERSION=2.0.1 $TASKS
+sbt -DSPARK_VERSION=2.1.0 $TASKS
 sbt -DSPARK_VERSION=2.1.1 $TASKS
-sbt -DSPARK_VERSION=2.0.2 $TASKS
+sbt -DSPARK_VERSION=2.2.0 $TASKS

seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/filestorage/DriverFiles.scala

+9 -5

@@ -19,18 +19,20 @@ package ai.deepsense.deeplang.doperations.readwritedataframe.filestorage
 import java.io.{File, IOException, PrintWriter}

 import scala.io.Source
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.datasources.csv.{DataframeToDriverCsvFileWriter, RawCsvRDDToDataframe}
-import org.apache.spark.sql.{SaveMode, DataFrame => SparkDataFrame}
+import org.apache.spark.sql.{Dataset, Encoders, Row, SaveMode, DataFrame => SparkDataFrame}
 import ai.deepsense.commons.resources.ManagedResource
 import ai.deepsense.deeplang.ExecutionContext
 import ai.deepsense.deeplang.doperables.dataframe.DataFrame
 import ai.deepsense.deeplang.doperations.inout.{InputFileFormatChoice, OutputFileFormatChoice}
 import ai.deepsense.deeplang.doperations.readwritedataframe.filestorage.csv.CsvOptions
 import ai.deepsense.deeplang.doperations.readwritedataframe.{FilePath, FileScheme}
+import ai.deepsense.deeplang.readjsondataset.JsonReader
 import ai.deepsense.sparkutils.SQL

-object DriverFiles {
+object DriverFiles extends JsonReader {

   def read(driverPath: String, fileFormat: InputFileFormatChoice)
     (implicit context: ExecutionContext): SparkDataFrame = fileFormat match {
@@ -59,13 +61,14 @@ object DriverFiles {
     val lines = Source.fromFile(driverPath).getLines().toStream
     val fileLinesRdd = context.sparkContext.parallelize(lines)

-    RawCsvRDDToDataframe.parse(fileLinesRdd, context.sparkSQLSession, params)
+    RawCsvRDDToDataframe.parse(fileLinesRdd, context.sparkSQLSession.sparkSession, params)
   }

   private def readJson(driverPath: String)(implicit context: ExecutionContext) = {
     val lines = Source.fromFile(driverPath).getLines().toStream
     val fileLinesRdd = context.sparkContext.parallelize(lines)
-    context.sparkSQLSession.read.json(fileLinesRdd)
+    val sparkSession = context.sparkSQLSession.sparkSession
+    readJsonFromRdd(fileLinesRdd, sparkSession)
   }

   private def writeCsv
@@ -77,7 +80,8 @@ object DriverFiles {
       dataFrame.sparkDataFrame,
       params,
       dataFrame.schema.get,
-      path.pathWithoutScheme
+      path.pathWithoutScheme,
+      context.sparkSQLSession.sparkSession
     )
   }
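The readJson change is the reason the build now distinguishes readjsondataframe from readjsondataset: Spark 2.2 deprecates DataFrameReader.json(RDD[String]) in favour of json(Dataset[String]). DriverFiles now mixes in JsonReader and delegates to readJsonFromRdd, so each feature module can use whichever API its Spark version prefers. Below is a hedged sketch of what the Dataset-based (Spark 2.2) implementation plausibly looks like; the trait name, package, and method name come from this diff, while the body is an assumption.

// A minimal sketch, assuming the readjsondataset module wraps the raw JSON lines
// in a Dataset[String] and lets Spark infer the schema (the non-deprecated 2.2 path).
package ai.deepsense.deeplang.readjsondataset

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}

trait JsonReader {
  def readJsonFromRdd(rdd: RDD[String], sparkSession: SparkSession): DataFrame =
    sparkSession.read.json(sparkSession.createDataset(rdd)(Encoders.STRING))
}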

seahorse-workflow-executor/docker/spark-standalone-cluster-manage.sh

+1 -1

@@ -55,7 +55,7 @@ function networkRm {
 ACTION=$1
 SPARK_VERSION=$2
 export SPARK_VERSION=${SPARK_VERSION}
-if [ "$SPARK_VERSION" == "2.1.0" ] || [ "$SPARK_VERSION" == "2.1.1" ]; then
+if [ "$SPARK_VERSION" == "2.1.0" ] || [ "$SPARK_VERSION" == "2.1.1" ] || [ "$SPARK_VERSION" == "2.2.0" ]; then
     export HADOOP_VERSION="2.7"
     # We use 2.7.1 for Spark 2.1.x despite the fact that the latter depends on 2.7.3, but 2.7.3
     # doesn't have docker image released yet.

seahorse-workflow-executor/project/Dependencies.scala

+1

@@ -20,6 +20,7 @@ object Version {

   val spark = sys.props.getOrElse("SPARK_VERSION", "2.1.1")
   val (scala, java, hadoop, akka, apacheCommons) = spark match {
+    case "2.2.0" => ("2.11.8", "1.8", "2.7.3", "2.4.9", "3.5")
     case "2.1.0" | "2.1.1" => ("2.11.8", "1.8", "2.7.3", "2.4.9", "3.5")
     case "2.0.0" | "2.0.1" | "2.0.2" => ("2.11.8", "1.8", "2.7.1", "2.4.9", "3.3.+")
   }

seahorse-workflow-executor/python/pyexecutor/code_executor.py

+1 -1

@@ -91,7 +91,7 @@ def _run_custom_code(self, workflow_id, node_id, custom_operation_code):

         new_sql_context = None
         spark_version = self.spark_context.version
-        if spark_version in ["2.0.0", "2.0.1", "2.0.2", "2.1.0", "2.1.1"]:
+        if spark_version in ["2.0.0", "2.0.1", "2.0.2", "2.1.0", "2.1.1", "2.2.0"]:
             new_sql_context = SQLContext(self.spark_context, new_spark_session)
         else:
             raise ValueError("Spark version {} is not supported".format(spark_version))

seahorse-workflow-executor/python/pyexecutor/pyexecutor.py

+1 -1

@@ -71,7 +71,7 @@ def _initialize_spark_contexts(gateway):
     java_spark_sql_session = gateway.entry_point.getSparkSQLSession()
     spark_version = spark_context.version
     spark_sql_session = None
-    if spark_version in ["2.0.0", "2.0.1", "2.0.2", "2.1.0", "2.1.1"]:
+    if spark_version in ["2.0.0", "2.0.1", "2.0.2", "2.1.0", "2.1.1", "2.2.0"]:
         from pyspark.sql import SparkSession
         java_spark_session = java_spark_sql_session.getSparkSession()
         spark_sql_session = SparkSession(spark_context, java_spark_session)

seahorse-workflow-executor/scalastyle-config.xml

+1 -1

@@ -22,7 +22,7 @@
   <parameters>
     <parameter name="regex">true</parameter>
     <parameter name="header"><![CDATA[/\*\*
-\* Copyright 201[567] deepsense.ai \(CodiLime, Inc\)
+\* Copyright 201[56789] deepsense.ai \(CodiLime, Inc\)
 \*
 \* Licensed under the Apache License, Version 2.0 \(the \"License\"\);
 \* you may not use this file except in compliance with the License\.

seahorse-workflow-executor/sparkutils2.1.x/src/main/scala/ai/deepsense/sparkutils/readwritedataframe/SparkCsvReader.scala

+6 -7

@@ -14,19 +14,18 @@
  * limitations under the License.
  */

+
 package org.apache.spark.sql.execution.datasources.csv

 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.LogicalRDD
-import org.apache.spark.sql.types.{StructType, _}
+

 object SparkCsvReader {
   def create(options: CSVOptions): CsvReader = new CsvReader(options)
   def univocityTokenizer(
-      rdd: RDD[String],
-      header: Array[String],
-      firstLine: String,
-      options: CSVOptions): RDD[Array[String]] =
+    rdd: RDD[String],
+    header: Array[String],
+    firstLine: String,
+    options: CSVOptions): RDD[Array[String]] =
     CSVRelation.univocityTokenizer(rdd, firstLine, options)
 }
New file (path not shown in this view)

@@ -0,0 +1,19 @@
+/**
+ * Copyright 2017 deepsense.ai (CodiLime, Inc)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "seahorse-executor-sparkutils2.2.x"
+
+libraryDependencies ++= Dependencies.sparkutils(Version.spark)
New file (path not shown in this view)

@@ -0,0 +1,44 @@
+/**
+ * Copyright 2016 deepsense.ai (CodiLime, Inc)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.r
+
+
+class SparkRBackend {
+
+  private val backend: RBackend = new RBackend()
+  private val backendThread: Thread = new Thread("SparkRBackend") {
+    override def run(): Unit = backend.run()
+  }
+
+  private var portNumber: Int = _
+  private var entryPointTrackingId: String = _
+
+  def start(entryPoint: Object): Unit = {
+    entryPointTrackingId = backend.jvmObjectTracker.addAndGetId(entryPoint).id
+    portNumber = backend.init()
+    backendThread.start()
+  }
+
+  def close(): Unit = {
+    backend.close()
+    backendThread.join()
+  }
+
+  def port: Int = portNumber
+
+  def entryPointId: String = entryPointTrackingId
+}
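This wrapper targets Spark 2.2's RBackend API, where JVM-object tracking hangs off the backend instance (backend.jvmObjectTracker) and addAndGetId returns a JVMObjectId whose id string the R side uses to look the entry point up; it is presumably the 2.2 counterpart of the same wrapper in the older sparkutils modules. A hedged usage sketch follows; only the SparkRBackend members come from this file, and names such as customCodeEntryPoint are hypothetical.

// Hedged usage sketch; the real call site in the executor is not part of this visible diff.
import org.apache.spark.api.r.SparkRBackend

object SparkRBackendUsageSketch {
  def run(customCodeEntryPoint: Object): Unit = {   // customCodeEntryPoint is hypothetical
    val rBackend = new SparkRBackend()
    rBackend.start(customCodeEntryPoint)      // registers the entry point with the 2.2 jvmObjectTracker
    val port = rBackend.port                  // handed to the launched R process so it can connect back
    val entryPointId = rBackend.entryPointId  // lets R code retrieve the tracked entry point
    try {
      // ... launch the R process, passing `port` and `entryPointId` ...
      println(s"R backend listening on port $port, entry point id $entryPointId")
    } finally {
      rBackend.close()                        // stops the backend and joins its thread
    }
  }
}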
New file (path not shown in this view)

@@ -0,0 +1,42 @@
+/**
+ * Copyright 2016 deepsense.ai (CodiLime, Inc)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import ai.deepsense.sparkutils.readwritedataframe.ManagedResource
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object DataframeToDriverCsvFileWriter {
+
+  def write(
+      dataFrame: DataFrame,
+      options: Map[String, String],
+      dataSchema: StructType,
+      pathWithoutScheme: String,
+      sparkSession: SparkSession): Unit = {
+    val data = dataFrame.rdd.collect()
+    val params = MapToCsvOptions(options, sparkSession.sessionState.conf)
+    ManagedResource(
+      new LocalCsvOutputWriter(dataSchema, params, pathWithoutScheme)
+    ) { writer =>
+      data.foreach(row => {
+        writer.write(row.toSeq.map(_.asInstanceOf[String]))
+      })
+    }
+  }
+
+}
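Note the design: write collects the whole DataFrame to the driver and streams the rows through a LocalCsvOutputWriter, matching the driver-file scheme handled by DriverFiles above rather than a distributed write, and ManagedResource wraps the writer in a loan pattern. A minimal sketch of such a helper is below, assuming the real ai.deepsense.sparkutils version behaves roughly like this (it may accept any type with a close() method rather than AutoCloseable).

// Hedged sketch of a loan-pattern helper like ManagedResource (illustrative, not the actual source).
object ManagedResourceSketch {
  // Runs `block` with `resource`, always closing the resource afterwards.
  def apply[T <: AutoCloseable, R](resource: T)(block: T => R): R =
    try block(resource) finally resource.close()
}

// Usage mirrors the write method above, e.g.:
// ManagedResourceSketch(new java.io.PrintWriter("/tmp/out.csv")) { w => w.println("a,b,c") }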
