Monday, June 13, 2016

SPARK Uncaught exception in thread driver-heartbeater

I am getting an error in my Spark project. I use IntelliJ. I have already tried sbt assembly and cache(). I wonder if I mixed something up in my build.sbt.

There is no problem with val numFollows, which only calls count(). The driver-heartbeater error shows up when I call show(), either on val results from pageRank or on val motifs from find() (which searches for a path pattern in the graph). I tried adding cache(), but it didn't help.

My code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.graphframes._

object Graph {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Graph").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    try {
      // Raw vertex and edge files, one record per line, tab-separated.
      val v = sc.textFile("data/VERTICES.txt")
      val e = sc.textFile("data/EDGES.txt")

      // Vertex DataFrame with schema (id, code), both strings.
      val schemaStringVertices = "id code"
      val schema = StructType(
        schemaStringVertices.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
      val vRDD = v.map(_.split("\t")).map(p => Row(p(0).trim, p(1).trim))
      val vDF = sqlContext.createDataFrame(vRDD, schema)
      vDF.cache()

      // Edge DataFrame with schema (src, dst, weight), all strings.
      val schemaStringEdges = "src dst weight"
      val schemaEdges = StructType(
        schemaStringEdges.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
      val eRDD = e.map(_.split("\t")).map(p => Row(p(0).trim, p(1).trim, p(2).trim))
      val eDF = sqlContext.createDataFrame(eRDD, schemaEdges)
      eDF.cache()

      val g = GraphFrame(vDF, eDF)

      // This works: count of vertices with code 170.
      val numFollows = g.vertices.filter("code = '170'").count()
      println(numFollows)

      // Motif search for vertex pairs connected in both directions.
      val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
      motifs.cache()
      motifs.show()   // this is where the driver-heartbeater error appears

      // PageRank; showing the result also triggers the error.
      val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
      results.vertices.select("id", "pagerank").show()
    }
    finally {
      sc.stop()
    }
  }

}
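
As a side note, since spark-csv is already in my build.sbt, I could probably load the same tab-separated files like this instead of splitting by hand (just a sketch with the option names I believe spark-csv 1.3.0 uses, not what I am actually running; it reuses the sqlContext and the vertex schema defined above):

  // Hypothetical alternative: read the tab-separated vertex file with spark-csv.
  val vDFcsv = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("delimiter", "\t")   // columns are separated by a tab character
    .option("header", "false")   // my files have no header row
    .schema(schema)              // same StructType as above: id, code
    .load("data/VERTICES.txt")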

The error:

  ERROR Utils: Uncaught exception in thread driver-heartbeater
    java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
            at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
            at java.lang.reflect.Method.invoke(Unknown Source)
            at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
            at java.io.ObjectInputStream.readSerialData(Unknown Source)
            at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
            at java.io.ObjectInputStream.readObject0(Unknown Source)
            at java.io.ObjectInputStream.readObject(Unknown Source)
            at org.apache.spark.util.Utils$.deserialize(Utils.scala:91)
            at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:440)
            at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:430)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:430)
            at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:428)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
            at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:428)
            at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:472)
            at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
            at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
            at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:472)
            at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
            at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
            at java.lang.Thread.run(Unknown Source)
    Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
            at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source)
            at java.io.ObjectStreamClass.setObjFieldValues(Unknown Source)
            at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
            at java.io.ObjectInputStream.defaultReadObject(Unknown Source)
            at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
            ... 33 more
    [Stage 5:===============================================>       (173 + 1) / 200]16/06/13 12:01:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 179269 ms exceeds timeout 120000 ms
    16/06/13 12:01:14 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 179269 ms
    16/06/13 12:01:14 WARN TaskSetManager: Lost task 173.0 in stage 5.0 (TID 377, localhost): ExecutorLostFailure (executor driver lost)
    16/06/13 12:01:14 ERROR TaskSetManager: Task 173 in stage 5.0 failed 1 times; aborting job
    16/06/13 12:01:14 WARN SparkContext: Killing executors is only supported in coarse-grained mode
    16/06/13 12:01:15 ERROR TaskSchedulerImpl: Exception in statusUpdate
    java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@523f95e5 rejected from java.util.concurrent.ThreadPoolExecutor@7731ae85[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 377]
            at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
            at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
            at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
            at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:49)
            at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:347)
            at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:330)
            at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:65)
            at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
            at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
            at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
            at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
            at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
            at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
            at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
            at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
            at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
            at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
            at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
            at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
            at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
            at akka.actor.ActorCell.invoke(ActorCell.scala:487)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
            at akka.dispatch.Mailbox.run(Mailbox.scala:220)
            at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
            at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
            at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
            at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
            at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 173 in stage 5.0 failed 1 times, most recent failure: Lost task 173.0 in stage 5.0 (TID 377, localhost): ExecutorLostFailure (executor driver lost)
    [error] Driver stacktrace:
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 173 in stage 5.0 failed 1 times, most recent failure: Lost task 173.0 in stage 5.0 (TID 377, localhost): ExecutorLostFailure (executor driver lost)
    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
            at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
            at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
            at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
            at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177)
            at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
            at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
            at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
            at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
            at course2.module1.Graph$.main(Graph.scala:41)
            at course2.module1.Graph.main(Graph.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
            at java.lang.reflect.Method.invoke(Unknown Source)
    [trace] Stack trace suppressed: run last compile:run for the full output.
    java.lang.RuntimeException: Nonzero exit code: 1
            at scala.sys.package$.error(package.scala:27)
    [trace] Stack trace suppressed: run last compile:run for the full output.
    [error] (compile:run) Nonzero exit code: 1
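
The warning about removing the executor says the heartbeat timed out after 179269 ms against a 120000 ms limit, which looks like the default spark.network.timeout. Maybe I should also try raising the heartbeat-related settings when I build the conf, something like this (just a sketch, I haven't checked whether it helps, and it wouldn't explain the ClassCastException itself):

  val conf = new SparkConf()
    .setAppName("Graph")
    .setMaster("local")
    // executors send a heartbeat every spark.executor.heartbeatInterval;
    // the driver drops an executor after spark.network.timeout without one
    .set("spark.executor.heartbeatInterval", "30s")
    .set("spark.network.timeout", "600s")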

My build.sbt:

import sbt.complete.Parsers._

name := "spark-examples"
version := "0.1.0"
scalaVersion := "2.10.5"

//default Spark version
val sparkVersion = "1.5.2"

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"              % sparkVersion withSources(),
  "org.apache.spark" %% "spark-streaming"         % sparkVersion withSources(),
  "org.apache.spark" %% "spark-sql"               % sparkVersion withSources(),
  "org.apache.spark" %% "spark-hive"              % sparkVersion withSources(),
  "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion withSources(),
  "org.apache.spark" %% "spark-mllib"             % sparkVersion withSources(),
  "org.twitter4j"    %  "twitter4j-core"          % "3.0.3" withSources(),
  "com.databricks"   %% "spark-csv"               % "1.3.0"      withSources(),
  "graphframes" % "graphframes" % "0.1.0-spark1.4"
)


unmanagedResourceDirectories in Compile += baseDirectory.value / "conf"
unmanagedResourceDirectories in Test += baseDirectory.value / "conf"

initialCommands += """
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._
  import org.apache.spark.sql.SQLContext
  val conf = new SparkConf().
    setMaster("local[*]").
    setAppName("Console").
    set("spark.app.id", "Console").   // To silence Metrics warning.
    set("spark.sql.shuffle.partitions", "4")  // for smaller data sets.
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  """

cleanupCommands += """
  println("Closing the SparkContext:")
  sc.stop()
  """.stripMargin

addCommandAlias("ex1a",         "run-main course2.module1.WordCount")
addCommandAlias("ex1c",         "run-main course2.module1.Graph")
addCommandAlias("ex1d",         "run-main course2.module1.example")

Thanks for the help, aola
