Flink: Write tuples with CSV header into file

Question

I did some data processing using Flink (1.7.1 with Hadoop). At the end I'd like to write the dataset consisting of 2-tuples into a file. Currently, I am doing it like this:

DataSet<Tuple2<Integer, Point>> pointsClustered = points.getClusteredPoints(...);
pointsClustered.writeAsCsv(params.get("output"), "\n", ",");

However, I would like the CSV header to be written as the first line. Flink's Javadoc doesn't mention any option for this, and I couldn't find a solution by searching online either.

Could you advise how to accomplish this? Thanks a lot!

Answer

Flink's own CsvOutputFormat does not support this. What you can do is extend CsvOutputFormat and override its open method so that it writes the header right after the output file has been opened. Because open runs once per parallel sink instance, each output file gets its own header when the sink's parallelism is greater than 1. Then pass the new output format to DataSet#output:

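import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.Path;

// The enclosing class name is arbitrary.
public class CsvWithHeaderJob {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Integer> input = env.fromElements(1, 2, 3);
        DataSet<Tuple3<Integer, String, Double>> result = input
                .map((MapFunction<Integer, Tuple3<Integer, String, Double>>) integer -> Tuple3.of(integer, integer.toString(), 42.0))
                // Lambdas lose their generic return type to erasure, so state it explicitly.
                .returns(new TypeHint<Tuple3<Integer, String, Double>>() {});

        Path outputPath = new Path("hdfs:///foobar");
        result.output(new MyCsvOutputFormat<>(outputPath));

        env.execute();
    }

    private static class MyCsvOutputFormat<T extends Tuple> extends CsvOutputFormat<T> {

        public MyCsvOutputFormat(Path outputPath) {
            super(outputPath);
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            // Open the file first: the inherited `stream` field is only set by super.open().
            super.open(taskNumber, numTasks);
            // Write the header straight to the stream and do NOT close it (a try-with-resources
            // PrintWriter would close the stream before any record is written).
            // "," matches CsvOutputFormat's default field delimiter.
            stream.write("Foo,bar,foobar\n".getBytes(StandardCharsets.UTF_8));
        }
    }
}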
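Running the job needs the Flink libraries, so here is a Flink-free sketch of the same idea in plain Java. SimpleCsvFormat, HeaderCsvFormat and render are hypothetical names, not Flink classes; they only mimic the open / writeRecord / close lifecycle of an output format so the ordering can be checked in memory. The point it illustrates: let the parent's open create the stream, then write the header directly to that stream and leave it open, so the records written afterwards through the parent's writer follow the header.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Flink-free sketch: SimpleCsvFormat and HeaderCsvFormat are hypothetical stand-ins
// that only mimic the open -> writeRecord -> close lifecycle of a Flink output format.
public class HeaderCsvSketch {

    /** Parent format: creates its writer in open(), similar to CsvOutputFormat. */
    static class SimpleCsvFormat {
        protected OutputStream stream; // plays the role of FileOutputFormat's `stream`
        private final OutputStream target;
        private final String fieldDelimiter;
        private Writer wrt;

        SimpleCsvFormat(OutputStream target, String fieldDelimiter) {
            this.target = target;
            this.fieldDelimiter = fieldDelimiter;
        }

        void open() throws IOException {
            stream = target; // the stream only becomes available here
            wrt = new OutputStreamWriter(stream, StandardCharsets.UTF_8);
        }

        void writeRecord(List<?> fields) throws IOException {
            for (int i = 0; i < fields.size(); i++) {
                if (i > 0) {
                    wrt.write(fieldDelimiter);
                }
                wrt.write(String.valueOf(fields.get(i)));
            }
            wrt.write("\n");
        }

        void close() throws IOException {
            wrt.close(); // flushes buffered records, then closes the stream
        }
    }

    /** Subclass that writes one header line each time a stream is opened. */
    static class HeaderCsvFormat extends SimpleCsvFormat {
        private final String header;

        HeaderCsvFormat(OutputStream target, String fieldDelimiter, String... columns) {
            super(target, fieldDelimiter);
            // Build the header with the same delimiter as the records.
            this.header = String.join(fieldDelimiter, columns) + "\n";
        }

        @Override
        void open() throws IOException {
            super.open(); // 1. let the parent open the stream
            // 2. write the header directly and leave the stream open;
            //    no record has been written yet, so the header comes first
            stream.write(header.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Runs one open/write/close cycle in memory and returns the resulting file contents. */
    public static String render(List<? extends List<?>> rows, String delimiter, String... columns) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        HeaderCsvFormat format = new HeaderCsvFormat(out, delimiter, columns);
        try {
            format.open();
            for (List<?> row : rows) {
                format.writeRecord(row);
            }
            format.close();
            return out.toString("UTF-8");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<List<?>> rows = Arrays.asList(Arrays.asList(1, "1", 42.0), Arrays.asList(2, "2", 42.0));
        System.out.print(render(rows, ",", "Foo", "bar", "foobar"));
    }
}
```

Running main prints the header Foo,bar,foobar once, followed by the rows 1,1,42.0 and 2,2,42.0. Deriving the header with String.join over the same delimiter as the records keeps the two from drifting apart, which is easy to get wrong when the header is a hard-coded string.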
