spark-streaming相关内容
我已经编写了一个数据集火花作业(批处理)代码来扁平化数据,运行正常,但当我尝试在火花流作业中使用相同的代码片段时,它抛出以下错误 必须使用WriteStream.start(); 执行具有流来源的查询 那么,有什么方法可以在流作业中展平嵌套的JSON吗? 样本输入嵌套JSON- { "name":" Akash", "age":26, "watches":{
..
无法使用wasbs://...url从检查点恢复到Azure Blob存储 在群集模式下使用独立Spark 2.0.2。 val ssc = StreamingContext.getOrCreate(checkpointPath, () => createSSC(), hadoopConf) 我通过hadoopConf.set中的hadoopConf设置了fs.azure和fs.a
..
在我的电光流媒体项目中,我使用HBase-电光录制PV/UV。然后,当我关闭应用程序并重启它时,在检查点恢复时出现以下异常: 16/03/02 10:17:21错误HBaseContext:无法从广播获取配置 java.lang.ClassCastException:[B不能强制转换为org.apache.partk.SerializableWritable 在com.paitao.xmli
..
我希望基于多个条件使用另外两列更新一列中的值。对于Eg-流如下: +---+---+----+---+ | A | B | C | D | +---+---+----+---+ | a | T | 10 | 0 | | a | T | 100| 0 | | a | L | 0 | 0 | | a | L | 1 | 0 |
..
我想知道为什么我的火花流工作中有这么多任务编号?它变得越来越大... 运行3.2h后,增长到120020。运行一天后,它将增长到100万...为什么? 推荐答案 此SparkUI功能意味着某些阶段依赖项可能已计算过,也可能没有计算过,但由于其输出已可用而被跳过。因此,它们表现为skipped。 请不要使用might,这意味着在作业完成Spark之前,无法确定是否需要返回并重新计
..
我使用fileStream从Spark(流上下文)读取HDFS目录中的文件。如果我的Spark在一段时间后关闭并启动,我希望读取目录中的新文件。我不想读取Spark已经读取和处理过的目录中的旧文件。我在此尽量避免重复。 val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File") 是否有需要帮助
..
{ var history: RDD[(String, List[String]) = sc.emptyRDD() val dstream1 = ... val dstream2 = ... val historyDStream = dstream1.transform(rdd => rdd.union(history)) val joined = historyDStream.join(ds
..
我正在尝试使用SparkStreaming(Spark-Streaming_2.10,版本:1.5.1)的简单文件流传输示例 public class DStreamExample { public static void main(final String[] args) { final SparkConf sparkConf = new SparkConf()
..
我想从流查询中获取类似triggerExecution, inputRowsPerSecond, numInputRows, processedRowsPerSecond的信息。 我使用rate格式生成10 rows per second,并使用QueryProgressEvent获取所有指标。 但是,在控制台中,当打印QueryProgressEvent.inputRowsPerSe
..
在电光流媒体中,推荐的在DStream上实现自定义分区程序的方式是什么? 我在批处理模式下使用了JavaPairRDD.artitionBy(分区程序),但在JavaDStreamPairRDD上找不到等效的分区程序。 谢谢 推荐答案 DStream上的分区是通过从接收方获取数据的过程创建的。由每个接收器创建的数据流被切割成大小spark.streaming.blockIn
..
我正在尝试将RDD检查点设置为非HDFS系统。从DSE document看起来不可能使用Cassandra文件系统。因此,我计划使用Amazon S3。但我找不到任何使用AWS的好例子。 问题 如何使用Amazon S3作为检查点目录?是否仅够调用 Ssc.check point(Amazons3url)? 除了Hadoop文件系统之外,检查点还可以有其他可靠的数据存储吗? 推
..
我使用 Scala 编写了简单的 kafka 流.它在当地运作良好.我已经拿了肥罐并在 scala 集群中提交.提交作业后,我收到类未找到错误.如果我提取 fat jar,它在 fat jar 中具有所有依赖项. 为什么我得到 class not found 错误?如何解决? 注意:如果我手动将 fat jar 部署(复制)到 Spark/jars 文件夹中.我看不出有什么问题.但是,
..
当我尝试使用 scalatest 在 SBT 窗口上对我的 spark 流代码执行单元测试时遇到异常. sbt testOnly > * * * * * * 2018-06-18 02:39:00 错误执行程序:91 - 阶段 3.0 (TID 11) 中的任务 1.0 异常java.lang.NoSuchMethodError: net.jpo
..
中提到的例子http://spark.apache.org/docs/latest/streaming-编程指南.html让我在 TCP 流中接收数据包并监听 端口 9999 导入 org.apache.spark._导入 org.apache.spark.streaming._import org.apache.spark.streaming.StreamingContext._//从 Spa
..
我构建了一个从 Twitter 接收 DStream 的应用程序,停止 Streaming 上下文的唯一方法是停止执行.我想知道是否有一种方法可以在不停止整个应用程序的情况下设置时间并终止流式套接字? 解决方案 您可以使用任一 awaitTerminationOrTimeout(long) 如上一个答案中所述,或者您可以从其他线程手动停止流上下文: //在主线程中等待终止();//将
..
当我尝试使用 cassandra 运行 Spark 应用程序时出现错误. 线程“main" org.apache.spark.SparkException 中的异常:只有一个 SparkContext 可能在这个 JVM 中运行(参见 SPARK-2243). 我使用的是 spark 版本 1.2.0,很明显我在我的应用程序中只使用了一个 spark 上下文.但是每当我尝试添加以下代码用于流式
..
我正在使用 Kafka Spark Streaming 来获取流数据. val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2) 我正在使用这个 DStream 并处理 RDD val o
..
在我们的 spark-streaming 作业中,我们从 kafka 中读取流中的消息. 为此,我们使用 KafkaUtils.createDirectStream API 返回 JavaPairInputDStreamfrom. 从 kafka(来自三个主题 - test1、test2、test3)中读取消息的方式如下: private static final String TO
..
我已经编写了一个非常简单的 python 脚本来测试我的 Spark 流创意,并计划在我的本地机器上运行它以稍微搞砸一下.这是命令行: spark-submit spark_streaming.py localhost 9999 但是终端给我一个错误: Error execution Jupyter command '': [Errno 2] No such file or director
..
是否可以将 Spark RDD 通过管道传输到 Python? 因为我需要一个 python 库来对我的数据进行一些计算,但是我的主要 Spark 项目是基于 Scala 的.有没有办法将它们混合或让 python 访问相同的 spark 上下文? 解决方案 您确实可以使用 Scala 和 Spark 以及常规 Python 脚本输出到 Python 脚本. test.py
..