HDFS: java.io.FileNotFoundException: File does not exist: name._COPYING


Problem Description


I'm working with Spark Streaming using Scala. I need to read a .csv file dynamically from an HDFS directory with this line:

 val lines = ssc.textFileStream("/user/root/")

I use the following command line to put the file into HDFS:

hdfs dfs -put ./head40k.csv

It works fine with a relatively small file. When I try with a larger one, I get this error:

org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/head800k.csv._COPYING

I can understand why, but I don't know how to fix it. I've tried this solution too:

hdfs dfs -put ./head800k.csv /user
hdfs dfs -mv /usr/head800k.csv /user/root

but my program doesn't read the file. Any ideas? Thanks in advance

PROGRAM:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.sys.process._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.HashMap
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import StreamingContext._

object Traccia2014{
  def main(args: Array[String]){
if (args.length < 2) {
  System.err.println(s"""
    |Usage: DirectKafkaWordCount <brokers> <test><topicRisultato>
    |  <brokers> is a list of one or more Kafka brokers
    |  <topics> is a list of one or more kafka topics to consume from
    |
    """.stripMargin)
  System.exit(1)
}

val Array(brokers,risultato) = args
val sparkConf = new SparkConf().setAppName("Traccia2014")
val ssc = new StreamingContext(sparkConf, Seconds(5))

  val lines = ssc.textFileStream("/user/root/")

 //val lines= ssc.fileStream[LongWritable, Text, TextInputFormat](directory="/user/root/",
     // filter = (path: org.apache.hadoop.fs.Path) => //(!path.getName.endsWith("._COPYING")),newFilesOnly = true)

  //********** Definizioni Producer***********

val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

val slice=30

lines.foreachRDD( rdd => {

  if (!rdd.isEmpty) {
    val min = rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
    if (!min.isEmpty) {
      val ipDst = rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)), 1)).reduceByKey(_ + _)
      if (!ipDst.isEmpty) {
        val ipSrc = rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(1)), 1)).reduceByKey(_ + _)
        if (!ipSrc.isEmpty) {

          val Rapporto = ipSrc.leftOuterJoin(ipDst).mapValues{ case (x, y) => x.asInstanceOf[Int] / y.getOrElse(1) }

          val RapportoFiltrato = Rapporto.filter{ case (key, value) => value > 100 }
          println("###(ConsumerScala) CalcoloRapporti: ###")
          Rapporto.collect().foreach(println)
          val str = Rapporto.collect().mkString("\n")

          println(s"###(ConsumerScala) Produco Risultato : ${str}")

          val message = new ProducerRecord[String, String](risultato, null, str)
          producer.send(message)

          Thread.sleep(1000)

        } else {
          println("src vuoto")
        }
      } else {
        println("dst vuoto")
      }
    } else {
      println("min vuoto")
    }
  } else {
    println("rdd vuoto")
  }

}) //foreach


ssc.start()
ssc.awaitTermination()


} }

Solution

/user/root/head800k.csv._COPYING is a transient file that is created while the copy process is ongoing. Wait for the copy process to complete and you will have a file without the _COPYING suffix, i.e. /user/root/head800k.csv.

To filter out these transient files in your Spark Streaming job, you can use the fileStream method documented here, for example as shown below:

 ssc.fileStream[LongWritable, Text, TextInputFormat](
      directory="/user/root/",
      filter = (path: org.apache.hadoop.fs.Path) => (!path.getName.endsWith("_COPYING")), // add other filters like files starting with dot etc
      newFilesOnly = true)
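
Note that fileStream returns a DStream of (LongWritable, Text) pairs rather than plain strings. As a minimal sketch building on the snippet above (and reusing the LongWritable/Text/TextInputFormat imports already present in the program), the values can be mapped back to lines before the rest of the processing:

 val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      directory = "/user/root/",
      filter = (path: org.apache.hadoop.fs.Path) => !path.getName.endsWith("._COPYING"),
      newFilesOnly = true
    ).map { case (_, text) => text.toString } // recover plain lines, as textFileStream would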

EDIT

Since you are moving your file from the local filesystem to HDFS, the best solution is to upload the file to a temporary staging location in HDFS first and then move it to your target directory; copying or moving within the HDFS filesystem avoids the transient ._COPYING files.
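
For example, here is a minimal sketch of this staging approach using the Hadoop FileSystem API (the staging path /user/root/staging is only illustrative):

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

 val fs = FileSystem.get(new Configuration())
 // upload to a staging directory that the streaming job does not watch;
 // the ._COPYING file only ever exists under the staging path
 fs.copyFromLocalFile(new Path("./head800k.csv"), new Path("/user/root/staging/head800k.csv"))
 // a rename within HDFS is a metadata operation, so the file appears
 // in the watched directory complete, without a ._COPYING phase
 fs.rename(new Path("/user/root/staging/head800k.csv"), new Path("/user/root/head800k.csv"))

The same can be done from the shell with hdfs dfs -put into the staging directory followed by hdfs dfs -mv into /user/root/.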
