vendredi 17 juin 2016

xml output format in spark scala

I am trying to implement XML output format in spark using spark scala. I have written a custom output format in hadoop and able to implement it successfully. But when i am trying to implement in spark i am getting the the error:

java.io.NotSerializableException: org.apache.hadoop.io.Text Serialization stack: - object not serializable (class: org.apache.hadoop.io.Text, value: hadoop) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.SerializationStream.writeKey(Serializer.scala:145) at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:55) at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:681) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 16/06/17 18:22:04 ERROR Executor: Exception in task 0.0 in stage 30.0 (TID 34) java.io.NotSerializableException: org.apache.hadoop.io.Text Serialization stack: - object not serializable (class: org.apache.hadoop.io.Text, value: Hadoop) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.SerializationStream.writeKey(Serializer.scala:145) at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:55) at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:681) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 16/06/17 18:22:04 ERROR TaskSetManager: Task 0.0 in stage 30.0 (TID 34) had a not serializable result: org.apache.hadoop.io.Text Serialization stack: - object not serializable (class: org.apache.hadoop.io.Text, value: Hadoop); not retrying 16/06/17 18:22:04 ERROR TaskSetManager: Task 1.0 in stage 30.0 (TID 35) had a not serializable result: org.apache.hadoop.io.Text Serialization stack: - object not serializable (class: org.apache.hadoop.io.Text, value: hadoop); not retrying org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 30.0 (TID 34) had a not serializable result: org.apache.hadoop.io.Text Serialization stack: - object not serializable (class: org.apache.hadoop.io.Text, value: Hadoop) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) at org.apache.spark.rdd.RDD.collect(RDD.scala:904) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:35) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:42) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:44) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:46) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:48) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:50) at $iwC$$iwC$$iwC$$iwC$$iwC.(:52) at $iwC$$iwC$$iwC$$iwC.(:54) at $iwC$$iwC$$iwC.(:56) at $iwC$$iwC.(:58) at $iwC.(:60) at (:62) at .(:66) at .() at .(:7) at .() at $print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I have used Kyro serialization also but of no use.I have build a jar file for the custom output format class and included it in the spark shell successfully. Here is the code i have used.

:cp custom_out.jar
val sparkConf = new SparkConf().setAppName("myAPP").setMaster("local");
sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.io.IntWritable], classOf[org.apache.hadoop.io.Text],classOf[XMLOutputFormat]))
val textFile = sc.textFile("/home/kiran/word_count_input")
val counts = textFile.flatMap(line => line.split(" ")).map(word => (new Text(word), 1)).reduceByKey(_ + _)
val test: RDD[(Text,IntWritable)] =counts.map(x => (x._1,new IntWritable(x._2)))
test.collect

You can download the jar file from the below link https://drive.google.com/open?id=0ByJLBTmJojjzZXBxUVBkZW95U2c

Aucun commentaire:

Enregistrer un commentaire