spark-structured-streaming相关内容
我正在尝试对主题数据进行一些充实.因此使用 Spark 结构化流从 Kafka sink 读取回 Kafka. val ds = spark.readStream.format("卡夫卡").option("kafka.bootstrap.servers", bootstrapServers).option("group.id", groupId).option("订阅", "主题名称").加载
..
在我的场景中,我有几个数据集不时出现,我需要在我们的平台中摄取它们.摄取过程涉及几个转换步骤.其中之一是 Spark.到目前为止,我特别使用火花结构化流媒体.基础设施还涉及 kafka,spark 结构化流从中读取数据. 我想知道是否有一种方法可以检测到某个主题在一段时间内没有其他东西可以消费时决定停止工作.那就是我想在消耗该特定数据集所需的时间内运行它,然后停止它.出于特定原因,我们决定不
..
我有一个 Spark 结构化流应用程序 (v2.3.2),它需要从许多 Kafka 主题中读取数据,进行一些相对简单的处理(主要是聚合和一些连接)并将结果发布到许多其他 Kafka 主题.因此在同一个应用程序中处理多个流. 我想知道,如果我只设置 1 个订阅多个主题的直接 readStream,然后使用选择拆分流,从资源的角度(内存、执行程序、线程、Kafka 侦听器等)来看,它是否会有所不
..
我正在使用 Kafka 的结构化流源(集成指南),如前所述,它没有提交任何偏移量. 我的目标之一是监控它(检查它是否落后等).即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们.根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是: 它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程
..
我想使用 Python (PySpark) 从 Kafka 源到 MariaDB 执行 Spark Structured Streaming (Spark 2.4.x). 我想使用流式 Spark 数据帧,而不是静态或 Pandas 数据帧. 似乎必须使用 foreach 或 foreachBatch,因为根据 https://spark.apache.org/docs/latest/
..
我在我们的项目中使用 Structured Streaming + Kafka 进行实时数据分析.我使用的是 Spark 2.2,kafka 0.10.2. 我在应用程序启动时从检查点恢复流式查询时遇到问题.由于从单个 kafka 流点派生出多个流查询,并且每个流查询都有不同的检查目录.因此,在作业失败的情况下,当我们重新启动作业时,会出现一些无法从检查点位置恢复的流查询,因此会引发异常读取
..
我在 Spark 2.1.1 中使用结构化流.我需要对传入的消息应用一些业务逻辑(来自 Kafka 源). 本质上,我需要接收消息,获取一些关键值,在 HBase 中查找它们并在数据集上执行更多业务逻辑.最终结果是需要写出到另一个 Kafka 队列的字符串消息. 但是,由于传入消息的抽象是一个数据帧(无界表 - 结构化流),我必须通过 mapPartitions(由于 HBase 客户
..
我使用的是 kafka connect cassandra 源连接器 1.0 版本.我在 cassandra 表中有一个十进制数据类型列(价格),并将其作为来自源连接器的 json 写入 kafka 主题,它以某种字符串格式写入十进制值,例如 "price":"AA==".现在它在我的火花流中出错,同时将浮点数转换为“数字格式异常"....??请建议在 kafka 主题中写入值时可能出现的问题.提
..
我正在尝试读取来自 Kafka 主题的消息.消息格式如下(示例格式): {"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},
..
我遇到过 Spark 只能从 Kafka 2-patition 主题的一个分区流式传输和获取消息的情况. 我的主题:C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic t
..
有很多从 Kafka 读取 json 的在线示例(写入 parquet) - 但我不知道如何将架构应用于来自 kafka 的 CSV 字符串. 流式数据: customer_1945,cusaccid_995,27999941customer_1459,cusaccid_1102,27999942 架构: schema = StructType() \.add("customer_id
..
我想使用 PySpark Structured Streaming API 将结构流数据写入 Cassandra. 我的数据流如下: REST API -> Kafka -> Spark 结构化流 (PySpark) -> Cassandra 来源和版本如下:星火版本:2.4.3DataStax DSE:6.7.6-1 初始化火花: spark = SparkSessio
..
所以我有一些数据在 Kafka 主题中流式传输,我正在获取这些流式数据并将其放入 DataFrame.我想在 DataFrame 中显示数据: import os从 kafka 导入 KafkaProducer从 pyspark.sql 导入 SparkSession、DataFrame导入时间从日期时间导入日期时间,时间增量os.environ['PYSPARK_SUBMIT_ARGS'] =
..
我有一个用例,其中存储在 s3 中的 json 记录的文件路径作为 kafka卡夫卡中的消息.我必须使用 Spark 结构化流处理数据. 我想到的设计如下: 在 kafka Spark 结构化流中,读取包含数据路径的消息. 在驱动程序中收集消息记录.(消息很小) 从数据位置创建数据框. kafkaDf.select($"value".cast(StringType)).wri
..
我试图从 [Databricks][1] 复制示例并将其应用到 Kafka 的新连接器并激发结构化流,但是我无法使用 Spark 中的开箱即用方法正确解析 JSON... 注意:主题以JSON格式写入Kafka. val ds1 = spark.readStream.format("卡夫卡").option("kafka.bootstrap.servers", IP + ":9092").
..
基于Spark 3.0中的介绍,https:///spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.应该可以设置“kafka.group.id"跟踪偏移量.对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失.根据我之前的问题,我觉得 Spark 3.0 中的 kafka.g
..
我使用 Spark 2.1. 我正在尝试使用 Spark Structured Streaming 从 Kafka 读取记录,反序列化它们并在之后应用聚合. 我有以下代码: SparkSession spark = SparkSession.builder().appName("统计").getOrCreate();数据集df = 火花.readStream().format("卡夫
..
我读到 Spark Structured Streaming 不支持将 Kafka 消息读取为 JSON 的模式推断.有没有办法像 Spark Streaming 一样检索架构: val dataFrame = spark.read.json(rdd.map(_.value()))dataFrame.printschema 解决方案 这是一种可行的方法: 在开始流式传输之前,从 Kaf
..
我想使用 Spark Structured Streaming 从安全的 kafka 中读取数据.这意味着我需要强制使用特定的 group.id.但是,正如文档中所述,这是不可能的.尽管如此,在 databricks 文档 https://docs 中.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ss
..
Spark 2.2 引入了 Kafka 的结构化流媒体源.据我了解,它依赖于 HDFS 检查点目录来存储偏移量并保证“恰好一次"消息传递. 但是旧码头(例如 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) 说 Spark Stream
..