Spark Streaming does not display any record on application UI


Problem description


I am new to Spark and I am trying to run a simple Spark Streaming application that reads data from a CSV file and displays it. Spark Streaming seems to work, but the Streaming UI still shows "0" records. Here is my code:

public class App {
  public static void main(String[] args) throws Exception {
    // Get an instance of spark-conf, required to build the spark session
    SparkConf conf = new SparkConf().setAppName("StreamingExample").setMaster("local");
    JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(3000));
    //JavaSparkContext ssc= new JavaSparkContext(conf);
    jsc.checkpoint("checkpoint");

    System.out.println("Session created");

    JavaDStream<String> lines = jsc.textFileStream("C:\\Users\\Areeha\\eclipse-workspace\\learnspark\\src\\main\\java\\com\\example\\learnspark");
    lines.print();
    lines.foreachRDD(rdd -> rdd.foreach(x -> System.out.println(x)));

    JavaPairDStream<LongWritable, Text> streamedFile = jsc.fileStream("C:\\Users\\Areeha\\eclipse-workspace\\learnspark\\src\\main\\java\\com\\example\\learnspark", LongWritable.class, Text.class, TextInputFormat.class);
    streamedFile.print();
    System.out.println("File loaded!");
    System.out.println(streamedFile.count());
    System.out.println(lines.count());

    jsc.start();
    try {
      jsc.awaitTermination();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }


  }
}

This is what I get on console:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/11/21 09:24:50 INFO SparkContext: Running Spark version 2.4.4
19/11/21 09:24:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/11/21 09:24:50 INFO SparkContext: Submitted application: StreamingExample
19/11/21 09:24:50 INFO SecurityManager: Changing view acls to: Areeha
19/11/21 09:24:50 INFO SecurityManager: Changing modify acls to: Areeha
19/11/21 09:24:50 INFO SecurityManager: Changing view acls groups to:
19/11/21 09:24:50 INFO SecurityManager: Changing modify acls groups to:
19/11/21 09:24:50 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Areeha); groups with view permissions: Set(); users with modify permissions: Set(Areeha); groups with modify permissions: Set()
19/11/21 09:24:51 INFO Utils: Successfully started service 'sparkDriver' on port 57635.
19/11/21 09:24:51 INFO SparkEnv: Registering MapOutputTracker
19/11/21 09:24:51 INFO SparkEnv: Registering BlockManagerMaster
19/11/21 09:24:51 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/11/21 09:24:51 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/11/21 09:24:51 INFO DiskBlockManager: Created local directory at C:\Users\Areeha\AppData\Local\Temp\blockmgr-9d8ba7c2-3b21-419c-8711-d85f7d1704a1
19/11/21 09:24:51 INFO MemoryStore: MemoryStore started with capacity 1443.6 MB
19/11/21 09:24:51 INFO SparkEnv: Registering OutputCommitCoordinator
19/11/21 09:24:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/11/21 09:24:52 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.2.8:4040
19/11/21 09:24:52 INFO Executor: Starting executor ID driver on host localhost
19/11/21 09:24:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57648.
19/11/21 09:24:52 INFO NettyBlockTransferService: Server created on 192.168.2.8:57648
19/11/21 09:24:52 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/11/21 09:24:52 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.2.8, 57648, None)
19/11/21 09:24:52 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.8:57648 with 1443.6 MB RAM, BlockManagerId(driver, 192.168.2.8, 57648, None)
19/11/21 09:24:52 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.2.8, 57648, None)
19/11/21 09:24:52 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.2.8, 57648, None)
19/11/21 09:24:52 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
Session created
19/11/21 09:24:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@14151bc5
19/11/21 09:24:52 INFO FileInputDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.dstream.FileInputDStream@151335cb
File loaded!
org.apache.spark.streaming.api.java.JavaDStream@46d8f407
org.apache.spark.streaming.api.java.JavaDStream@2788d0fe
19/11/21 09:24:53 INFO FileBasedWriteAheadLog_ReceivedBlockTracker: Recovered 4 write ahead log files from file:/C:/Users/Areeha/eclipse-workspace/learnspark/checkpoint/receivedBlockMetadata
19/11/21 09:24:53 INFO FileInputDStream: Slide time = 3000 ms
19/11/21 09:24:53 INFO FileInputDStream: Storage level = Serialized 1x Replicated
19/11/21 09:24:53 INFO FileInputDStream: Checkpoint interval = null
19/11/21 09:24:53 INFO FileInputDStream: Remember interval = 60000 ms
19/11/21 09:24:53 INFO FileInputDStream: Initialized and validated org.apache.spark.streaming.dstream.FileInputDStream@14151bc5
19/11/21 09:24:53 INFO MappedDStream: Slide time = 3000 ms
19/11/21 09:24:53 INFO MappedDStream: Storage level = Serialized 1x Replicated
19/11/21 09:24:53 INFO MappedDStream: Checkpoint interval = null
19/11/21 09:24:53 INFO MappedDStream: Remember interval = 3000 ms
19/11/21 09:24:53 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@528f8f8b
19/11/21 09:24:53 INFO ForEachDStream: Slide time = 3000 ms
19/11/21 09:24:53 INFO ForEachDStream: Storage level = Serialized 1x Replicated
19/11/21 09:24:53 INFO ForEachDStream: Checkpoint interval = null
19/11/21 09:24:53 INFO ForEachDStream: Remember interval = 3000 ms
19/11/21 09:24:53 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@4cbf4f53
19/11/21 09:24:53 INFO FileInputDStream: Slide time = 3000 ms
19/11/21 09:24:53 INFO FileInputDStream: Storage level = Serialized 1x Replicated
19/11/21 09:24:53 INFO FileInputDStream: Checkpoint interval = null
19/11/21 09:24:53 INFO FileInputDStream: Remember interval = 60000 ms
19/11/21 09:24:53 INFO FileInputDStream: Initialized and validated org.apache.spark.streaming.dstream.FileInputDStream@14151bc5
19/11/21 09:24:53 INFO MappedDStream: Slide time = 3000 ms
19/11/21 09:24:53 INFO MappedDStream: Storage level = Serialized 1x Replicated
19/11/21 09:24:53 INFO MappedDStream: Checkpoint interval = null
19/11/21 09:24:53 INFO MappedDStream: Remember interval = 3000 ms
19/11/21 09:24:53 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@528f8f8b
19/11/21 09:24:53 INFO ForEachDStream: Slide time = 3000 ms
19/11/21 09:24:53 INFO ForEachDStream: Storage level = Serialized 1x Replicated
19/11/21 09:24:53 INFO ForEachDStream: Checkpoint interval = null
19/11/21 09:24:53 INFO ForEachDStream: Remember interval = 3000 ms
19/11/21 09:24:53 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@58d63b16
19/11/21 09:24:53 INFO FileInputDStream: Slide time = 3000 ms
19/11/21 09:24:53 INFO FileInputDStream: Storage level = Serialized 1x Replicated
19/11/21 09:24:53 INFO FileInputDStream: Checkpoint interval = null
19/11/21 09:24:53 INFO FileInputDStream: Remember interval = 60000 ms
19/11/21 09:24:53 INFO FileInputDStream: Initialized and validated org.apache.spark.streaming.dstream.FileInputDStream@151335cb
19/11/21 09:24:53 INFO ForEachDStream: Slide time = 3000 ms
19/11/21 09:24:53 INFO ForEachDStream: Storage level = Serialized 1x Replicated
19/11/21 09:24:53 INFO ForEachDStream: Checkpoint interval = null
19/11/21 09:24:53 INFO ForEachDStream: Remember interval = 3000 ms
19/11/21 09:24:53 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@748e9b20
19/11/21 09:24:53 INFO RecurringTimer: Started timer for JobGenerator at time 1574349894000
19/11/21 09:24:53 INFO JobGenerator: Started JobGenerator at 1574349894000 ms
19/11/21 09:24:53 INFO JobScheduler: Started JobScheduler
19/11/21 09:24:53 INFO StreamingContext: StreamingContext started
19/11/21 09:24:54 INFO FileInputDStream: Finding new files took 9 ms
19/11/21 09:24:54 INFO FileInputDStream: New files at time 1574349894000 ms:

19/11/21 09:24:54 INFO FileInputDStream: Finding new files took 3 ms
19/11/21 09:24:54 INFO FileInputDStream: New files at time 1574349894000 ms:

19/11/21 09:24:54 INFO JobScheduler: Added jobs for time 1574349894000 ms
19/11/21 09:24:54 INFO JobGenerator: Checkpointing graph for time 1574349894000 ms
19/11/21 09:24:54 INFO DStreamGraph: Updating checkpoint data for time 1574349894000 ms
19/11/21 09:24:54 INFO JobScheduler: Starting job streaming job 1574349894000 ms.0 from job set of time 1574349894000 ms
19/11/21 09:24:54 INFO DStreamGraph: Updated checkpoint data for time 1574349894000 ms
19/11/21 09:24:54 INFO CheckpointWriter: Submitted checkpoint of time 1574349894000 ms to writer queue
19/11/21 09:24:54 INFO CheckpointWriter: Saving checkpoint for time 1574349894000 ms to file 'file:/C:/Users/Areeha/eclipse-workspace/learnspark/checkpoint/checkpoint-1574349894000'
-------------------------------------------
Time: 1574349894000 ms
-------------------------------------------

19/11/21 09:24:54 INFO JobScheduler: Finished job streaming job 1574349894000 ms.0 from job set of time 1574349894000 ms
19/11/21 09:24:54 INFO JobScheduler: Starting job streaming job 1574349894000 ms.1 from job set of time 1574349894000 ms
19/11/21 09:24:54 INFO SparkContext: Starting job: foreach at App.java:79
19/11/21 09:24:54 INFO DAGScheduler: Job 0 finished: foreach at App.java:79, took 0.002286 s
19/11/21 09:24:54 INFO JobScheduler: Finished job streaming job 1574349894000 ms.1 from job set of time 1574349894000 ms
19/11/21 09:24:54 INFO JobScheduler: Starting job streaming job 1574349894000 ms.2 from job set of time 1574349894000 ms
-------------------------------------------
Time: 1574349894000 ms
-------------------------------------------

And the following appears on my Streaming UI application: [screenshot: the Streaming tab shows 0 records]

I don't know what I am doing wrong. It neither displays anything nor adds any records. I had earlier specified the exact CSV file, which did not work, so I tried giving the path of the entire folder that contains the CSV. Does anyone have any idea what I am missing? Thanks in advance.

Solution

textFileStream does not use a Receiver thread, and therefore does not report records in the UI the way receiver-based sources do:

File Streams
For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created via StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass].

File streams do not require running a receiver so there is no need to allocate any cores for receiving file data.

Source: https://spark.apache.org/docs/2.3.1/streaming-custom-receivers.html
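
For contrast, here is a minimal sketch (the host, port, and path are made up for illustration) of a receiver-based source next to a file-based one; only the first runs a Receiver and feeds the record counters on the Streaming UI:

// Receiver-based source: runs a Receiver thread, needs a dedicated core
// (hence the "local[n], n > 1" warning in your log), and its records
// show up on the Streaming UI.
JavaReceiverInputDStream<String> socketLines = jsc.socketTextStream("localhost", 9999);

// File-based source: no Receiver is started, so the UI's record counters
// stay at 0 even when batches actually process data.
JavaDStream<String> fileLines = jsc.textFileStream("C:\\data\\input");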

Somebody opened a PR on this JIRA ticket with changes to the Spark logic to address this, but the ticket does not have a fix version set.

What I usually do to know how many records entered each batch is to log the count when processing the RDD in foreachRDD:

lines.foreachRDD(rdd -> {
  // You might want to cache the rdd before counting if you are dealing with large RDDs
  logger.debug(rdd.count() + " records found");
});
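
As a sketch of that caching remark (assuming an SLF4J Logger named logger, which is not part of the original code), counting and then processing the same RDD without re-reading the input could look like this:

lines.foreachRDD(rdd -> {
  rdd.cache(); // materialize once so count() and foreach() don't both re-read the files
  logger.debug("{} records found in this batch", rdd.count());
  rdd.foreach(record -> System.out.println(record));
  rdd.unpersist(); // free the cached blocks once the batch is done
});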

Edit: Also, regarding your file not being processed: you might want to set the package org.apache.spark.streaming.dstream.FileInputDStream to DEBUG in your logging configuration, since it logs which files it "sees" and why it picks them up or not (mostly because the timestamp is too old).
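
With Spark 2.4's default log4j 1.x setup, that is one extra line in log4j.properties (where that file lives depends on how your project configures logging):

log4j.logger.org.apache.spark.streaming.dstream.FileInputDStream=DEBUG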
