Wednesday, June 15, 2016

Spark Hadoop Failed to get broadcast

I'm running a spark-submit job and getting a "Failed to get broadcast_58_piece0..." error. I'm really not sure what I'm doing wrong. Am I overusing UDFs? Is the function too complicated?

To summarize my objective: I am parsing text from PDFs, which are stored as base64-encoded strings inside JSON objects. I'm using Apache Tika to extract the text, and trying to make copious use of DataFrames to make things easier.

I had written a piece of code that ran the text extraction through Tika as a function outside of "main", operating on the data as an RDD, and that worked flawlessly (a rough sketch of that version is included below). When I try to bring the extraction into main as a UDF on DataFrames, though, it breaks in various ways. Before I got here, I was actually trying to write the final DataFrame as:

valid.toJSON.saveAsTextFile(hdfs_dir)

This was giving me all sorts of "File/Path already exists" headaches.
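
For reference, the earlier RDD-based version was shaped roughly like this (a simplified sketch rather than the exact code; it assumes the same Tika imports as the full listing below, and pdfBytesRdd, an RDD of decoded PDF bytes, is just an illustrative name):

def extractText(bytes: Array[Byte]): String = {
  // same Tika setup as the extractInfo UDF below: AutoDetectParser plus OCR/PDF configs
  val parser = new AutoDetectParser()
  val handler = new BodyContentHandler(Integer.MAX_VALUE)
  val context = new ParseContext()
  context.set(classOf[TesseractOCRConfig], new TesseractOCRConfig())
  context.set(classOf[PDFParserConfig], new PDFParserConfig())
  context.set(classOf[Parser], parser)
  parser.parse(new ByteArrayInputStream(bytes), handler, new Metadata(), context)
  handler.toString
}

// applied with an ordinary RDD transformation rather than a DataFrame UDF
val corpusRdd = pdfBytesRdd.map(extractText)
corpusRdd.saveAsTextFile(hdfs_dir)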

Current code:

import java.io.{ByteArrayInputStream, InputStream}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, udf, unbase64}
import org.apache.tika.metadata.Metadata
import org.apache.tika.parser.{AutoDetectParser, ParseContext, Parser}
import org.apache.tika.parser.ocr.TesseractOCRConfig
import org.apache.tika.parser.pdf.PDFParserConfig
import org.apache.tika.sax.BodyContentHandler

object Driver {

  def main(args: Array[String]):Unit = {
    val hdfs_dir = args(0)
    val spark_conf = new SparkConf().setAppName("Spark Tika HDFS")
    val sc = new SparkContext(spark_conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._

    // load json data into dataframe
    val df = sqlContext.read.json("hdfs://hadoophost.com:8888/user/spark/data/in/*")

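    // UDF body: run Tika's AutoDetectParser (with Tesseract OCR and PDF configs) over the raw PDF bytes and return the extracted text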
    val extractInfo: (Array[Byte] => String) = (fp: Array[Byte]) => {

      val parser:Parser = new AutoDetectParser()
      val handler:BodyContentHandler = new BodyContentHandler(Integer.MAX_VALUE)
      val config:TesseractOCRConfig  = new TesseractOCRConfig()
      val pdfConfig:PDFParserConfig = new PDFParserConfig()

      val inputstream:InputStream = new ByteArrayInputStream(fp)

      val metadata:Metadata = new Metadata()
      val parseContext:ParseContext = new ParseContext()
      parseContext.set(classOf[TesseractOCRConfig], config)
      parseContext.set(classOf[PDFParserConfig], pdfConfig)
      parseContext.set(classOf[Parser], parser)
      parser.parse(inputstream, handler, metadata, parseContext)
      handler.toString
    }


    val extract_udf = udf(extractInfo)

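    // decode the base64 "media_file" payload, then run the Tika UDF to produce the "media_corpus" column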
    val df2 = df.withColumn("unbased_media", unbase64($"media_file")).drop("media_file")

    val dfRenamed = df2.withColumn("media_corpus", extract_udf(col("unbased_media"))).drop("unbased_media")

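    // UDF: strip punctuation and digits from the extracted text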
    val depuncter: (String => String) = (corpus: String) => {
        val r = corpus.replaceAll("""[\p{Punct}]""", "")
        val s = r.replaceAll("""[0-9]""", "")
        s
    }

    val depuncter_udf = udf(depuncter)

    val withoutPunct = dfRenamed.withColumn("sentence", depuncter_udf(col("media_corpus")))

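    // load a previously saved ML PipelineModel from HDFS and score the cleaned text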
    val model = sc.objectFile[org.apache.spark.ml.PipelineModel]("hdfs://hadoophost.com:8888/user/spark/hawkeye-nb-ml-v2.0").first()

    val with_predictions = model.transform(withoutPunct)

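    // UDF: normalize the name parts and the corpus, then flag ("Y"/"N") whether the concatenated full name appears in the corpus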
    val fullNameChecker: ((String, String, String, String, String) => String) = (fname: String, mname: String, lname: String, sfx: String, text: String) =>{
        val newtext = text.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_fname = fname.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_mname = mname.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_lname = lname.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_sfx = sfx.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val name_full = new_fname.concat(new_mname).concat(new_lname).concat(new_sfx)
        val c = name_full.r.findAllIn(newtext).length
        c match {
            case 0 => "N"
            case _ => "Y"
        }
    }

    val fullNameChecker_udf = udf(fullNameChecker)

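    // UDF: flag ("Y"/"N") whether a term appears in the corpus, ignoring punctuation and case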
    val stringChecker: ((String, String) => String) = (term: String, text: String) => {
        val termLower = term.replaceAll("""[\p{Punct}]""", "").toLowerCase
        val textLower = text.replaceAll("""[\p{Punct}]""", "").toLowerCase
        val c = termLower.r.findAllIn(textLower).length
        c match {
        case 0 => "N"
        case _ => "Y"
        }
    }

    val stringChecker_udf = udf(stringChecker)


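    // UDF: flag ("Y"/"N") whether the last four characters of a term (e.g. an account number) appear in the corpus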
    val stringChecker2: ((String, String) => String) = (term: String, text: String) => {
        val termLower = term takeRight 4
        val textLower = text
        val c = termLower.r.findAllIn(textLower).length
        c match {
        case 0 => "N"
        case _ => "Y"
        }
    }

    val stringChecker2_udf = udf(stringChecker2)

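    // add one "_valid" flag column per field being checked against the extracted corpus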
    val valids = with_predictions.withColumn("fname_valid", stringChecker_udf(col("first_name"), col("media_corpus")))
                                            .withColumn("lname_valid", stringChecker_udf(col("last_name"), col("media_corpus")))
                                            .withColumn("fname2_valid", stringChecker_udf(col("first_name_2"), col("media_corpus")))
                                            .withColumn("lname2_valid", stringChecker_udf(col("last_name_2"), col("media_corpus")))
                                            .withColumn("camt_valid", stringChecker_udf(col("chargeoff_amount"), col("media_corpus")))
                                            .withColumn("ocan_valid", stringChecker2_udf(col("original_creditor_account_nbr"), col("media_corpus")))
                                            .withColumn("dpan_valid", stringChecker2_udf(col("debt_provider_account_nbr"), col("media_corpus")))
                                            .withColumn("full_name_valid", fullNameChecker_udf(col("first_name"), col("middle_name"), col("last_name"), col("suffix"), col("media_corpus")))
                                            .withColumn("full_name_2_valid", fullNameChecker_udf(col("first_name_2"), col("middle_name_2"), col("last_name_2"), col("suffix_2"), col("media_corpus")))


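    // write the result as JSON, overwriting any existing output instead of failing on "path already exists"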
    valids.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)


  }

}

Full stack trace starting with error:

16/06/14 15:02:01 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 53, hdpd11n05.squaretwofinancial.com): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:272)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_58_piece0 of broadcast_58
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9$$anonfun$apply$7.apply(CountVectorizer.scala:222)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9$$anonfun$apply$7.apply(CountVectorizer.scala:221)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9.apply(CountVectorizer.scala:221)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9.apply(CountVectorizer.scala:218)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr43$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)
    ... 8 more
Caused by: org.apache.spark.SparkException: Failed to get broadcast_58_piece0 of broadcast_58
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
    ... 25 more
