spark-streaming-kafka相关内容

未能找到话题的领导者;java.lang.NullPointerException NullPointerException 在 org.apache.kafka.common.utils.Utils.formatAddress

当我们尝试从启用 SSL 的 Kafka 主题流式传输数据时,我们面临以下错误.你能帮我们解决这个问题吗? 19/11/07 13:26:54 INFO ConsumerFetcherManager:[ConsumerFetcherManager-1573151189884] 为分区 ArrayBuffer() 添加了提取器19/11/07 13:26:54 WARN ConsumerFetch ..

spark如何计算给定窗口间隔的窗口开始时间?

考虑我有一个带有时间戳字段列的输入 df 并且在将窗口持续时间(没有滑动间隔)设置为: 10 分钟 输入时间(2019-02-28 22:33:02) 形成的窗口为 (2019-02-28 22:30:02) to (2019-02-28 22:40:02) 8 分钟 输入相同的时间(2019-02-28 22:33:02) 形成的窗口为 (2019-02-28 22: ..

反序列化来自 Kafka 主题的 Spark 结构化流数据

我正在使用 Kafka 2.3.0 和 Spark 2.3.4.我已经构建了一个 Kafka 连接器,它读取 CSV 文件并将一行从 CSV 发布到相关的 Kafka 主题.该行是这样的:"201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect".CSV 包含 1000 条这样的行.连接器能够成功地将它们发布到主题上,我也能够在 ..
发布时间:2021-11-12 02:36:21 其他开发

Spark Streaming Kafka createDirectStream - Spark UI 将输入事件大小显示为零

我已经使用 createDirectStream 实现了 Spark Streaming.我的 Kafka 生产者每秒向具有两​​个分区的主题发送多条消息. 在 Spark 流媒体方面,我每秒读取一次 kafka 消息,然后我将它们按 5 秒的窗口大小和频率进行窗口化. Kafka 消息得到了正确处理,我看到了正确的计算和打印. 但在 Spark Web UI 中,在 Strea ..

Spark Streaming kafka 偏移管理

我一直在做火花流作业,通过 kafka 消费和生产数据.我用的是directDstream,所以必须自己管理offset,我们采用redis来写和读offset.现在有一个问题,当我启动我的客户端时,我的客户端需要从redis中获取offset,而不是kafka中存在的offset它自己.如何显示我编写的代码?现在我已经在下面编写了代码: kafka_stream = KafkaUtils.c ..

未能找到话题的领导者;java.lang.NullPointerException NullPointerException at org.apache.kafka.common.utils.Utils.formatAddress

当我们尝试从启用 SSL 的 Kafka 主题流式传输数据时,我们面临以下错误.你能帮我们解决这个问题吗? 19/11/07 13:26:54 INFO ConsumerFetcherManager:[ConsumerFetcherManager-1573151189884] 为分区 ArrayBuffer() 添加了提取器19/11/07 13:26:54 警告 ConsumerFetcher ..

pyspark 结构化流式写入分批写入镶木地板

我正在对 Spark 结构化流数据帧进行一些转换.我将转换后的数据帧作为镶木地板文件存储在 hdfs 中.现在我希望写入 hdfs 应该分批进行,而不是先转换整个数据帧然后存储数据帧. 解决方案 这是一个镶木地板水槽示例: # 镶木地板水槽示例targetParquetHDFS = sourceTopicKAFKA.writeStream.format("parquet") # 可以是 ..

spark如何计算给定窗口间隔下的窗口开始时间?

考虑一下,当将窗口持续时间(无滑动间隔)设置为时,我有一个带有时间戳字段列的输入df: 10分钟 输入时间(2019-02-28 22:33:02) 形成的窗口为(2019-02-28 22:30:02)到(2019-02-28 22:40:02) 8分钟 使用相同的时间输入(2019-02-28 22:33:02) 形成的窗口为((2019-02-28 22:26:0 ..

如何在Spark结构化流上执行单元测试?

我想了解Spark结构化流的单元测试方面.我的情况是,我要从Kafka获取数据,然后使用Spark结构化流对其进行消费,并在数据之上进行一些转换. 我不确定如何使用Scala和Spark进行测试.有人可以告诉我如何使用Scala在结构化流中进行单元测试.我是流媒体新手. 解决方案 tl; dr 使用 MemoryStream 为输出添加事件和内存接收器. 以下代码应有助于入门: ..

在同一个Spark会话中运行多个Spark Kafka结构化流查询会增加偏移量,但显示numInputRows 0

我有一个Spark结构化流,使用了来自Kafka主题的2个分区的记录. Spark Job::2个查询,每个查询都来自2个单独的分区,并在同一spark会话中运行. val df1 = session.readStream.format("kafka").option("kafka.bootstrap.servers",kafkaBootstrapServer).option(“分配", ..
发布时间:2021-04-08 19:25:55 其他开发