spark-structured-streaming相关内容
如何在不使用 flatMapsGroupWithState 或 Dstream API 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?寻找更具声明性的方式 示例: select count(*) from some_view 我希望输出只计算每批中可用的任何记录,而不是从上一批中汇总 解决方案 要在 Spark 中使用 St
..
如何在 Python Spark 结构化流中使用 foreach 来触发输出操作. query = wordCounts\.writeStream\.outputMode('更新')\.foreach(函数)\.开始()定义函数():操作(字数) 解决方案 在 Spark 2.4.0 中添加了对 Python 中 foreach sink 的支持,并且文档已更新:http://spark.a
..
如何在不使用 flatMapsGroupWithState 或 Dstream API 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?寻找更具声明性的方式 示例: select count(*) from some_view 我希望输出只计算每批中可用的任何记录,而不是从上一批中汇总 解决方案 要在 Spark 中使用 St
..
是否可以将流式o.a.s.sql.Dataset 转换为DStream?如果是这样,如何? 我知道如何将其转换为 RDD,但它是在流上下文中. 解决方案 这是不可能的.Structured Streaming 和传统 Spark Streaming (DStreams) 使用完全不同的语义并且彼此不兼容,因此: DStream 无法转换为 Streaming Dataset.
..
我是 Spark 世界的新手,并且在一些概念上苦苦挣扎. 使用来自 Kafka 的 Spark Structured Streaming 时,并行性如何发生? 让我们考虑以下代码片段: SparkSession spark = SparkSession.builder().appName("myApp").getOrCreate();数据集ds = 火花.readStream().f
..
我有一个包含数百万行的静态 DataFrame,如下所示. 静态DataFrame: --------------ID|时间戳|--------------|1|1540527851||2|1540525602||3|1530529187||4|1520529185||5|1510529182||6|1578945709|-------------- 现在在每个批次中,正在形成一个流Da
..
我正在尝试使用 Spark 结构化流从 Kafka 主题读取 XML 数据. 我尝试使用 Databricks spark-xml 包,但我收到错误消息,说该包不支持流式读取.有什么方法可以使用结构化流从 Kafka 主题中提取 XML 数据? 我当前的代码: df = spark \.readStream \.format("kafka") \.format('com.databr
..
我正在编写一个 Spark Structured Streaming 程序.我需要创建一个具有滞后差异的附加列. 为了重现我的问题,我提供了代码片段.此代码使用存储在 data 文件夹中的 data.json 文件: [{"id": 77,"type": "person","timestamp": 1532609003},{"id": 77,"type": "person","timest
..
一直在玩 Spark Structured Streaming 和 mapGroupsWithState(特别是按照 StructuredSessionization 示例在 Spark 源中).鉴于我的用例,我想确认我认为 mapGroupsWithState 存在的一些限制. 就我而言,会话是一组不间断的用户活动,这样两个按时间顺序(按事件时间,而不是处理时间)排序的事件之间的间隔不会超
..
我正在尝试读取来自 Kafka 主题的消息.消息格式如下(示例格式): {"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},
..
我正在编写一个 Python 应用程序,它在一系列值上滑动一个窗口,每个值都带有时间戳.我想对滑动窗口中的值应用一个函数,以便从 N 个最新值计算分数,如图所示.我们已经使用 Python 库实现了该功能以利用 GPU. 我发现 Apache Spark 2.0 附带结构化流,它支持事件时间的窗口操作.如果你想从一个.csv文件中读取一个有限的记录序列,并且想对这样一个滑动窗口中的记录进行计
..
我正在构建一个 Spark Structured Streaming 应用程序,我正在其中执行批处理流连接.并且批处理数据的来源会定期更新. 因此,我计划定期对该批数据进行持久化/非持久化. 以下是我用来持久化和取消持久化批处理数据的示例代码. 流程: 读取批处理数据 保留批处理数据 每隔一小时,取消持久化数据并读取批处理数据并再次持久化. 但是,我没有看到批处理
..
Kinesis firehose 管理文件的持久性,在这种情况下是时间序列 JSON,到一个文件夹层次结构中,该层次结构按 YYYY/MM/DD/HH(精确到 24 小时编号)进行分区......很棒. 如何使用 Spark 2.0 然后我可以读取这些嵌套的子文件夹并从所有叶 json 文件创建静态数据帧?数据帧阅读器有“选项"吗? 我的下一个目标是使其成为流式 DF,其中 Fireh
..
我正在读取来自 Kafka 的流,并将值从 Kafka(即 JSON)转换为 Structure. from_json 有一个采用 String 类型模式的变体,但我找不到示例.请告知以下代码中的错误. 错误 线程“main"org.apache.spark.sql.catalyst.parser.ParseException 中的异常:外来输入 '(' 期待 {'SELECT',
..
我在 Spark 2.2.0 中使用聚合和分区运行结构化流时遇到内存问题: session.readStream().schema(inputSchema).option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB).option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF).csv("s3://
..
我有一个带架构的数据框 - |-- record_id: integer (nullable = true)|-- 数据 1:字符串(可为空 = 真)|-- 数据 2:字符串(可为空 = 真)|-- Data3: 字符串 (nullable = true)|-- 时间:时间戳(可为空 = 真) 我想检索数据中的最后一条记录,按 record_id 和最大时间戳分组. 所以,如果数据最初
..
我想在 Spark Structured Streaming 中进行多次聚合. 像这样: 读取输入文件流(从文件夹中) 执行聚合 1(带有一些转换) 执行聚合 2(以及更多转换) 当我在结构化流中运行它时,它给我一个错误“流数据帧/数据集不支持多个流聚合". 有没有办法在 Structured Streaming 中进行这种多重聚合? 解决方案 这个不支持,但也
..
我在 java 中使用 spark-sql-2.4.3v.我有下面的场景 val data = List((“20",“分数",“学校",14 ,12),(“21"、“分数"、“学校"、13、13),(“22",“比率",“学校",11 ,14),(“23",“分数",“学校",11 ,14),(“24",“比率",“学校",12 ,12),(“25"、“分数"、“学校"、11 ,14))val
..
如何在 PySpark 中为流式 DataFrame 设置架构. from pyspark.sql import SparkSession从 pyspark.sql.functions 导入爆炸从 pyspark.sql.functions 导入拆分# 导入数据类型从 pyspark.sql.types 导入 *火花 = SparkSession\.builder\.appName("Struc
..
SparkSession.builder.master("本地[*]").config("spark.sql.warehouse.dir", "C:/tmp/spark").config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint").appName("我的测试").getOrCreate.rea
..