spark-structured-streaming相关内容
如何为PySpark中的流式DataFrame设置架构. from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split # Import data types from pyspark.sql.types import
..
我想使用Spark结构化流从安全的Kafka中读取内容.这意味着我将需要强制使用特定的group.id.但是,如文档中所述,这是不可能的. 不过,在databricks文档中 https://docs. azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl 表示可能.难道这只指的是天蓝色的星团吗?
..
我正在尝试使用结构化流媒体方法使用基于DataFrame/Dataset API的Spark-Streaming从Kafka加载数据流. 我使用: 火花2.10 卡夫卡0.10 spark-sql-kafka-0-10 Spark Kafka数据源已定义了基础架构: |key|value|topic|partition|offset|timestamp|timestam
..
我在Spark结构化流媒体中使用Kafka源来接收Confluent编码的Avro记录.我打算使用Confluent Schema Registry,但似乎无法与spark结构化流集成. 我已经看到了这个问题,但是无法与Confluent Schema Registry一起使用. 使用Spark 2.0从Kafka读取Avro消息.2(结构化流) 解决方案 这是需要的依赖项.
..
我正在使用kafka connect cassandra源连接器1.0版本.我在cassandra表中有一个十进制数据类型列(价格),并将其作为json从源连接器写入到kafka主题,它以某种字符串格式(如"price":"AA==")写入十进制值. 现在它在我的火花流中出错,同时转换为“数字格式异常"浮动.... ??请提出在kafka主题中编写值时可能出了什么问题. 预先感谢. 解决方案
..
我试图重现[Databricks] [1]中的示例,并将其应用于Kafka的新连接器并进行Spark结构化流式传输,但是我无法使用Spark中的现成方法正确解析JSON. .. 注意:该主题以JSON格式写入Kafka. val ds1 = spark .readStream .format("kafka") .option(
..
我正在使用Spark Structured Streaming的流查询通过以下代码将镶木地板文件写入S3: ds.writeStream().format("parquet").outputMode(OutputMode.Append()) .option("queryName", "myStreamingQuery") .op
..
当我运行命令 时,我正在尝试运行Python Spark结构化流+ Kafka Master@MacBook-Pro spark-3.0.0-preview2-bin-hadoop2.7 % bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 \ examples/src/main/pytho
..
我在Spark2软件包2.2.0中使用HDP-2.6.3.0. 我正在尝试使用结构化流API编写Kafka使用者,但是在将作业提交到集群后出现以下错误: Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages
..
有很多在线示例,这些示例从Kafka读取json(以写入镶木地板)-但我不知道如何将模式应用于来自kafka的CSV字符串. 流式数据: customer_1945,cusaccid_995,27999941 customer_1459,cusaccid_1102,27999942 架构: schema = StructType() \ .add("customer_
..
我是第一次使用pyspark. Spark版本:2.3.0 Kafka版本:2.2.0 我有一个kafka生产者,它以avro格式发送嵌套数据,我正尝试在pyspark中以spark-streaming/结构化流编写代码,这会将来自kafka的avro反序列化为数据帧,然后以拼花格式将其写入到s3中. 我能够在spark/scala中找到avro转换器,但尚未添加对pyspark的支持.我如
..
我想从融合的云主题中读取数据,然后再写入另一个主题. 在本地主机上,我没有遇到任何重大问题.但是融合云的架构注册表需要传递一些我不知道如何输入的身份验证数据: basic.auth.credentials.source = USER_INFO schema.registry.basic.auth.user.info =: schema.registry.url = http
..
我有一个设置自Kafka主题的Spark Streaming App,我需要使用一些Pandas Dataframe的API,但是当我尝试对其进行转换时,却得到了 : org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
..
我正在使用Kafka的结构化流源 (集成指南),如上所述,它不会产生任何偏移. 我的目标之一是监视它(检查它是否滞后等).即使不提交偏移量,它也会通过不时查询kafka并检查下一个要处理的偏移量来处理它们.根据文档,偏移量已写入HDFS,因此在发生故障的情况下可以将其恢复,但问题是: 它们存储在哪里? 如果没有提交偏移量(结构化的),是否有任何方法可以监视火花累积(结构化)来监视卡夫卡
..
Spark 2.2引入了Kafka的结构化流媒体源.据我了解,它依靠HDFS检查点目录来存储偏移量并保证“完全一次"的消息传递. 但是旧码头(例如如果我想将Kafka源的偏移量存储到事务性数据库中,如何从结构化流批处理中获取偏移量? 以前,可以通过将RDD强制转换为HasOffsetRanges: val offsetRanges = rdd.asInstanceOf[HasOf
..
我正在尝试将Apache Spark结构化流连接到MQTT主题(在本例中为IBM Bluemix上的IBM Watson IoT Platform). 我正在创建结构化流,如下所示: val df = spark.readStream .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
..
此问题已经在之前进行过讨论在这里,但是在撰写本文时,我没有足够的声誉来评论Algomeisters解决方案(最终没有为我工作) 我有一个使用Kafka和结构化流媒体的火花工作.因此,这需要我具有spark-sql-kafka-0-10模块的依赖项. Jacek Laskowski 表示您必须在Spark提交命令行选项中包含此软件包 对Kafka的结构化流媒体支持单独存在 spar
..
我在kafka客户端上使用spark-sql-2.4.x版本. 即使在设置使用者配置参数之后 IE. 最大分区提取字节数max.poll.records 设置不正确,并显示以下默认值 Dataset df = sparkSession .readStream() .format("k
..
我正在尝试从Kafka主题中读取消息.消息采用以下格式(示例格式): {"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{
..
尽管我正在使用 withWatermark(),但是当我运行我的spark工作时,我收到以下错误消息: 线程“main”中的异常org.apache.spark.sql.AnalysisException:当没有水印的流式DataFrames / DataSets上有流式聚合时,不支持追加输出模式;; 从我在编程指南,这与预期用法(和示例代码)完全匹配。有谁知道可能出错了什么?
..