Data skipped while writing Spark Streaming output to HDFS


Problem Description


I'm running a Spark Streaming application with a 10-second batch interval. Its job is to consume data from Kafka, transform it, and store it into HDFS based on the key, i.e., one file per unique key. I'm using Hadoop's saveAsHadoopFile() API to store the output. A file gets generated for every unique key, but the issue is that only one row gets stored for each unique key, even though the DStream has more rows for the same key.
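For context, the stream setup might look roughly like the sketch below; it is not shown in the question, and the topic name, ZooKeeper quorum, and consumer group are placeholders (Spark 1.x receiver-based Kafka API):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

SparkConf conf = new SparkConf().setAppName("kafka-to-hdfs");
// 10-second batch interval, as described above.
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

// Receiver-based Kafka stream; all connection values are placeholders.
Map<String, Integer> topics = new HashMap<>();
topics.put("events", 1);
JavaPairReceiverInputDStream<String, String> kafkaStream =
        KafkaUtils.createStream(jssc, "localhost:2181", "consumer-group", topics);
// The transformation from kafkaStream to the keyed dStream used below is omitted.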


For example, consider the following DStream, which has a single unique key:

  key                  value
 =====   =====================================
 Key_1   183.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   184.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   181.33 70.0 2.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   185.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0 


I see only one row (instead of 5 rows) stored in the HDFS file:

185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0


The following code is used to store the output into HDFS:

// `random` is assumed to be a java.util.Random instance defined in the enclosing class.
dStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> pairRDD) throws Exception {
        // Write each batch to a unique HDFS directory.
        long timestamp = System.currentTimeMillis();
        int randomInt = random.nextInt();
        pairRDD.saveAsHadoopFile(
                "hdfs://localhost:9000/application-" + timestamp + "-" + randomInt,
                String.class, String.class, RDDMultipleTextOutputFormat.class);
        return null; // Function<..., Void> must still return a value
    }
});


where the implementation of RDDMultipleTextOutputFormat is as follows:

public class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    public K generateActualKey(K key, V value) {
        // Suppress the key so that only the value is written to the file.
        return null;
    }

    @Override
    public String generateFileNameForKeyValue(K key, V value, String name) {
        // Name the output file after the key itself.
        return key.toString();
    }
}


Please let me know if I'm missing anything. Thanks for your help.

Recommended Answer


Because the key is the same, every record maps to the same output file name, the value gets replaced each time, and you end up with only the last value supplied to Hadoop.
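As a possible remedy (a minimal sketch, not part of the original answer): MultipleTextOutputFormat passes the default part file name as the name argument, which is unique per writer, so folding it into the generated file name gives each key its own directory with distinct part files instead of one file that keeps getting replaced. The <key>/<part-name> layout here is an assumption.

// Hypothetical variant of the question's output format: records for a key go to
// <output-dir>/<key>/<part-name>, so writers no longer clobber one another.
public class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    public K generateActualKey(K key, V value) {
        return null; // emit only the value into the file
    }

    @Override
    public String generateFileNameForKeyValue(K key, V value, String name) {
        // `name` is the default part file name (e.g. "part-00000"), unique per writer.
        return key.toString() + "/" + name;
    }
}

Another option would be to partition the RDD by key (e.g. with partitionBy and a HashPartitioner) before saving, so that all rows for a given key are written by a single task.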
