I have an error in my Spark project (I am using IntelliJ). I already tried sbt assembly and cache(), and I wonder whether I mixed something up in my build.sbt.
There is no problem with val numFollows, which only does a count(). But when I call show() on val results (pageRank) or on val motifs (the find function that searches for paths in the graph), I get the driver-heartbeater error below. I tried cache() on those DataFrames, but it didn't help.
My code:
package course2.module1

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.graphframes._

object Graph {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Graph").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    try {
      // Tab-separated input files: "id<TAB>code" and "src<TAB>dst<TAB>weight"
      val v = sc.textFile("data/VERTICES.txt")
      val e = sc.textFile("data/EDGES.txt")

      val schemaStringVertices = "id code"
      val schema = StructType(
        schemaStringVertices.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
      val vRDD = v.map(_.split("\t")).map(p => Row(p(0).trim, p(1).trim))
      val vDF = sqlContext.createDataFrame(vRDD, schema).toDF
      vDF.cache()

      val schemaStringEdges = "src dst weight"
      val schemaEdges = StructType(
        schemaStringEdges.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
      val eRDD = e.map(_.split("\t")).map(p => Row(p(0).trim, p(1).trim, p(2).trim))
      val eDF = sqlContext.createDataFrame(eRDD, schemaEdges).toDF
      eDF.cache()

      val g = GraphFrame(vDF, eDF)

      // This count works fine
      val numFollows = g.vertices.filter("code = '170'").count()
      println(numFollows)

      // Motif search: show() is where the driver-heartbeater error appears
      val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
      motifs.cache()
      motifs.show()

      // PageRank: same error on show()
      val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
      results.vertices.select("id", "pagerank").show()
    } finally {
      sc.stop()
    }
  }
}
The error:
ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.spark.util.Utils$.deserialize(Utils.scala:91)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:440)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:430)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:430)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:428)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:428)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:472)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:472)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source)
at java.io.ObjectStreamClass.setObjFieldValues(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.defaultReadObject(Unknown Source)
at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
... 33 more
[Stage 5:===============================================> (173 + 1) / 200]16/06/13 12:01:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 179269 ms exceeds timeout 120000 ms
16/06/13 12:01:14 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 179269 ms
16/06/13 12:01:14 WARN TaskSetManager: Lost task 173.0 in stage 5.0 (TID 377, localhost): ExecutorLostFailure (executor driver lost)
16/06/13 12:01:14 ERROR TaskSetManager: Task 173 in stage 5.0 failed 1 times; aborting job
16/06/13 12:01:14 WARN SparkContext: Killing executors is only supported in coarse-grained mode
16/06/13 12:01:15 ERROR TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@523f95e5 rejected from java.util.concurrent.ThreadPoolExecutor@7731ae85[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 377]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:49)
at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:347)
at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:330)
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:65)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 173 in stage 5.0 failed 1 times, most recent failure: Lost task 173.0 in stage 5.0 (TID 377, localhost): ExecutorLostFailure (executor driver lost)
[error] Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 173 in stage 5.0 failed 1 times, most recent failure: Lost task 173.0 in stage 5.0 (TID 377, localhost): ExecutorLostFailure (executor driver lost)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
at course2.module1.Graph$.main(Graph.scala:41)
at course2.module1.Graph.main(Graph.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
[trace] Stack trace suppressed: run last compile:run for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
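In case this is more a resources/timeout problem than the code itself: I have not changed any Spark settings so far. A sketch of what I could try in the same SparkConf; the option names are standard Spark settings, but the values below are only guesses, not something I already run:

// Hypothetical variant of the conf in the code above; values are arbitrary guesses for debugging
val conf = new SparkConf()
  .setAppName("Graph")
  .setMaster("local[*]")                          // all local cores instead of a single thread
  .set("spark.network.timeout", "300s")           // the log shows the default 120000 ms timeout being exceeded
  .set("spark.executor.heartbeatInterval", "30s")
  .set("spark.sql.shuffle.partitions", "8")       // the failing stage runs with 200 shuffle partitions on small data

Would settings like these even matter here, or is the timeout just a symptom of the serialization error above?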
build.sbt:
import sbt.complete.Parsers._
name := "spark-examples"
version := "0.1.0"
scalaVersion := "2.10.5"
//default Spark version
val sparkVersion = "1.5.2"
resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion withSources(),
  "org.apache.spark" %% "spark-streaming" % sparkVersion withSources(),
  "org.apache.spark" %% "spark-sql" % sparkVersion withSources(),
  "org.apache.spark" %% "spark-hive" % sparkVersion withSources(),
  "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion withSources(),
  "org.apache.spark" %% "spark-mllib" % sparkVersion withSources(),
  "org.twitter4j" % "twitter4j-core" % "3.0.3" withSources(),
  "com.databricks" %% "spark-csv" % "1.3.0" withSources(),
  "graphframes" % "graphframes" % "0.1.0-spark1.4"
)
unmanagedResourceDirectories in Compile += baseDirectory.value / "conf"
unmanagedResourceDirectories in Test += baseDirectory.value / "conf"
initialCommands += """
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._
  import org.apache.spark.sql.SQLContext
  val conf = new SparkConf().
    setMaster("local[*]").
    setAppName("Console").
    set("spark.app.id", "Console").          // To silence Metrics warning.
    set("spark.sql.shuffle.partitions", "4") // for smaller data sets.
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  """
cleanupCommands += """
  println("Closing the SparkContext:")
  sc.stop()
  """.stripMargin
addCommandAlias("ex1a", "run-main course2.module1.WordCount")
addCommandAlias("ex1c", "run-main course2.module1.Graph")
addCommandAlias("ex1d", "run-main course2.module1.example")
Thanks for the help, aola