spark-structured-streaming相关内容
我正在阅读 Spark 结构化流媒体 - Kafka 集成指南 此处. 在这个链接中被告知 enable.auto.commit:Kafka 源不提交任何偏移量. 那么,一旦我的 Spark 应用程序成功处理了每条记录,我该如何手动提交偏移量? 解决方案 tl;dr 无法向 Kafka 提交任何消息.从 Spark 3.x 版本开始,您可以定义 Kafka 消费者组
..
我正在尝试使用 结构化流方法使用基于DataFrame/Dataset API的Spark-Streaming从Kafka加载数据流. 我使用: Spark 2.10 卡夫卡 0.10 spark-sql-kafka-0-10 Spark Kafka DataSource 已经定义了底层架构: |key|value|topic|partition|offset|timest
..
我在 Spark Structured Streaming 中使用 Kafka Source 来接收 Confluent 编码的 Avro 记录.我打算使用 Confluent Schema Registry,但是与 Spark 结构化流的集成似乎是不可能的. 我见过这个问题,但无法在 Confluent Schema Registry 中使用.使用 Spark 2.0 从 Kafka 读取
..
我有一个从 kafka 传入 dataFrame 的流数据.我想根据 Id 删除重复项并根据时间戳保留最新记录. 样本数据是这样的: Id 名称计数时间戳1 维卡斯 20 2018-09-19T10:10:102 维杰 50 2018-09-19T10:10:203 维拉斯 30 2018-09-19T10:10:304 维沙尔 10 2018-09-19T10:10:401 维卡斯 50
..
文档说: enable.auto.commit:Kafka 源不提交任何偏移量. 因此我的问题是,如果工作程序或分区崩溃/重启: startingOffsets 设置为最新,我们如何不丢失消息? startingOffsets 设置为最早,我们如何不重新处理所有消息? 这似乎很重要.有关如何处理的任何指示? 解决方案 我也遇到了这个问题. 您对 2 个选项的
..
问题 我正在尝试使用 Spark HDInsight 集群 (HDI 4.0) 通过 IntelliJ 运行远程 Spark 作业.在我的 Spark 应用程序中,我尝试使用内置 readStream 函数的 Spark 结构化流从 Azure blob 存储中的镶木地板文件文件夹读取输入流. 当我在连接到 HDInsight 群集的 Zeppelin 笔记本上运行代码时,该代码按预期
..
我在将 Spark 结构化流数据帧与批处理数据帧结合时遇到问题,我的场景我有一个 S3 流,它需要与历史数据进行左反连接,该数据返回历史中不存在的记录(找出新记录)我将这些记录作为新的追加写入历史记录(按列分区磁盘数据分区而不是内存). 当我刷新已分区的历史数据框时,我的历史数据框没有更新. 下面是两个代码片段,一个有效,另一个无效. 工作代码和非工作代码之间的唯一区别是 par
..
我正在使用 Spark Structured Streaming 并遇到问题. 在StreamingContext、DStreams中,我们可以定义一个批处理间隔如下: from pyspark.streaming import StreamingContextssc = StreamingContext(sc, 5) # 5 秒批处理间隔 如何在结构化流媒体中做到这一点? 我的流
..
我在使用 PySpark 结构化流时遇到以下问题. 我的流数据中的每一行都有一个用户 ID 和一个时间戳.现在,对于每一行和每个用户,我想添加一个具有时间戳差异的列. 例如,假设我收到的第一行内容是:“用户 A,08:00:00".如果第二行显示“用户 A,08:00:10",那么我想在第二行中添加一个名为“间隔"的列,表示“10 秒". 有没有人知道如何实现这一目标?我尝试使用
..
我正在尝试在 Postgresql 中实现流输入更新.具体来说,我想在 Spark 的流输入中使用 Postgresql 作为数据源. 看着文档,我不确定这是否可能. https://spark.apache.org/docs/latest/streaming-programming-guide.html 是否可以从 PostgresQL 流式传输输入,也许作为微批处理?
..
我第一次尝试将来自 Kafka 的 JSON 解析为 Spark 结构化流时需要一些帮助. 我正在努力转换传入的 JSON 并将其转换为平面数据帧以供进一步处理. 我的输入json是 [{ "siteId": "30:47:47:BE:16:8F", "siteData":[{ "dataseries": "trend-255", "values":[{"ts": 150271560
..
我正在对 Spark 结构化流数据帧进行一些转换.我将转换后的数据帧作为镶木地板文件存储在 hdfs 中.现在我希望写入 hdfs 应该分批进行,而不是先转换整个数据帧然后存储数据帧. 解决方案 这是一个镶木地板水槽示例: # 镶木地板水槽示例targetParquetHDFS = sourceTopicKAFKA.writeStream.format("parquet") # 可以是
..
我正在学习使用Databricks的结构化流,并且正在使用DataStreamWriter控制台模式. 我的程序: 模拟文件流式传输到达文件夹"monitoring_dir"(每10秒从"source_dir"传输一个新文件). 使用DataStreamReader用每个新文件的内容填充Unbounded DataFrame"inputUDF". 使用DataStreamWrite
..
我们有一个基于Hadoop的解决方案(CDH 5.15),我们在其中的某些目录中获取HDFS中的新文件.在这些目录的顶部,我们有4-5个Impala(2.1)表.在HDFS中写入这些文件的过程是Spark结构化流式传输(2.3.1) 现在,一旦将文件写入HDFS,我们就会运行一些DDL查询: ALTER TABLE table1恢复分区,以检测添加到表中的新分区(及其HDFS目录和文件
..
如何每5分钟触发一次并获取最近1个小时的数据?我想出了这一点,但似乎并没有给我最后1个小时的所有记录.我的理由是: 读取流, 根据时间戳列过滤最近1小时的数据,并且 使用 forEachbatch 进行写入/打印.还有 为它添加水印,以免保留所有过去的数据. 火花.readStream.format("delta").table("xxx").withWatermark(
..
我想从MQTT接收JSON字符串并将其解析为DataFrames df .我该怎么办? 这是我发送到MQTT队列以便在Spark中处理的Json消息的示例: {"id":1“时间戳记":1532609003,“距离":[2,5,7,8]} 这是我的代码: 从pyspark.sql中的 导入SparkSessionspark = SparkSession \.builder \.a
..
我已经创建了如下所示的数据框,其中使用了to_json()方法来创建JSON数组值. + ----------------------------------------------------------------------------------------------------| json_data |+ --------------------------------------
..
这是我以前的 map1:org.apache.spark.sql.DataFrame = [lookup:map] scala>val ds1 = spark.sql(“选择"p1"作为p,将Array('s2','s3')选择为c") ds1:org.apache.spark.sql.DataFrame = [p:字符串,c:数组] scala>
..
我使用spark(3.0.0)结构化流从kafka中读取主题. 我先使用 joins ,然后再使用 mapGropusWithState 来获取我的流数据,所以我必须使用 update https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks ). 我刚刚意
..
考虑一下,当将窗口持续时间(无滑动间隔)设置为时,我有一个带有时间戳字段列的输入df: 10分钟 输入时间(2019-02-28 22:33:02) 形成的窗口为(2019-02-28 22:30:02)到(2019-02-28 22:40:02) 8分钟 使用相同的时间输入(2019-02-28 22:33:02) 形成的窗口为((2019-02-28 22:26:0
..