spark-structured-streaming相关内容

如何为结构化查询获取 Kafka 偏移量以进行手动和可靠的偏移量管理?

Spark 2.2 引入了 Kafka 的结构化流媒体源.据我了解,它依赖于 HDFS 检查点目录来存储偏移量并保证“恰好一次"消息传递. 但是旧码头(例如 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) 说 Spark Stream ..

为什么格式(“kafka")失败并显示“无法找到数据源:kafka".(即使是超级罐子)?

我使用 HDP-2.6.3.0 和 Spark2 包 2.2.0. 我正在尝试使用 Structured Streaming API 编写 Kafka 使用者,但在将作业提交到集群后出现以下错误: 线程“main"中的异常java.lang.ClassNotFoundException:无法找到数据源:kafka.请在 http://spark.apache.org/third-party ..

Spark Struded Streaming 自动将时间戳转换为本地时间

我的时间戳为 UTC 和 ISO8601,但使用结构化流,它会自动转换为本地时间.有没有办法阻止这种转换?我想在 UTC 中使用它. 我正在从 Kafka 读取 json 数据,然后使用 from_json Spark 函数解析它们. 输入: {"Timestamp":"2015-01-01T00:00:06.222Z"} 流程: SparkSession.builder().m ..

带有流源的查询必须使用 writeStream.start(); 执行;

我正在尝试使用 Spark 结构化流式传输从 Kafka 读取数据并预测表单传入数据.我正在使用我使用 Spark ML 训练过的模型. val spark = SparkSession.builder().appName("Spark SQL 基本示例").master("本地").getOrCreate()导入 spark.implicits._val toString = udf((pay ..

为什么启动流式查询会导致“ExitCodeException exitCode=-1073741515"?

一直在尝试习惯新的结构化流媒体,但是一旦我开始一个 .writeStream 查询,它就会不断给我以下错误. 知道是什么原因造成的吗?如果您在本地和 HDFS 之间拆分检查点和元数据文件夹,我能找到的最接近的是一个持续的 Spark 错误,但是.在 Windows 10、Spark 2.2 和 IntelliJ 上运行. 17/08/29 21:47:39 错误 StreamMetadat ..
发布时间:2021-11-12 05:45:30 其他开发

为什么 format(“kafka") 失败并显示“Failed to find data source: kafka."(即使是超级罐子)?

我使用 HDP-2.6.3.0 和 Spark2 包 2.2.0. 我正在尝试使用 Structured Streaming API 编写 Kafka 使用者,但在将作业提交到集群后出现以下错误: 线程“main"中的异常java.lang.ClassNotFoundException:无法找到数据源:kafka.请在 http://spark.apache.org/third-party ..

如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?

我正在阅读 Spark 结构化流媒体 - Kafka 集成指南 此处. 在这个链接中被告知 enable.auto.commit:Kafka 源不提交任何偏移量. 那么,一旦我的 Spark 应用程序成功处理了每条记录,我该如何手动提交偏移量? 解决方案 tl;dr 无法向 Kafka 提交任何消息.从 Spark 3.x 版本开始,您可以定义 Kafka 消费者组 ..

Spark Struded Streaming 自动将时间戳转换为本地时间

我的时间戳为 UTC 和 ISO8601,但使用结构化流,它会自动转换为本地时间.有没有办法阻止这种转换?我想在 UTC 中使用它. 我正在从 Kafka 读取 json 数据,然后使用 from_json Spark 函数解析它们. 输入: {"Timestamp":"2015-01-01T00:00:06.222Z"} 流程: SparkSession.builder().m ..

将 Spark Structured Streaming 与 Confluent Schema Registry 集成

我在 Spark Structured Streaming 中使用 Kafka Source 来接收 Confluent 编码的 Avro 记录.我打算使用 Confluent Schema Registry,但是与 Spark 结构化流的集成似乎是不可能的. 我见过这个问题,但无法在 Confluent Schema Registry 中使用.使用 Spark 2.0 从 Kafka 读取 ..

Kafka Connect cassandra 源 - 十进制数据类型的错误

我使用的是 kafka connect cassandra 源连接器 1.0 版本.我在 cassandra 表中有一个十进制数据类型列(价格),并将其作为来自源连接器的 json 写入 kafka 主题,它以某种字符串格式写入十进制值,例如 "price":"AA==".现在它在我的火花流中出错,同时将浮点数转换为“数字格式异常"....??请建议在 kafka 主题中写入值时可能出现的问题.提 ..

如何将 from_json 与 Kafka connect 0.10 和 Spark Structured Streaming 一起使用?

我试图从 [Databricks][1] 复制示例并将其应用到 Kafka 的新连接器并激发结构化流,但是我无法使用 Spark 中的开箱即用方法正确解析 JSON... 注意:主题以JSON格式写入Kafka. val ds1 = spark.readStream.format("卡夫卡").option("kafka.bootstrap.servers", IP + ":9092"). ..

将 Spark 流式 PySpark 数据帧写入 Cassandra 会覆盖表而不是附加

我正在运行 Kafka、Spark 和 Cassandra 的 1 节点集群.所有本地都在同一台机器上. 从一个简单的 Python 脚本中,我每 5 秒将一些虚拟数据流式传输到 Kafka 主题中.然后使用 Spark 结构化流,我将此数据流(一次一行)读入 PySpark DataFrame 中,startingOffset = latest.最后,我尝试将此行附加到现有的 Cassan ..

如何使用 Foreach Spark Structure 流更改插入 Cassandra 的记录的数据类型

我正在尝试使用 Spark Structure Streaming 和 Foreach Sink 将反序列化的 Kafka 记录插入到 Data Stax Cassandra. 例如,我的反序列化数据框数据和所有数据一样都是字符串格式. id 名称 日期100 'test' 系统日期 使用 foreach Sink 我创建了一个类并尝试通过转换来插入如下记录. session.exec ..