spark-structured-streaming相关内容
我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。 代码: from pyspark.sql.functions import col kaf
..
技术堆栈详细信息- Scala - 2.11.8 Spark - 2.4.4 Delta - 0.7.0 Running On - AWS EMR 用法- spark.readStream .format("kinesis") .option("streamName", kinesisConfs.streamName) .option("regi
..
我已经在Spark Structure Streaming中实现了MQ源代码。我正在使用IBM MQ Core库和Java 8 com.ibm.mq com.ibm.mq ${mq.version}
..
我对Databricks中以下代码的差异感到困惑 spark.readStream.format('json') vs spark.readStream.format('cloudfiles').option('cloudFiles.format', 'json') 我知道CloudFiles,因为它的格式将被视为Databricks Autoloader。在性能/功能比较中,
..
我当前的结构化流应用程序写入一个巨大的Delta表。当我(停止流)并将其指向写入全新的增量表时: 它变得更快-批量持续时间减少了近四分之一 投入率几乎提高了3倍 我知道它可能会变得更快,因为它在旧的/更大的表上执行的任何聚合/写入在新表上都不需要。但投入率的变化我希望有人能解释一下? 源为Azure EventHubs。 谢谢! 推荐答案 回答我自己的问题:
..
我想从流查询中获取类似triggerExecution, inputRowsPerSecond, numInputRows, processedRowsPerSecond的信息。 我使用rate格式生成10 rows per second,并使用QueryProgressEvent获取所有指标。 但是,在控制台中,当打印QueryProgressEvent.inputRowsPerSe
..
我的Java应用程序使用连接到套接字服务器的Spark Structured Streaming不断获取封装在RDMessage对象中的传感器测量记录(IoT),该对象记录协议中用于控制的消息类型。 当消息到达时,将使用Encoder measurementEncoder = Encoders.bean(RDMeasurement.class)检查它们并将其转换为数据
..
我正在尝试运行像 StructuredKafkaWordCount.我从 Spark 结构化流式编程指南开始. 我的代码是 包 io.boontadata.spark.job1导入 org.apache.spark.sql.SparkSession对象 DirectKafkaAggregateEvents {val FIELD_MESSAGE_ID = 0val FIELD_DEVICE_
..
我正在尝试为 Structured Streaming 编写一个自定义接收器,它将使用来自 RabbitMQ 的消息.Spark 最近发布 DataSource V2 API,看起来很有前途.由于它抽象了许多细节,我想使用这个 API 以既简单又性能好.但是,由于它很新,因此可用的资源并不多.我需要经验丰富的 Spark 人员的说明,因为他们会更容易掌握关键点.我们开始: 我的起点是博客文章
..
我正在尝试使用 Spark 结构化流从 Kafka 主题中读取 XML 数据. 我尝试使用 Databricks spark-xml 包,但我收到一条错误消息,指出此包不支持流式读取.有什么方法可以使用结构化流从 Kafka 主题中提取 XML 数据? 我当前的代码: df = spark \.readStream \.format(“卡夫卡")\.format('com.datab
..
我想使用 Pyspark API 将结构流数据写入 Cassandra. 我的数据流如下: Nifi -> Kafka -> Spark Structure Streaming -> Cassandra 我尝试过以下方式: query = df.writeStream\.format("org.apache.spark.sql.cassandra")\.option("keys
..
关于 Spark 结构化流与 HIVE 表集成的一个查询. 我尝试做一些火花结构化流的例子. 这是我的例子 val spark =SparkSession.builder().appName("StatsAnalyzer").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.
..
基于Spark 3.0中的介绍,https:///spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.应该可以设置“kafka.group.id"跟踪偏移量.对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失.根据我之前的问题,我觉得 Spark 3.0 中的 kafka.g
..
这看起来应该是显而易见的,但是在查看文档和示例时,我不确定我是否可以找到一种使用 PySpark 获取结构化流和转换的方法. 例如: from pyspark.sql import SparkSession火花=(火花会话.builder.appName('StreamingWordCount').getOrCreate())原始记录 = (火花.readStream.format('so
..
我使用 Spark Structured Streaming 的流式查询使用以下代码将 Parquet 文件写入 S3: ds.writeStream().format("parquet").outputMode(OutputMode.Append()).option("queryName", "myStreamingQuery").option("checkpointLocation", "s
..
我正在努力让 console 接收器与 从 Zeppelin 运行时的 PySpark 结构化流.基本上,我没有看到任何结果打印到屏幕上,也没有看到我找到的任何日志文件. 我的问题: 有没有人有将 PySpark 结构化流与接收器一起使用的工作示例,该接收器产生在 Apache Zeppelin 中可见的输出?理想情况下,它还可以使用套接字源,因为这很容易测试. 我正在使用: U
..
我在将 Spark 结构化流数据帧与批处理数据帧结合时遇到问题,我的场景我有一个 S3 流,它需要与历史数据进行左反连接,该数据返回历史中不存在的记录(找出新记录)并且我将这些记录作为新的追加写入历史记录(按列分区磁盘数据分区而不是内存). 当我刷新已分区的历史数据框时,我的历史数据框没有更新. 下面是两个代码片段,一个有效,另一个无效. 工作代码和非工作代码之间的唯一区别是 p
..
这是我之前的一个后续问题问题 scala>val map1 = spark.sql("select map('s1', 'p1', 's2', 'p2', 's3', 'p3') as lookup") map1: org.apache.spark.sql.DataFrame = [lookup: map] scala>val ds1 = spark.sql("select 'p1' as
..
scala>val map1 = spark.sql("select map('p1', 's1', 'p2', 's2')") map1: org.apache.spark.sql.DataFrame = [map(p1, s1, p2, s2): map] scala>map1.show()+--------------------+|地图(p1,s1,p2,s2)|+----------
..
我正在尝试使用 spark 结构化流(版本 2.3.1)处理来自 kafka 的流 avro 数据,所以我尝试使用 this 反序列化示例.仅当主题 value 部分包含 StringType 时才有效,但在我的情况下,架构包含 long 和 integers,如下所示: public static final String USER_SCHEMA = “{";+ "\"类型\":\"记录\",
..