spark-structured-streaming相关内容

Simple Spark Structured Streaming 等效于 KafkaUtils.createRDD,即通过指定偏移量将 kafka 主题读取到 RDD?

如何通过指定开始和结束偏移量将kafka主题中的数据读取到RDD? KafkaUtils.createRDD is 是实验性的,API 相当不愉快(它返回一个巨大的 Java ConsumerRecord 类,甚至无法序列化并把它放在 KafkaRDD 中,它覆盖了很多方法(比如persist)来抛出一个异常. 我想要的是这样一个简单的 API: case class Message ..

如何使用 Foreach Spark Structure 流更改插入 Cassandra 的记录的数据类型

我正在尝试使用 Spark Structure Streaming 和 Foreach Sink 将反序列化的 Kafka 记录插入到 Data Stax Cassandra. 例如,我的反序列化数据框数据和所有数据一样都是字符串格式. id 名称 日期100 'test' 系统日期 使用 foreach Sink 我创建了一个类并尝试通过转换来插入如下记录. session.exec ..

来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试

下面是我的代码.我尝试了许多不同的选择变体,但应用程序可以运行,但没有显示每秒写入的消息.我有一个 Spark Streaming 示例,它使用 pprint() 确认 kafka 实际上每秒都在获取消息.Kafka 中的消息是 JSON 格式的,请参阅字段/列标签的架构: from pyspark.sql.functions import *从 pyspark.sql.types 导入 *进口 ..

如何将 kafka 上的火花流嵌套 json 转换为平面数据帧?

我第一次尝试将来自 Kafka 的 JSON 解析为 Spark 结构化流时需要一些帮助. 我正在努力转换传入的 JSON 并将其转换为平面数据帧以供进一步处理. 我的输入json是 [{ "siteId": "30:47:47:BE:16:8F", "siteData":[{ "dataseries": "trend-255", "values":[{"ts": 150271560 ..

Spark Structured Streaming 检查点在生产中的使用

在使用 Spark Structured 流时,我无法理解检查点的工作原理. 我有一个生成一些事件的 spark 进程,我将这些事件登录到 Hive 表中.对于这些事件,我会在 kafka 流中收到一个确认事件. 我创建了一个新的火花过程 将 Hive 日志表中的事件读入 DataFrame 使用 Spark Structured Streaming 将这些事件与确认事件流结合 ..

如何在 Spark Streaming + Kafka 中摆脱 NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe 错误

我想使用 Spark Streaming 并将其与 Kafka 连接.但是我仍然收到 NoSuchMethodError:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe 错误,现在我不知道下一步该做什么. 我的设置: Ubuntu 16.04 斯卡拉 2.11 Kafka 2.11-1.0.0(我也试过用 ..