Friday, June 24, 2016

Scala - Spark - DataFrame.show() throws java.util.NoSuchElementException after a groupBy().mean()

I load some data, perform some operations, and then want to compute the average weight per contactId:

    val customSchema = StructType(Array(
      StructField("contactId", StringType, true),
      StructField("weight", DoubleType, true),
      StructField("email", StringType, true)))

    // Load the dataframe
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")          // File has headers
      .option("delimiter", "\t")         // Delimiter is tab
      //.option("inferSchema", "true")   // Automatically infer data types
      .option("parserLib", "UNIVOCITY")  // Parser that deals better with the email formatting
      .schema(customSchema)              // Schema of the table
      .load("outputs/cleanedEmails.txt") // Input file

    // Print out the first 20 rows of the dataframe
    //df.show()

    // Sentiment analysis
    val sent = df
      .select('contactId, 'weight, sentiment('email).as('sentiment)) // Add sentiment analysis output to the dataframe
      .select('contactId, 'weight * 'sentiment)                      // Multiply sentiment score by weight
      .withColumnRenamed("(weight * sentiment)", "weight")           // Rename column
      .filter('weight >= 0.0005)
      .groupBy("contactId").mean("weight")                           // Calculate average weight per unique contactId

At this point, everything works fine. However, if I call sent.show(), spark-submit throws the following error:

ERROR Executor: Managed memory leak detected; size = 16777216 bytes, TID = 1
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.util.NoSuchElementException
    at java.util.ArrayList$Itr.next(ArrayList.java:854)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
    at scala.collection.IterableLike$class.head(IterableLike.scala:91)
    at scala.collection.AbstractIterable.head(Iterable.scala:54)
    at main.scala.com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
    at main.scala.com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:75)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:74)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:964)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
    at org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:117)
    at org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:115)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:371)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:401)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:362)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:370)
    at main.scala.Sentiment$.main(Sentiment.scala:68)
    at main.scala.Sentiment.main(Sentiment.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.NoSuchElementException
    at java.util.ArrayList$Itr.next(ArrayList.java:854)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
    at scala.collection.IterableLike$class.head(IterableLike.scala:91)
    at scala.collection.AbstractIterable.head(Iterable.scala:54)
    at main.scala.com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
    at main.scala.com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:75)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:74)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:964)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
    at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
    at org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:117)
    at org.apache.spark.sql.execution.Filter$$anonfun$4$$anonfun$apply$4.apply(basicOperators.scala:115)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:371)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This might mean that there are null values in my data frame, but I do not know how to find them. Thoughts?
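One thing I have considered trying is counting the rows whose email is null or blank before the sentiment step. This is just a sketch against the df defined above (it uses the standard isNull, length and trim functions from org.apache.spark.sql.functions; I have not verified it catches the exact rows that break the UDF):

    import org.apache.spark.sql.functions.{col, length, trim}

    // Rows where the email is null or contains only whitespace
    val badEmails = df.filter(col("email").isNull || length(trim(col("email"))) === 0)
    println(s"Rows with null/empty email: ${badEmails.count()}")

    // Null checks for the other columns as well
    df.filter(col("contactId").isNull || col("weight").isNull).show()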
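If empty emails do turn out to be the problem, my plan would be to drop those rows before applying the sentiment UDF, roughly like this (again only a sketch using the standard na.drop and filter APIs, not tested):

    // Drop rows with a null email, then rows with a blank/whitespace-only email
    val cleaned = df
      .na.drop(Seq("email"))
      .filter(length(trim(col("email"))) > 0)

    // ...then build `sent` from `cleaned` instead of `df`, as in the pipeline above.

But I would still like to understand how to locate the offending values rather than just filtering blindly.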
