spark-structured-streaming相关内容
是否可以使用 Spark 结构化流从 kafka 读取 protobuf 消息? 解决方案 方法一 sparkSession.udf().register("deserialize", getDeserializer(), schema);DataStreamReader dataStreamReader = sparkSession.readStream().format("kafka
..
我正在尝试将两个流合并为一个并将结果写入主题 代码:1- 阅读两个主题 val PERSONINFORMATION_df: DataFrame = spark.readStream.format(“卡夫卡").option("kafka.bootstrap.servers", "xx:9092").option(“订阅",“个人信息").option("group.id", "info")
..
考虑我有一个带有时间戳字段列的输入 df 并且在将窗口持续时间(没有滑动间隔)设置为: 10 分钟 输入时间(2019-02-28 22:33:02) 形成的窗口为 (2019-02-28 22:30:02) to (2019-02-28 22:40:02) 8 分钟 输入相同的时间(2019-02-28 22:33:02) 形成的窗口为 (2019-02-28 22:
..
尝试在 Spark 结构化流方面理解 SparkSql. Spark Session 从 kafka 主题读取事件,将数据聚合到按不同列名分组的计数并将其打印到控制台. 原始输入数据结构如下: +--------------+------------+----------+-----------+-------+-----------+--------------------+-------
..
我正在尝试将两个流合并为一个并将结果写入主题 代码:1- 阅读两个主题 val PERSONINFORMATION_df: DataFrame = spark.readStream.format(“卡夫卡").option("kafka.bootstrap.servers", "xx:9092").option(“订阅",“个人信息").option("group.id", "info")
..
我正在开发一个 Spark Structured Streaming 应用程序,它流式传输 csv 文件并将它们与静态数据连接起来.加入后我做了一些聚合. 在将查询结果以 CSV 格式写入 HDFS 时,出现以下错误: 19/01/09 14:00:30 错误 MicroBatchExecution: 查询 [id = 830ca987-b55a-4c03-aa13-f71bc57e47a
..
嗨,我有一个场景,传入的消息是一个 Json,它有一个标题说 tablename,数据部分有表列数据.现在我想把它写到 parquet 到单独的文件夹说 /emp 和 /dept.我可以通过基于表名聚合行在常规流中实现这一点.但是在结构化流媒体中,我无法拆分它.我如何才能在结构化流媒体中实现这一点. {"tableName":"employee","data":{"empid":1","em
..
我正在尝试将两个流合并为一个并将结果写入主题 代码:1- 阅读两个主题 val PERSONINFORMATION_df: DataFrame = spark.readStream.format(“卡夫卡").option("kafka.bootstrap.servers", "xx:9092").option(“订阅",“个人信息").option("group.id", "info")
..
嗨,我有一个场景,传入的消息是一个 Json,它有一个标题说 tablename,数据部分有表列数据.现在我想把它写到 parquet 到单独的文件夹说 /emp 和 /dept.我可以通过基于表名聚合行在常规流中实现这一点.但是在结构化流媒体中,我无法拆分它.我如何才能在结构化流媒体中实现这一点. {"tableName":"employee","data":{"empid":1","em
..
我正在开发一个 Spark Structured Streaming 应用程序,它流式传输 csv 文件并将它们与静态数据连接起来.加入后我做了一些聚合. 在将查询结果以 CSV 格式写入 HDFS 时,出现以下错误: 19/01/09 14:00:30 错误 MicroBatchExecution: 查询 [id = 830ca987-b55a-4c03-aa13-f71bc57e47a
..
我在结构化流 (spark 2.2.0) 中使用自定义接收器,并注意到 spark 生成的输入行数指标不正确 - 它始终为零. 我的流构建: StreamingQuery writeStream = session.readStream().schema(RecordSchema.fromClass(TestRecord.class)).option(OPTION_KEY_DELIMITE
..
我在结构化流 (spark 2.2.0) 中使用自定义接收器,并注意到 spark 生成的输入行数指标不正确 - 它始终为零. 我的流构建: StreamingQuery writeStream = session.readStream().schema(RecordSchema.fromClass(TestRecord.class)).option(OPTION_KEY_DELIMITE
..
有一种方法可以通过将属性 spark.streaming.stopGracefullyOnShutdown 设置为 true,然后使用 kill -SIGTERM 命令终止进程来正常关闭 Spark 流.但是,我没有看到此类选项可用于结构化流 (SQLContext.scala). 结构化流媒体的关闭过程是否不同?或者只是还没有实施? 解决方案 此功能尚未实现.但是,spark 结构
..
我有一个包含三列时间的流数据框,col1,col2. +----------------------+--------------------+--------------------+|时间 |col1 |col2 |+----------------------+--------------------+--------------------+|2018-01-10 15:27:21.28
..
在 Spark 结构流上执行 Spark SQL 时遇到一些问题.错误的 PFA. 这是我的代码 对象 sparkSqlIntegration {def main(args: Array[String]) {val spark = SparkSession.builder.appName("StructuredStreaming").master("本地[*]").config("spa
..
我之前成功地将 pyspark 用于 Spark Streaming (Spark 2.0.2) 和 Kafka (0.10.1.0),但我的目的更适合结构化流.我尝试在线使用示例:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html 使用以下类似代码: ds1 = spark.rea
..
我正在尝试使用 Kafka 和 Python 进行结构化流式传输.需求:我需要在 Spark 中处理来自 Kafka(以 JSON 格式)的流数据(执行转换),然后将其存储在数据库中. 我有 JSON 格式的数据,例如,{"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}
..
我第一次尝试将来自 Kafka 的 JSON 解析为 Spark 结构化流时需要一些帮助. 我正在努力转换传入的 JSON 并将其转换为平面数据帧以供进一步处理. 我的输入json是 [{ "siteId": "30:47:47:BE:16:8F", "siteData":[{ "dataseries": "trend-255", "values":[{"ts": 150271560
..
我有一个方案来处理文件中的记录.文件中的数据会定期(每毫秒)添加一次.所以我需要读取文件并处理它,同时只处理新添加的记录. 我遇到了基于 Spark SQL 构建的 Spark Structured 流的概念.我正在做的是 - 每 1 秒触发一次文件流处理 对文件运行 Spark SQL 查询 以追加模式在控制台上写入查询的输出. 下面是相同的代码 - 公共静态类 Spar
..
我正在尝试使用 pyspark 从 kafka 读取流.我正在使用 spark 版本 3.0.0-preview2 和 spark-streaming-kafka-0-10_2.12在此之前,我只是统计 zookeeper,kafka 并创建一个新主题: /usr/local/kafka/bin/zookeeper-server-start.sh/usr/local/kafka/config/z
..