apache-spark-dataset相关内容
我正在努力了解RDD,DataSet和DataFrame之间的转换是如何工作的. 我是Spark的新手,每次需要从一个数据模型传递到另一个模型时(特别是从RDD到Datasets和Dataframes),我都会陷入困境. 谁能解释给我正确的方法? 作为示例,现在我有一个RDD[org.apache.spark.ml.linalg.Vector],我需要将其传递给我的机器学习算法,例如KMea
..
我有2个实木复合地板零件文件part-00043-0bfd7e28-6469-4849-8692-e625c25485e2-c000.snappy.parquet(是2017年11月14日运行的零件文件)和part-00199-64714828-8a9e-4ae1-8735-c5102c0a834d-c000.snappy.parquet (是2017年11月16日运行的零件文件),并且都具有相同
..
试图了解Hive分区与Spark分区之间的关系,最终导致有关联接的问题. 我有2个外部Hive表;均由S3存储桶支持并由date分区;因此,每个存储桶中都有名称格式为date=/的键. 问题1: 如果我将这些数据读入Spark: val table1 = spark.table("table1").as[Table1Row] va
..
在以下代码段中,tryParquet函数尝试从Parquet文件中加载数据集(如果存在).如果没有,它将计算,保留并返回提供的数据集计划: import scala.util.{Try, Success, Failure} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.Dataset sealed
..
我正在使用2.1.1版编写一个Spark应用程序.以下代码在调用带有LocalDate参数的方法时出现错误? Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate - field (class: "java.time.LocalD
..
我想为数据集行分配一个唯一的ID.我知道有两个实现选项: 第一个选项: import org.apache.spark.sql.expressions.Window; ds.withColumn("id",row_number().over(Window.orderBy("a column"))) 第二个选项: df.withColumn("id", monotonicall
..
如何降低数据框的列名大小写而不是其值的大小写?使用RAW Spark SQL和Dataframe方法? 输入数据帧(想象一下,这些列中有100个是大写的) NAME | COUNTRY | SRC | CITY | DEBIT --------------------------------------------- "foo"| "NZ" | sala
..
我是Scala的新手.我正在尝试将scala列表(在源DataFrame上保存一些计算数据的结果)转换为Dataframe或Dataset.我没有找到任何直接的方法来做到这一点. 但是,我尝试了以下过程将列表转换为DataSet,但似乎不起作用.我提供以下3种情况. 有人可以给我带来些希望吗,如何进行转换?谢谢. import org.apache.spark.sql.{DataFra
..
我一直在阅读有关火花谓词下推和分区修剪的信息,以了解读取的数据量.我对此有以下疑问 假设我有一个包含列的数据集 (年份:国际,学校名称:字符串,学生ID:国际,已注册学科:字符串) 其中存储在磁盘上的数据按Year和SchoolName进行分区,并以拼花格式存储在例如Azure Data Lake存储器中. 1)如果我发出read spark.read(container).filte
..
我需要按特定顺序遍历数据帧,并应用一些复杂的逻辑来计算新列. 在下面的示例中,我将使用简单的表达式,其中s的当前值是所有先前值的乘积,因此似乎可以使用UDF甚至解析函数来完成.但是,实际上逻辑要复杂得多. 下面的代码完成了所需的操作 import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import
..
我有一堆列,示例如我的数据所示,如下所示. 我需要检查列中的错误,并且必须生成两个输出文件. 我正在使用Apache Spark 2.0,我想以一种有效的方式做到这一点. Schema Details --------------- EMPID - (NUMBER) ENAME - (STRING,SIZE(50)) GENDER - (STRING,SIZE(1)) Data ----
..
我正在为Spark Streaming的实现而苦苦挣扎. kafka发出的消息看起来像这样,但具有更多字段 {"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}} {"event":"databasedata", "mysql":"sensors", "payload": {"act
..
我试图用Java创建Dataset,所以我编写了以下代码: public Dataset createDataset(){ List list = new ArrayList(); list.add(new Person("name", 10, 10.0)); Dataset dateset = sqlContext.createDataset
..
我正在处理代表事件流(例如从网站跟踪事件而解雇)的数据集.所有事件都有时间戳.我们经常遇到的一个用例是尝试查找给定字段的第一个非null值.因此,例如类似的东西可以使我们最有效地到达目的地: val eventsDf = spark.read.json(jsonEventsPath) case class ProjectedFields(visitId: String, userId:
..
在spark Dataset.filter中获取此空错误 输入CSV: name,age,stat abc,22,m xyz,,s 工作代码: case class Person(name: String, age: Long, stat: String) val peopleDS = spark.read.option("inferSchema","true") .o
..
我一直在尝试不同的方法来过滤类型化的数据集.事实证明,性能可能大不相同. 该数据集是基于1.6 GB的数据行(具有33列和4226047行)创建的.通过加载csv数据创建DataSet并将其映射到案例类. val df = spark.read.csv(csvFile).as[FireIncident] UnitId ='B02'上的过滤器应返回47980行.我测试了以下三种方法:
..
查看spark数据集上的select()函数,生成了各种函数签名: (c1: TypedColumn[MyClass, U1],c2: TypedColumn[MyClass, U2] ....) 这似乎暗示着我应该能够直接引用MyClass的成员并输入安全,但是我不确定如何... ds.select("member")当然可以工作..似乎ds.select(_.member)可能
..
我已经编写了使用SparkSQL访问Hive表的代码.这是代码: SparkSession spark = SparkSession .builder() .appName("Java Spark Hive Example") .master("local[*]") .config("hive.metastore.uris",
..
我可以很容易地在Scala中将DataFrame转换为Dataset: case class Person(name:String, age:Long) val df = ctx.read.json("/tmp/persons.json") val ds = df.as[Person] ds.printSchema 但是在Java版本中,我不知道如何将Dataframe转换为Datase
..
我正在尝试在两个表上进行相当简单的联接,没有什么复杂的. 加载两个表,进行联接和更新列,但它总是引发异常. 我注意到任务卡在了最后一个分区199/200上,最终崩溃了. 我的怀疑是数据歪斜,导致所有数据都加载到了最后一个分区199. SELECT COUNT(DISTINCT report_audit) FROM ReportDs = 1.5million. 而 SELEC
..