Parsing headers causes a map error #32

Open
stoddabr opened this issue Oct 23, 2024 · 2 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@stoddabr
Contributor

What version were you using?

NATS Spark connector V2 code, as-is.

What environment was the server running in?

Databricks notebook DBR 13.3

Is this defect reproducible?

Yes, it occurs every time for me.

Given the capability you are leveraging, describe your expectation?

No error.

Given the expectation, what is the defect you are observing?

Error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 23) (REDACTED_IP_ADDRESS executor 2): java.lang.ClassCastException: scala.collection.immutable.Map$Map4 cannot be cast to org.apache.spark.sql.catalyst.util.MapData

Stack trace:

	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getMap(rows.scala:50)
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getMap$(rows.scala:50)
	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getMap(rows.scala:195)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:120)
	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$2.hasNext(InMemoryRelation.scala:287)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:227)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:312)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1654)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1581)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1645)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1438)
	at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1392)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:422)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:372)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:900)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:903)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:798)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Replacing the following code with headers = None makes the error go away, which is how I know this is the cause:

    val headers: Option[Map[String, Seq[String]]] =
      Option(message.getHeaders).map(
        _.entrySet()
          .toSet
          .map((me: util.Map.Entry[String, util.List[String]]) =>
            (me.getKey, me.getValue.asScala.toSeq))
          .toMap)
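
For context on why the cast fails: the stack trace shows Spark's generated code calling getMap on the internal row, which expects a catalyst MapData, not a plain Scala Map. A minimal sketch (hypothetical, not connector code) that reproduces the same ClassCastException:

    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

    // Placing a plain Scala Map into an internal row and then reading it
    // back as a map triggers the same cast failure seen in the stack trace.
    val row = new GenericInternalRow(Array[Any](Map("Subject" -> Seq("test"))))
    row.getMap(0) // ClassCastException: Map$Map1 cannot be cast to MapData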
stoddabr added the defect (Suspected defect such as a bug or regression) label Oct 23, 2024
@stoddabr
Contributor Author

stoddabr commented Oct 23, 2024

I've found a better fix that preserves the header information as a String, using the public serialization method on Headers:

    val headers: String = Option(message.getHeaders).orNull match {
      case headers: io.nats.client.impl.Headers => new String(headers.getSerialized, "UTF-8")
      case _ => null
    }

This requires setting the headers column to type String and converting the string to Spark's unsafe UTF8String.
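
As a rough illustration of that change (a sketch, assuming the connector builds internal row values directly; the field name "headers" is only an example), the schema field and row value would look something like:

    import org.apache.spark.sql.types.{StringType, StructField}
    import org.apache.spark.unsafe.types.UTF8String

    // headers column declared as a plain string in the schema
    val headersField = StructField("headers", StringType, nullable = true)

    // the serialized header string is wrapped in Spark's internal UTF8String
    val headersValue: Any =
      if (headers == null) null else UTF8String.fromString(headers)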

@jnmoyne
Contributor

jnmoyne commented Oct 29, 2024

LGTM on the fix you describe, using the public serialization and a String column type. Thanks for this; you can create a PR for the change.
