apache-spark-dataset相关内容
如果我有一个数据集,每个记录的每个记录都是一个案例类,那么我按如下所示保存该数据集,以便使用序列化: myDS.persist(StorageLevel.MERORY_ONLY_SER) Spark是否使用Java/kyro序列化序列化数据集?还是像数据框一样,Spark有其自己的方式将数据存储在数据集中? 解决方案 Spark Dataset 不使用标准的序列化器.相反,它使用
..
说你有这个(编码自定义类型的解决方案来自此线程): //假设我们处理自定义类型类MyObj(val i:Int,val j:String)隐式val myObjEncoder = org.apache.spark.sql.Encoders.kryo [MyObj]val ds = spark.createDataset(Seq(new MyObj(1,"a"),new MyObj(2,"b"),
..
我正在尝试使用 RDD 创建一个 DataFrame . 首先,我使用以下代码创建 RDD - val account = sc.parallelize(Seq((1,null,2,"F"),(2,2,4,"F"),(3,3,6,"N"),(4,null,8,"F"))) 一切正常- 帐户:org.apache.spark.rdd.RDD [(Int,Any,Int,Strin
..
我有一个带有时间戳字段的spark数据框,我想将其转换为long数据类型。我使用了UDF,但独立代码可以正常工作,但是当我插入需要转换任何时间戳的通用逻辑时,它就无法正常工作。问题是如何将UDF的返回值返回到数据帧列 下面是代码段 val spark:SparkSession = SparkSession.builder()。master(“ local [*]”)。appName(
..
我按两列加入2个数据集,结果是包含550亿行的数据集。之后,我必须在该DS上按与联接中使用的列不同的列进行一些聚合。 问题是,尽管聚集列是唯一的,但数据已经正确分配,但Spark在加入后进行交换分区(花费太多时间处理550亿行)。我知道聚合密钥已正确分发,是否有办法告知Spark应用程序? 解决方案 1)转到Spark UI并检查“位置级别” 2)如果要联接大数据和小数据,请使用b
..
我正在使用spark-sql 2.4.x版本,对于Cassandra-3.x版本使用datastax-spark-cassandra-connector.连同卡夫卡. 我有一个来自kafka主题的财务数据场景.数据(基础数据集)包含companyId,year,prev_year字段信息. 如果列year === prev_year,那么我需要加入不同的表,即exchange_rate
..
对于以下数据集,要获取 Col1 的总摘要值,我做了 import org.apache.spark.sql.functions._ val totaldf = df.groupBy("Col1").agg(lit("Total").as("Col2"), sum("price").as("price"), sum("displayPrice").as("displayPrice"))
..
我知道有很多相同的问题,但没有一个能真正回答我的问题. 我有情景数据. val data_codes = Seq("con_dist_1","con_dist_2","con_dist_3","con_dist_4","con_dist_5") val codes = data_codes.toDF("item_code") val partitioned_cod
..
我正在尝试解决一个问题,例如,我有一个这样的数据集: (1, 3) (1, 4) (1, 7) (1, 2) 2)和(2 -> 7),我想将集合(2, 7)替换为(1, 7) 类似地,(3 -> 7)和(7 -> 4)还将(7,4)替换为(3, 4) 因此
..
我试图弄清楚如何基于另一行来更新某些行. 例如,我有一些数据,例如 Id | useraname | ratings | city -------------------------------- 1, philip, 2.0, montreal, ... 2, john, 4.0, montreal, ... 3, charles, 2.0, texas, ... 我想将同一城市
..
这是我的partitionBy条件,我需要根据数据框中的列值进行更改. val windowSpec = Window.partitionBy("col1", "clo2","clo3").orderBy($"Col5".desc) 现在,如果数据帧中列(col6)之一的值是I,则满足上述条件. 但是当column(col6)的值更改为O时,则处于以下条件 val wind
..
我有一个Dataset df,其中包含类型为string的两列(“键"和“值"). df.printSchema();给我以下输出: root |-- key: string (nullable = true) |-- value: string (nullable = true) value列的内容实际上是一个csv格式的行(来自kafka主题),该行的最后一个条目代表类
..
Spark Scala API具有Dataset#transform方法,可轻松链接自定义DataFrame转换,如下所示: val weirdDf = df .transform(myFirstCustomTransformation) .transform(anotherCustomTransformation) 我没有看到transform方法>文档中的pyspark .
..
我有一个.gz格式的压缩文件,是否可以使用spark DF/DS直接读取该文件? 详细信息:文件为带有制表符分隔的csv. 解决方案 读取压缩的csv的方式与读取未压缩的csv文件的方式相同.对于Spark版本2.0+,可以使用Scala通过以下方式完成操作(请注意制表符分隔符的额外选项): val df = spark.read.option("sep", "\t").csv
..
我对Spark 2.0数据集感到非常满意,因为它具有编译时类型安全性.但是这里有几个我无法解决的问题,我也没有为此找到好的文档. 问题1-对汇总列进行除法运算- 考虑下面的代码- 我有一个DataSet [MyCaseClass],我想在c1,c2,c3和sum(c4)/8上使用groupByKey.下面的代码可以很好地工作,如果我只是计算总和,但它会给出除法(8)的编译时错误.我不知道如何
..
我正在尝试根据“制造商"列的内容将数据集拆分为不同的数据集.这很慢 请提出一种改进代码的方法,以便它可以更快地执行并减少Java代码的使用. List lsts= countsByAge.collectAsList(); for(Row lst:lsts){ String man=lst.toString();
..
要利用Dataset的优化,我是否必须显式使用Dataframe's方法(例如df.select(col("name"), col("age")等)或调用 any 方法- 甚至是类似RDD的方法 (例如filter,map等)是否也可以进行优化? 解决方案 数据帧优化通常分为3种: 钨记忆管理 催化剂查询优化 整个阶段的代码生成器 钨记忆管理 在定义RDD [mycl
..
我在一天的时间范围内(86400)对(user,app)的尝试次数进行了计数.我想提取具有最新时间戳和计数的行,并删除不必要的先前计数.确保您的答案考虑了时间范围.一个拥有1台设备的用户每天或每周可以进行多次尝试,我想能够在每个特定窗口中检索带有最终计数的特定时刻. 我的初始数据集是这样的: val df = sc.parallelize(Seq( ("user1", "iphon
..
我正在尝试使用spark Dataset API读取json文件,问题是该json在某些字段名称中包含空格. 这将是一个json行 {"Field Name" : "value"} 我的案例课必须是这样的 case class MyType(`Field Name`: String) 然后我可以将文件加载到DataFrame中,它将加载正确的架构 val dataf
..
我们需要在Apache Spark 数据集中跨字符串实现Jaro-Winkler距离计算.我们是新兴的,在网络上搜索之后,我们找不到很多东西.如果您能指导我们,那就太好了.我们考虑使用 flatMap ,然后意识到这无济于事,然后我们尝试使用几个foreach循环,但无法弄清楚如何进行.因为每个字符串都必须与所有字符串进行比较.就像下面的数据集一样. RowFactory.create(0,
..