Spark Hadoop Failed to get broadcast

Question

Running a spark-submit job and receiving a "Failed to get broadcast_58_piece0..." error. I'm really not sure what I'm doing wrong. Am I overusing UDFs? Too complicated a function?

As a summary of my objective, I am parsing text from pdfs, which are stored as base64 encoded strings in JSON objects. I'm using Apache Tika to get the text, and trying to make copious use of data frames to make things easier.

I had written a piece of code that ran the text extraction through Tika as a function outside of "main" on the data as an RDD, and that worked flawlessly. When I try to bring the extraction into main as a UDF on data frames, though, it breaks in various ways. Before I got here I was actually trying to write the final data frame as:

valid.toJSON.saveAsTextFile(hdfs_dir)

This was giving me all sorts of "File/Path already exists" headaches.
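
The final code below sidesteps this by going through the DataFrame writer with an explicit overwrite mode rather than saveAsTextFile; a minimal sketch of that write, reusing the same data frame and hdfs_dir, looks like:

// Write the final data frame as JSON, replacing any existing output at hdfs_dir
// instead of failing with "path already exists"
valid.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)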

Current code:

import java.io.{ByteArrayInputStream, InputStream}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, udf, unbase64}

import org.apache.tika.metadata.Metadata
import org.apache.tika.parser.{AutoDetectParser, ParseContext, Parser}
import org.apache.tika.parser.ocr.TesseractOCRConfig
import org.apache.tika.parser.pdf.PDFParserConfig
import org.apache.tika.sax.BodyContentHandler

object Driver {

  def main(args: Array[String]):Unit = {
    val hdfs_dir = args(0)
    val spark_conf = new SparkConf().setAppName("Spark Tika HDFS")
    val sc = new SparkContext(spark_conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._

    // load json data into dataframe
    val df = sqlContext.read.json("hdfs://hadoophost.com:8888/user/spark/data/in/*")

    // Extract text from a PDF byte array with Tika (AutoDetectParser plus Tesseract OCR and PDF parser configs)
    val extractInfo: (Array[Byte] => String) = (fp: Array[Byte]) => {

      val parser:Parser = new AutoDetectParser()
      val handler:BodyContentHandler = new BodyContentHandler(Integer.MAX_VALUE)
      val config:TesseractOCRConfig  = new TesseractOCRConfig()
      val pdfConfig:PDFParserConfig = new PDFParserConfig()

      val inputstream:InputStream = new ByteArrayInputStream(fp)

      val metadata:Metadata = new  Metadata()
      val parseContext:ParseContext = new ParseContext()
      parseContext.set(classOf[TesseractOCRConfig], config)
      parseContext.set(classOf[PDFParserConfig], pdfConfig)
      parseContext.set(classOf[Parser], parser)
      parser.parse(inputstream, handler, metadata, parseContext)
      handler.toString
    }


    val extract_udf = udf(extractInfo)

    // Decode the base64-encoded PDF bytes, then replace the raw column with the Tika-extracted text
    val df2 = df.withColumn("unbased_media", unbase64($"media_file")).drop("media_file")

    val dfRenamed = df2.withColumn("media_corpus", extract_udf(col("unbased_media"))).drop("unbased_media")

    // Strip punctuation and digits from the extracted corpus
    val depuncter: (String => String) = (corpus: String) => {
        val r = corpus.replaceAll("""[\p{Punct}]""", "")
        val s = r.replaceAll("""[0-9]""", "")
        s
    }

    val depuncter_udf = udf(depuncter)

    val withoutPunct = dfRenamed.withColumn("sentence", depuncter_udf(col("media_corpus")))

    // Load a previously saved ML pipeline model from HDFS (stored as an object file) to score the cleaned text
    val model = sc.objectFile[org.apache.spark.ml.PipelineModel]("hdfs://hadoophost.com:8888/user/spark/hawkeye-nb-ml-v2.0").first()

    val with_predictions = model.transform(withoutPunct)

    // Flag whether the concatenated full name (first + middle + last + suffix) appears in the normalized text
    val fullNameChecker: ((String, String, String, String, String) => String) = (fname: String, mname: String, lname: String, sfx: String, text: String) => {
        val newtext = text.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_fname = fname.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_mname = mname.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_lname = lname.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_sfx = sfx.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val name_full = new_fname.concat(new_mname).concat(new_lname).concat(new_sfx)
        val c = name_full.r.findAllIn(newtext).length
        c match {
            case 0 => "N"
            case _ => "Y"
        }
    }

    val fullNameChecker_udf = udf(fullNameChecker)

    // Flag whether a term appears in the text, ignoring punctuation and case
    val stringChecker: ((String, String) => String) = (term: String, text: String) => {
        val termLower = term.replaceAll("""[\p{Punct}]""", "").toLowerCase
        val textLower = text.replaceAll("""[\p{Punct}]""", "").toLowerCase
        val c = termLower.r.findAllIn(textLower).length
        c match {
        case 0 => "N"
        case _ => "Y"
        }
    }

    val stringChecker_udf = udf(stringChecker)


    // Flag whether the last four characters of a term (e.g. an account number) appear in the raw text
    val stringChecker2: ((String, String) => String) = (term: String, text: String) => {
        val termLower = term takeRight 4
        val textLower = text
        val c = termLower.r.findAllIn(textLower).length
        c match {
        case 0 => "N"
        case _ => "Y"
        }
    }

    val stringChecker2_udf = udf(stringChecker2)

    val valids = with_predictions.withColumn("fname_valid", stringChecker_udf(col("first_name"), col("media_corpus")))
                                            .withColumn("lname_valid", stringChecker_udf(col("last_name"), col("media_corpus")))
                                            .withColumn("fname2_valid", stringChecker_udf(col("first_name_2"), col("media_corpus")))
                                            .withColumn("lname2_valid", stringChecker_udf(col("last_name_2"), col("media_corpus")))
                                            .withColumn("camt_valid", stringChecker_udf(col("chargeoff_amount"), col("media_corpus")))
                                            .withColumn("ocan_valid", stringChecker2_udf(col("original_creditor_account_nbr"), col("media_corpus")))
                                            .withColumn("dpan_valid", stringChecker2_udf(col("debt_provider_account_nbr"), col("media_corpus")))
                                            .withColumn("full_name_valid", fullNameChecker_udf(col("first_name"), col("middle_name"), col("last_name"), col("suffix"), col("media_corpus")))
                                            .withColumn("full_name_2_valid", fullNameChecker_udf(col("first_name_2"), col("middle_name_2"), col("last_name_2"), col("suffix_2"), col("media_corpus")))


    valids.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)


  }

}

Full stack trace starting with error:

16/06/14 15:02:01 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 53, hdpd11n05.squaretwofinancial.com): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:272)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_58_piece0 of broadcast_58
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9$$anonfun$apply$7.apply(CountVectorizer.scala:222)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9$$anonfun$apply$7.apply(CountVectorizer.scala:221)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9.apply(CountVectorizer.scala:221)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9.apply(CountVectorizer.scala:218)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr43$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)
    ... 8 more
Caused by: org.apache.spark.SparkException: Failed to get broadcast_58_piece0 of broadcast_58
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
    ... 25 more

Answer

For anyone coming across this, it turns out the model I was loading was malformed. I found out by using spark-shell in yarn-client mode and stepping through the code. Loading the model was fine, but running it against the data frame (model.transform) threw errors about not being able to find a metadata directory.

I went back and found a good model, ran against that, and it worked fine. The code above is actually sound.
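
A minimal way to reproduce that check in spark-shell (yarn-client mode) is sketched below; it assumes the same model path as the job and that the loaded pipeline only needs the sentence column, so treat it as illustrative rather than exact:

// Launched with: spark-shell --master yarn-client
// Load the model exactly as the job does, then try it against a tiny sample data frame;
// a malformed model fails at transform time (e.g. complaining about a missing metadata directory)
val model = sc.objectFile[org.apache.spark.ml.PipelineModel]("hdfs://hadoophost.com:8888/user/spark/hawkeye-nb-ml-v2.0").first()

val sample = sqlContext.createDataFrame(Seq(Tuple1("some extracted sentence text"))).toDF("sentence")
model.transform(sample).show()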
