How to process new records only from file?


Problem Description

I have a scenario where I need to process records from a file. Data is appended to the file periodically (every few milliseconds), so I need to read the file and process it while handling only the newly added records.

I came across Spark Structured Streaming, which is built on Spark SQL. What I am doing is:

  1. Trigger file stream processing every 1 second
  2. Run a Spark SQL query over the file
  3. Write the query output to the console in append mode

Here is the code for it:

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public static class SparkStreamer implements Runnable, Serializable {

    @Override
    public void run() {
        processDataStream();
    }

    private void processDataStream() {

        // Read the directory as a streaming text source (spark is an existing SparkSession)
        Dataset<Row> rowData = spark.readStream().format("text").load("C:\\Test\\App\\");

        // Split each line on "|" into individual records
        Dataset<String> data = rowData.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String row) throws Exception {
                return Arrays.asList(row.split("\\|")).iterator();
            }
        }, Encoders.STRING());

        Dataset<Row> dataCount = data.select(new Column("value"));

        // Write each micro-batch to the console in append mode
        StreamingQuery query = dataCount.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}

With the above implementation, the query executes once, but when I add new records to the file, a second batch execution is never triggered.

Other observations:

  • With output mode complete and update there is no output at all. Only in append mode do I get output, and only once.

Can someone help to address this issue? Does Spark Structured Streaming support processing data from a file, since normal Spark Streaming doesn't?

Recommended Answer

Does Spark Structured Streaming support processing data from a file?

Yes.

The query executes once, but if I add new records to the file, a second batch execution is not triggered.

That is not going to work: once a file has been processed, it is marked as seen and is never processed again (review FileStreamSource, which is responsible for this, to see how it works under the covers).
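As a side note (a hedged sketch, not part of the original answer): the file source does accept read options such as maxFilesPerTrigger and latestFirst that tune how newly arrived files are consumed per trigger, but none of them makes Spark re-read a file it has already marked as seen.

// Sketch only: file-source read options that control how NEW files are consumed;
// they do not cause an already-processed file to be read again.
Dataset<Row> rowData = spark.readStream()
        .format("text")
        .option("maxFilesPerTrigger", 1)   // at most one new file per micro-batch
        .option("latestFirst", true)       // consider the newest files first
        .load("C:\\Test\\App\\");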

The recommended solution is to write new content to a new file.
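As a rough sketch of that approach (the NewFileWriter class and the file-naming scheme below are my own illustration, not part of the original answer): instead of appending to one file, the producer writes each batch of new records to a fresh, uniquely named file in the directory the streaming query watches, so the file source treats it as an unseen file and processes it in the next micro-batch.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical producer-side helper: writes every batch of records to a NEW
// file in the watched directory instead of appending to an existing file.
public class NewFileWriter {

    private final Path targetDir;

    public NewFileWriter(String targetDir) {
        this.targetDir = Paths.get(targetDir);
    }

    public void writeBatch(List<String> records) throws IOException {
        // Unique name per batch, e.g. records-1589912345678.txt
        Path newFile = targetDir.resolve("records-" + System.currentTimeMillis() + ".txt");
        // One record per line; Structured Streaming picks the file up on the
        // next trigger because it has never been seen before.
        Files.write(newFile, records, StandardCharsets.UTF_8);
    }
}

Usage would be something like new NewFileWriter("C:\\Test\\App\\").writeBatch(Arrays.asList("a|b|c")). If two batches can arrive within the same millisecond, replace the timestamp with a counter or UUID to keep the file names unique.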

