spark-streaming相关内容
给定一个动态的 structType .此处 structType 名称未知.它是动态的,因此它的名字正在改变. 名称是可变的.所以不要预先假设“MAIN_COL"在架构中. root|-- MAIN_COL: struct (nullable = true)||-- a: 字符串 (nullable = true)||-- b: 字符串(可为空 = 真)||-- c: 字符串 (null
..
我正在尝试读取以制表符分隔但无法读取所有记录的分隔文件. 这是我的输入记录: head1 head2 head3a b ca2 a3 a4a1"b1"c1 我的代码: var inputDf = sparkSession.read.option("分隔符","\t").option("header", "true")//.option("inferSchema", "true").op
..
我的代码是: val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))val data=lines.map(_._2)数据打印() 我的输出有 50 个不同的值,格式如下 {"id:st04","data:26-02-2018
..
我刚刚从 http://zeppelin-project.org/docs/tutorial/tutorial.html(“流数据教程"部分).我现在遇到的问题是该应用程序似乎只能在本地工作.如果我将 Spark 解释器设置“master"从“local[*]"更改为“spark://master:7077",当我执行相同的 SQL 语句时,应用程序将不再产生任何结果.我做错了什么吗?我已经重新启
..
我有带有树结构的层次结构数据模型的数据表.例如:这是一个示例数据行: -------------------------------------------身份证 |姓名 |parentId |路径 |深度-------------------------------------55 |加拿大 |空|空|077 |安大略 |55 |/55 |1100|多伦多 |77 |/55/77 |2104|
..
我在 Web 服务器上有一个 .gz 文件,我想以流式方式使用该文件并将数据插入 Couchbase..gz 文件中只有一个文件,每行包含一个 JSON 对象. 由于 Spark 没有 HTTP 接收器,所以我自己写了一个(如下所示).我正在使用 Couchbase Spark 连接器 进行插入.但是,在运行时,该作业实际上并未插入任何内容.我怀疑这是由于我对 Spark 缺乏经验并且不知道
..
我正在运行一个 Spark 作业,就逻辑而言,它的性能非常好.但是,当我使用 saveAsTextFile 将文件保存在 s3 存储桶中时,我的输出文件的名称采用 part-00000、part-00001 等格式.有没有办法改变输出文件名? 谢谢. 解决方案 在 Spark 中,您可以使用 saveAsNewAPIHadoopFile 并将 hadoop 配置中的 mapred
..
我们期待使用 Spark Streaming(带水槽)和带窗口的 Spark SQL 实现一个用例,允许我们对一组数据执行 CEP 计算.(有关如何捕获和使用数据,请参见下文).这个想法是使用 SQL 来执行一些匹配特定条件的操作..根据每个传入的事件批次执行查询似乎很慢(随着它的进行). 这里的慢意味着说我配置了 600 秒的窗口大小和 20 秒的批处理间隔.(以每 2 秒 1 个输入的速
..
SparkContext 类中的 getOrCreate 方法的目的是什么?我不明白我们什么时候应该使用这种方法. 如果我有 2 个使用 spark-submit 运行的 spark 应用程序,并且在主方法中我使用 SparkContext.getOrCreate 实例化了 spark 上下文,则两个应用程序都将具有相同的上下文? 或者目的更简单,唯一的目的是当我创建一个spark应用
..
我有一个 DataSet.map 操作,需要从外部 REST API 中提取数据. REST API 客户端返回一个 Future[Int]. 是否可以让 DataSet.map 操作以某种方式异步等待这个 Future?或者我是否需要使用 Await.result 来阻塞线程?或者这只是没有完成的事情......即我应该尝试将API保存的数据加载到它自己的DataSet中,然后执行j
..
我在从 spark master url 运行应用程序时遇到一个奇怪的问题,其中 UI 无限期地报告“WAITING"的“STATE",因为 0 个内核显示在 RUNNING APPLICATIONs 表下,无论我配置什么核心计数. 我使用以下设置配置了我的应用程序,其中 spark.max.cores = 2 &spark.default.cores = 2 &内存设置为3GB.该机器是具
..
我有一个像这样的火花流应用: val message = KafkaUtils.createStream(...).map(_._2)message.foreachRDD( rdd => {如果(!rdd.isEmpty){val kafkaDF = sqlContext.read.json(rdd)kafkaDF.foreachPartition(我 =>{创建连接()i.foreach(行
..
我使用的是 Spark Streaming 2.1.我想定期刷新一些缓存表(由 spark 提供的数据源加载,如 parquet、MySQL 或用户定义的数据源). 如何刷新表格? 假设我加载了一些表 spark.read.format("").load().createTempView("my_table") 并且它也被缓存 spark.sql("缓存表 my_ta
..
我有一个特定的用例,我为同一客户有多个行,其中每个行对象看起来像: root-c1:BigInt-c2:字符串-c3:双倍-c4:双倍-c5:地图[字符串,整数] 现在我已经按列 c1 进行分组并将所有行收集为同一客户的列表,例如: c1, [Row1, Row3, Row4]c2, [Row2, Row5] 我试过这样做dataset.withColumn("combined", arr
..
我的项目中有一个场景,我正在使用 spark-sql-2.4.1 版本读取 kafka 主题消息.我能够使用结构化流处理这一天.收到数据并进行处理后,我需要将数据保存到 hdfs 存储中的相应镶木地板文件中. 我能够存储和读取镶木地板文件,我将触发时间保持在 15 秒到 1 分钟之间.这些文件非常小,因此导致文件很多. 这些 parquet 文件需要稍后通过 hive 查询读取.
..
Spark 生成了多个小型镶木地板文件.如何有效地处理生产者和消费者 Spark 作业上的少量镶木地板文件. 解决方案 import org.apache.hadoop.mapreduce.InputSplit;导入 org.apache.hadoop.mapreduce.RecordReader;导入 org.apache.hadoop.mapreduce.TaskAttemptCont
..
我有一个流式 JSON 数据,其结构可以用下面的案例类来描述 case class Hello(A: String, B: Array[Map[String, String]]) 相同的样本数据如下 |一个 |乙 ||-------|------------------------------------------||ABC |[{C:1, D:1}, {C:2, D:4}] ||XYZ
..
我使用的是带有 java8 版本的 spark-sql-2.4.1v.我有一个场景,我需要复制当前行并创建另一行修改几列数据如何在 spark-sql 中实现? 例如:给定 val data = List((“20",“分数",“学校",14 ,12),(“21"、“分数"、“学校"、13、13),(“22"、“比率"、“学校"、11 ,14))val df = data.toDF(“i
..
我正在使用 ubuntu 我使用 Intellij 使用 spark 依赖 未找到命令“spark",但可以通过以下方式安装:..(当我在 shell 中输入 spark 时) 我有两个用户 amine 和 hadoop_amine(设置了 hadoop hdfs) 当我尝试将数据帧保存到 HDFS (spark scala) 时: proceded.write.format("js
..
我使用 spark-sql-2.4.1 和 spark-cassandra-connector-2_11.jar 我正在尝试加入流数据集,如下所示: 数据集companyInfo_df = company_info_df.select("companyInfo.*" ).withColumn("companyInfoEventTs", ( col("eventTs").divide(10
..