How to process only new records from a file?


Question

I have a scenario where I process records from a file. Data is appended to the file periodically (every few milliseconds), so I need to read the file, process it, and at the same time process only the newly added records.

I came across Spark Structured Streaming, which is built on Spark SQL. What I am doing is:

  1. Trigger the file stream processing every 1 second
  2. Run a Spark SQL query against the file
  3. Write the query output to the console in append mode

Below is the code for this:

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public static class SparkStreamer implements Runnable, Serializable {
    @Override
    public void run() {
        processDataStream();
    }

    private void processDataStream() {
        // "spark" is a SparkSession assumed to exist in the enclosing class.
        // The built-in file source is registered under the lowercase name "text".
        Dataset<Row> rowData = spark.readStream().format("text").load("C:\\Test\\App\\");

        // Split each line on "|" so every field becomes its own record.
        Dataset<String> data = rowData.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String row) throws Exception {
                return Arrays.asList(row.split("\\|")).iterator();
            }
        }, Encoders.STRING());

        Dataset<Row> dataCount = data.select(new Column("value"));

        // Write every micro-batch to the console in append mode.
        StreamingQuery query = dataCount.writeStream()
                  .outputMode("append")
                  .format("console")
                  .start();
        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}
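Note that the snippet above never actually sets the 1-second trigger from step 1; by default Structured Streaming starts a new micro-batch as soon as the previous one finishes. A minimal sketch of wiring the trigger in, assuming Spark 2.2+ where org.apache.spark.sql.streaming.Trigger is available:

import org.apache.spark.sql.streaming.Trigger;

StreamingQuery query = dataCount.writeStream()
          .outputMode("append")
          .trigger(Trigger.ProcessingTime("1 second")) // fire one micro-batch per second
          .format("console")
          .start();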

With this implementation, the query executes once, but when I then add new records to the file, a second batch execution is never triggered.

Other observations:

  • With the complete and update output modes there is no output at all; only in append mode do I get output, and then only once.

Can someone help address this issue? Does Spark Structured Streaming support processing data from a file, given that normal Spark Streaming doesn't?

Answer

Does Spark Structured Streaming support processing data from a file?

Yes.

The query executes once, but if I add new records to the file, a second batch execution is not triggered.

That's not going to work: once a file has been processed, it is marked as seen and is never processed again (review FileStreamSource, which is responsible for this behavior, to see how it works under the covers).

The recommended solution is to write new content to a new file.
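A minimal sketch of that pattern on the producer side (the WriteNewRecords class and the file-naming scheme are illustrative, not part of the original question): each batch of records goes into a brand-new file in the watched directory, which FileStreamSource picks up as unseen on the next micro-batch, whereas lines appended to an already-seen file would be ignored.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;

public class WriteNewRecords {
    private static final Path WATCHED_DIR = Paths.get("C:\\Test\\App");

    // Write each batch to a fresh, uniquely named file instead of appending
    // to one growing file. Writing to a temp file first and then moving it
    // in means Spark never lists a half-written file. ATOMIC_MOVE assumes
    // the temp dir and the watched dir are on the same drive.
    public static void flush(List<String> records) throws IOException {
        Path tmp = Files.createTempFile("records-", ".txt");
        Files.write(tmp, records);
        Files.move(tmp, WATCHED_DIR.resolve(tmp.getFileName()), StandardCopyOption.ATOMIC_MOVE);
    }
}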
