spark-streaming相关内容
检查下面的代码.如果存在重复键,它将生成具有歧义的数据帧.我们应该如何修改代码以添加父列名称作为前缀. 添加了另一个包含 json 数据的列. scala>val df = 序列((77, "email1", """{"key1":38,"key3":39}""",""""{"name":";aaa","age":10}"""),(78, "email2", """{"key1":38,"
..
嗨,我有一个场景,传入的消息是一个 Json,它有一个标题说 tablename,数据部分有表列数据.现在我想把它写到 parquet 到单独的文件夹说 /emp 和 /dept.我可以通过基于表名聚合行在常规流中实现这一点.但是在结构化流媒体中,我无法拆分它.我如何才能在结构化流媒体中实现这一点. {"tableName":"employee","data":{"empid":1","em
..
嗨,我是 Spark Streaming 的新手.我正在尝试读取 xml 文件并将其发送到 kafka 主题.这是我的 Kafka 代码,它向 Kafka-console-consumer 发送数据. 代码: package org.apache.kafka.Kafka_Producer;导入 java.io.BufferedReader;导入 java.io.FileNotFoundEx
..
场景:我正在用 Spark Streaming 做一些测试.大约 100 条记录的文件每 25 秒出现一次. 问题:程序中使用 local[*] 的 4 核 pc 的处理平均需要 23 秒.当我将相同的应用程序部署到具有 16 核的服务器时,我期望处理时间有所改善.但是,我看到它仍然在 16 个内核中花费相同的时间(还检查了 ubuntu 中的 cpu 使用情况,并且 cpu 正在得到充分利
..
我正在尝试添加 JSONSerDe jar 文件以访问 json 数据,将 JSON 数据从 spark 作业加载到 hive 表.我的代码如下所示: SparkConf sparkConf = new SparkConf().setAppName("KafkaStreamToHbase");JavaSparkContext sc = new JavaSparkContext(sparkConf
..
我在结构化流 (spark 2.2.0) 中使用自定义接收器,并注意到 spark 生成的输入行数指标不正确 - 它始终为零. 我的流构建: StreamingQuery writeStream = session.readStream().schema(RecordSchema.fromClass(TestRecord.class)).option(OPTION_KEY_DELIMITE
..
我在结构化流 (spark 2.2.0) 中使用自定义接收器,并注意到 spark 生成的输入行数指标不正确 - 它始终为零. 我的流构建: StreamingQuery writeStream = session.readStream().schema(RecordSchema.fromClass(TestRecord.class)).option(OPTION_KEY_DELIMITE
..
在我的应用程序中,我从 Kafka 队列获得了一个帐户流(使用 Spark 流和 kafka) 而且我需要从 S3 获取与这些帐户相关的属性,因此我计划缓存 S3 结果数据帧,因为 S3 数据目前至少不会更新一天,将来可能会更改为 1 小时或 10 分钟.所以问题是如何在不停止进程的情况下定期刷新缓存的数据帧. **更新:我计划在 S3 中有更新时将事件发布到 kafka,使用 SNS
..
这是一个 Spark Streaming 应用程序,它使用以 Proto Buf 编码的 Kafka 消息.使用 scalapb 库.我收到以下错误.请帮忙. >com.google.protobuf.InvalidProtocolBufferException: 在解析一个>协议消息,输入在一个中间意外结束>场地.这可能意味着输入已被截断或>嵌入的消息误报了自己的长度.在>com.google
..
有一种方法可以通过将属性 spark.streaming.stopGracefullyOnShutdown 设置为 true,然后使用 kill -SIGTERM 命令终止进程来正常关闭 Spark 流.但是,我没有看到此类选项可用于结构化流 (SQLContext.scala). 结构化流媒体的关闭过程是否不同?或者只是还没有实施? 解决方案 此功能尚未实现.但是,spark 结构
..
我正在通过将 SparkStreaming 数据转换为数据帧将其写入 HDFS: 代码 object KafkaSparkHdfs {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")sparkConf.set("spark.driver.allowMultipleContexts",
..
如何从 Spark 和 Scala 中的 Azure blob 存储中获取文件列表. 我不知道要解决这个问题. 解决方案 如果您正在使用数据块,请尝试以下方法 dbutils.fs.ls(“blob_storage_location")
..
我正在使用 Spark Streaming 通过创建 StreamingContext 从 Twitter 获取推文: val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1)) 并将推特流创建为: val tweetStream = TwitterUtils.createStream(ssc, Some(ne
..
我在开始时创建了“gpsLookUpTable"并保留它,这样我就不需要一遍又一遍地拉它来做映射.但是,当我尝试在 foreach 内部访问它时,我得到了空指针异常.感谢任何帮助. 以下是代码片段: def main(args: Array[String]): Unit = {val conf = new SparkConf() ...val sc = 新的 SparkContext(co
..
我知道 spark 内置方法可以分区和读取大块文件,并使用文本文件作为 rdd 分发.但是,我正在自定义的加密文件系统中阅读此内容,而 Spark 本质上不支持该文件系统.我能想到的一种方法是读取输入流并加载多行并分发给执行程序.继续阅读,直到所有文件都加载完毕.所以没有执行器会因为内存不足错误而崩溃.可以在 spark 中做到这一点吗? 解决方案 你可以针对不同的 n 尝试lines.t
..
数据看起来像: 列 1 列 2 列 3 列 4 第 1 行 1 行 1 行 1 第2行 第2行 第2行 第2行 第3行 第3行 第3行 第3行 第4行 第4行 第4行 第4行 第5行 第5行 第5行 第5行 第6行第6行第6行第6行 问题:我想对这些数据进行分区,假设第 1 行和第 2 行将作为一个分区处理,第 3 行和第 4 行作为另一个处理,第 5 行和第 6 行作为另
..
这是我的源代码,其中我从服务器端获取一些数据,它不断生成数据流.然后对于每个 RDD ,我正在应用 SQL Schema,一旦创建了这个表,我就会尝试从这个 DStream 中选择一些东西. List男性 = 新 ArrayList();JavaDStreamdata = streamingContext.socketTextStream("localhost", (port));数据打印()
..
是否可以将流式o.a.s.sql.Dataset 转换为DStream?如果是这样,如何? 我知道如何将其转换为 RDD,但它是在流上下文中. 解决方案 这是不可能的.Structured Streaming 和传统 Spark Streaming (DStreams) 使用完全不同的语义并且彼此不兼容,因此: DStream 无法转换为 Streaming Dataset.
..
有没有办法指定 Spark Structured File Stream Source 的起始偏移量? 我正在尝试从 HDFS 流式传输镶木地板: spark.sql("SET spark.sql.streaming.schemaInference=true")spark.readStream.parquet("/tmp/streaming/").writeStream.option("c
..
我的 ElasticSearch 集群中有一些与销售相关的 JSON 数据,我想使用 Spark Streaming(使用 Spark 1.4.1)通过 Kafka 从我的电子商务网站动态聚合传入的销售事件,以获取当前视图用户的总销售额(就收入和产品而言). 从我阅读的文档中我不太清楚的是如何在 Spark 应用程序启动时从 ElasticSearch 加载历史数据,并计算例如每个用户的总收
..