spark-structured-streaming相关内容
如何通过指定开始和结束偏移量将kafka主题中的数据读取到RDD? KafkaUtils.createRDD is 是实验性的,API 相当不愉快(它返回一个巨大的 Java ConsumerRecord 类,甚至无法序列化并把它放在 KafkaRDD 中,它覆盖了很多方法(比如persist)来抛出一个异常. 我想要的是这样一个简单的 API: case class Message
..
我有一个 Spark 结构的 Steam 应用程序,我正在从 Kafka 读取它.这是我的代码的基本结构. 我创建了 Spark 会话. val spark = SparkSession.builder.appName("app_name").getOrCreate() 然后我从流中读取 val data_stream = spark.readStream.format("卡夫卡").
..
对于以下写主题/读主题air2008rand串联: import org.apache.spark.sql.streaming.Trigger(spark.readStream.format("卡夫卡").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets", "最早").option("订阅",
..
我正在尝试使用 Spark Structure Streaming 和 Foreach Sink 将反序列化的 Kafka 记录插入到 Data Stax Cassandra. 例如,我的反序列化数据框数据和所有数据一样都是字符串格式. id 名称 日期100 'test' 系统日期 使用 foreach Sink 我创建了一个类并尝试通过转换来插入如下记录. session.exec
..
是否可以使用 Spark 结构化流从 kafka 读取 protobuf 消息? 解决方案 方法一 sparkSession.udf().register("deserialize", getDeserializer(), schema);DataStreamReader dataStreamReader = sparkSession.readStream().format("kafka
..
我正在尝试使用 PySpark 和 Structured Streaming (Spark 2.3) 在两个 Kafka Stream 之间进行左外连接. import os导入时间从 pyspark.sql.types 导入 *从 pyspark.sql.functions 导入 from_json、col、struct、explode、get_json_object从 ast 导入liter
..
下面是我的代码.我尝试了许多不同的选择变体,但应用程序可以运行,但没有显示每秒写入的消息.我有一个 Spark Streaming 示例,它使用 pprint() 确认 kafka 实际上每秒都在获取消息.Kafka 中的消息是 JSON 格式的,请参阅字段/列标签的架构: from pyspark.sql.functions import *从 pyspark.sql.types 导入 *进口
..
这里是我使用Spark Structured Streaming从Kafka读取数据的代码, //ss:SparkSession 是之前定义的.导入 ss.implicits._val df = ss.readStream.format("卡夫卡").option("kafka.bootstrap.servers", kafka_server).option("订阅", topic_input)
..
我尝试使用文档中提到的以下代码来使用我的 kafka 主题: df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092,") \.option("订阅", "first_topic") \.加载()df.selectExpr("CAST(key AS STRING)"
..
我正在尝试将我的结构化流式 Spark 2.4.5 与 kafka 连接起来,但我一直在尝试此数据源提供程序错误.按照我的 Scala 代码和我的 sbt 构建: import org.apache.spark.sql._导入 org.apache.spark.sql.types._导入 org.apache.spark.sql.functions._导入 org.apache.spark.sq
..
我第一次尝试将来自 Kafka 的 JSON 解析为 Spark 结构化流时需要一些帮助. 我正在努力转换传入的 JSON 并将其转换为平面数据帧以供进一步处理. 我的输入json是 [{ "siteId": "30:47:47:BE:16:8F", "siteData":[{ "dataseries": "trend-255", "values":[{"ts": 150271560
..
我之前成功地将 pyspark 用于 Spark Streaming (Spark 2.0.2) 和 Kafka (0.10.1.0),但我的目的更适合结构化流.我尝试在线使用示例:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html 使用以下类似代码: ds1 = spark.rea
..
我正在尝试使用 pyspark 从 kafka 读取流.我正在使用 spark 版本 3.0.0-preview2 和 spark-streaming-kafka-0-10_2.12在此之前,我只是统计 zookeeper,kafka 并创建一个新主题: /usr/local/kafka/bin/zookeeper-server-start.sh/usr/local/kafka/config/z
..
我想将 Twitter 上的数据写入 Kafka.出于教育目的,我尝试使用 Structured Streaming 来做到这一点.我基于 socket-Source 创建了一个 Twitter-Source,效果很好. 我按如下方式设置我的来源: val tweets = spark.readStream.format("推特").option("查询", 条款).加载().as[Spa
..
我想将 Twitter 上的数据写入 Kafka.出于教育目的,我尝试使用 Structured Streaming 来做到这一点.我基于 socket-Source 创建了一个 Twitter-Source,效果很好. 我按如下方式设置我的来源: val tweets = spark.readStream.format("推特").option("查询", 条款).加载().as[Spa
..
当我运行命令时,我正在尝试运行 Python Spark Structured Streaming + Kafka Master@MacBook-Pro spark-3.0.0-preview2-bin-hadoop2.7 % bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 \示例/src/
..
我正在尝试使用 pyspark 从 kafka 读取流.我正在使用 spark 版本 3.0.0-preview2 和 spark-streaming-kafka-0-10_2.12在此之前,我只是统计 zookeeper,kafka 并创建一个新主题: /usr/local/kafka/bin/zookeeper-server-start.sh/usr/local/kafka/config/z
..
我正在使用 Spark 结构化流从 Kafka 主题读取记录;我打算计算 Spark readstream 中每个“微批次"中收到的记录数 这是一个片段: val kafka_df = sparkSession.readStream.format("卡夫卡").option("kafka.bootstrap.servers", "host:port").option("订阅", "测试计数"
..
在使用 Spark Structured 流时,我无法理解检查点的工作原理. 我有一个生成一些事件的 spark 进程,我将这些事件登录到 Hive 表中.对于这些事件,我会在 kafka 流中收到一个确认事件. 我创建了一个新的火花过程 将 Hive 日志表中的事件读入 DataFrame 使用 Spark Structured Streaming 将这些事件与确认事件流结合
..
我想使用 Spark Streaming 并将其与 Kafka 连接.但是我仍然收到 NoSuchMethodError:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe 错误,现在我不知道下一步该做什么. 我的设置: Ubuntu 16.04 斯卡拉 2.11 Kafka 2.11-1.0.0(我也试过用
..