Skip to content

Commit 9bf35a6

Browse files
zjffdualex-the-man
authored andcommitted
LIVY-230. SparkSession is not avalible for PySparkInterperter and SparkRInterperter
Closes #210
1 parent f3d6cd5 commit 9bf35a6

File tree

4 files changed

+20
-8
lines changed

4 files changed

+20
-8
lines changed

repl/src/main/resources/fake_shell.py

+3
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,7 @@ def main():
514514
sys.stdout = UnicodeDecodingStringIO()
515515
sys.stderr = UnicodeDecodingStringIO()
516516

517+
spark_major_version = os.getenv("LIVY_SPARK_MAJOR_VERSION")
517518
try:
518519
listening_port = 0
519520
if os.environ.get("LIVY_TEST") != "true":
@@ -523,6 +524,8 @@ def main():
523524
exec('from pyspark.sql import HiveContext', global_dict)
524525
exec('from pyspark.streaming import StreamingContext', global_dict)
525526
exec('import pyspark.cloudpickle as cloudpickle', global_dict)
527+
if spark_major_version >= "2":
528+
exec('from pyspark.shell import spark', global_dict)
526529

527530
#Start py4j callback server
528531
from py4j.protocol import ENTRY_POINT_OBJECT_ID

repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ object PythonInterpreter extends Logging {
6868
env.put("PYTHONUNBUFFERED", "YES")
6969
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
7070
env.put("SPARK_HOME", sys.env.getOrElse("SPARK_HOME", "."))
71-
71+
env.put("LIVY_SPARK_MAJOR_VERSION", conf.get("spark.livy.spark_major_version", "1"))
7272
builder.redirectError(Redirect.PIPE)
7373
val process = builder.start()
7474
new PythonInterpreter(process, gatewayServer, kind.toString)

repl/src/main/scala/com/cloudera/livy/repl/SparkRInterpreter.scala

+14-6
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ object SparkRInterpreter {
114114

115115
builder.redirectError(Redirect.PIPE)
116116
val process = builder.start()
117-
new SparkRInterpreter(process, backendInstance, backendThread)
117+
new SparkRInterpreter(process, backendInstance, backendThread,
118+
conf.get("spark.livy.spark_major_version", "1"))
118119
} catch {
119120
case e: Exception =>
120121
if (backendThread != null) {
@@ -125,9 +126,8 @@ object SparkRInterpreter {
125126
}
126127
}
127128

128-
class SparkRInterpreter(process: Process, backendInstance: Any, backendThread: Thread)
129-
extends ProcessInterpreter(process)
130-
{
129+
class SparkRInterpreter(process: Process, backendInstance: Any, backendThread: Thread,
130+
sparkMajorVersion: String) extends ProcessInterpreter(process) {
131131
import SparkRInterpreter._
132132

133133
implicit val formats = DefaultFormats
@@ -141,8 +141,16 @@ class SparkRInterpreter(process: Process, backendInstance: Any, backendThread: T
141141
sendRequest("options(error = dump.frames)")
142142
if (!ClientConf.TEST_MODE) {
143143
sendRequest("library(SparkR)")
144-
sendRequest("sc <- sparkR.init()")
145-
sendRequest("sqlContext <- sparkRSQL.init(sc)")
144+
145+
if (sparkMajorVersion >= "2") {
146+
sendRequest("spark <- SparkR::sparkR.session()")
147+
sendRequest(
148+
"""sc <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils",
149+
"getJavaSparkContext", spark)""")
150+
} else {
151+
sendRequest("sc <- sparkR.init()")
152+
sendRequest("sqlContext <- sparkRSQL.init(sc)")
153+
}
146154
}
147155

148156
isStarted.countDown()

server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ class InteractiveSession(
125125
val enableHiveContext = livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT)
126126
val sparkMajorVersion =
127127
LivySparkUtils.formatSparkVersion(LivySparkUtils.sparkSubmitVersion(livyConf))._1
128-
128+
// pass spark.livy.spark_major_version to driver
129+
builderProperties.put("spark.livy.spark_major_version", sparkMajorVersion.toString)
129130
if (sparkMajorVersion <= 1) {
130131
builderProperties.put("spark.repl.enableHiveContext",
131132
livyConf.getBoolean(LivyConf.ENABLE_HIVE_CONTEXT).toString)

0 commit comments

Comments
 (0)