spark-dataframe相关内容
这里我创建了一个日期框架,如下所示, df = spark.createDataFrame([('a',5,'R','X'),('b',7,'G','S'),('c',8,'G','S')],["Id","V1","V2","V3"]) 看起来像 +---+---+---+---+|编号|V1|V2|V3|+---+---+---+---+|一个|5|R|X||乙|7|G|S||| |8|G
..
简而言之,我正在利用 spark-xml 对 XML 文件进行一些解析.但是,使用它会删除我感兴趣的所有值中的前导零.但是,我需要最终输出,它是一个 DataFrame,以包含前导零.我不确定/无法想出一种方法来向我感兴趣的列添加前导零. val df = spark.read.format("com.databricks.spark.xml").option("rowTag", "输出").o
..
我对 Spark 还是比较陌生,但我已经能够创建 Spark 应用程序我需要能够使用 JDBC 驱动程序从我们的 SQL Server 重新处理数据(我们正在删除昂贵的 SP),该应用程序加载了几个表从 Sql Server 通过 JDBC 到数据帧,然后我做了一些连接、一个组和一个过滤器,最后通过 JDBC 将一些数据重新插入到不同的表中.所有这些在 Amazon Web Services 中的
..
python Pandas 库包含以下函数: DataFrame.merge(right, how='inner', on=None, left_on=None, right_on=None, left_index=False,right_index=False, sort=False, suffixes=('_x', '_y'), copy=True,指标=假) 指标字段结合 Panda 的
..
有谁知道为什么在加入多个 PySpark DataFrames 时使用 Python3 的 functools.reduce() 会导致比使用 for 循环迭代地加入相同的 DataFrames 更差的性能?具体来说,这会导致大幅减速,然后出现内存不足错误: def join_dataframes(list_of_join_columns, left_df, right_df):返回 left_
..
我正在使用 spark sql 对我的数据集运行查询.查询的结果很小,但仍然是分区的. 我想合并生成的 DataFrame 并按列对行进行排序.我试过了 DataFrame 结果 = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1")result.toJSON().saveAsTextFile("输出") 我也试过 Dat
..
在 MySQL 中,我可以有这样的查询: 选择cast(from_unixtime(t.time, '%Y-%m-%d %H:00') as datetime) as timeHour, ...从some_table t通过...分组时间,...订购时间,... 其中 GROUP BY 中的 timeHour 是选择表达式的结果. 但是我刚刚尝试了一个类似于 Sqark SQL 中的查询
..
HDFS 上的根目录:/tmp/hive 应该是可写的.当前权限为:rwx-------- 嗨,以下 Spark 代码我在 CDH 5.8 & Eclipse 中执行超越运行时异常 public static void main(String[] args) {final SparkConf sparkConf = new SparkConf().setMaster("local").se
..
下面的代码正在运行并从文本文件创建一个 Spark 数据帧.但是,我正在尝试使用 header 选项将第一列用作标题,但由于某种原因,它似乎没有发生.我不明白为什么!这一定是愚蠢的,但我无法解决这个问题. >>>from pyspark.sql import SparkSession>>>spark = SparkSession.builder.master("local").appName("
..
Apache Spark Dataset API 有两种方法,即 head(n:Int) 和 take(n:Int). Dataset.Scala 源代码包含 def take(n: Int): Array[T] = head(n) 在这两个函数之间找不到执行代码的任何差异.为什么 API 有两种不同的方法来产生相同的结果? 解决方案 我已经尝试过 &发现 head(n) 和 t
..
我有一份 Spark/Scala 工作,我在其中执行此操作: 1:计算一个大的DataFrame df1 + cache 到内存中 2:使用df1计算dfA 3:将原始数据读入df2(再次,它很大)+ cache it 在执行 (3) 时,我不再需要 df1.我想确保它的空间得到释放.我在 (1) 处缓存,因为这个 DataFrame 在 (2) 中被使用,这是确保我不会每次都重新
..
我是 Spark 2.0 的新手,我在我们的代码库中使用了数据集.我有点注意到我需要在我们的代码中到处import spark.implicits._.例如: 文件AA类{定义作业(火花:SparkSession)= {导入 spark.implcits._//创建数据集dsval b = 新 B(火花)b.doSomething(ds)做某事(ds)}private def doSomethi
..
Spark - 如何使用以下方法获取逻辑/物理查询执行 通过节俭 通过 SparkInteractor 解决方案 您可以通过 thrift 在直线中使用带有查询的解释语句,如下所示. EXPLAIN EXTENDED select * from sr23 join sr12 [] 火花拦截器是什么意思?它是spark-sql shell.?如果是,那么你可以使用上面的查询.
..
我目前正在尝试通过 Spark SQL 将一个非常大的 MySQL 表的内容批量迁移到一个镶木地板文件中.但是这样做时,我很快就会耗尽内存,即使将驱动程序的内存限制设置得更高(我在本地模式下使用 spark).示例代码: 数据集ds = spark.read().format("jdbc").option("url", url).option("driver", "com.mysql.jdbc.
..
在追加模式下将 DF 插入 Hive 内部表的正确方法是什么.似乎我们可以使用“saveAsTable"方法直接将 DF 写入 Hive 或将 DF 存储到临时表然后使用查询. df.write().mode("append").saveAsTable("tableName") 或 df.registerTempTable("temptable")sqlContext.sql("CREATE
..
我是 Apache Spark 的新手. 我的工作是读取两个 CSV 文件,从中选择一些特定的列、合并、聚合并将结果写入单个 CSV 文件. 例如, CSV1 name,age,deparment_id CSV2 department_id,deparment_name,location 我想获得第三个 CSV 文件 姓名、年龄、部门名称 我正在将两个 CSV 加载到
..
我正在尝试将数据帧转换为 RDD,然后执行下面的一些操作以返回元组: df.rdd.map { t=>(t._2 + "_" + t._3 , t)}.take(5) 然后我得到了下面的错误.谁有想法?谢谢! :37: 错误:值 _2 不是 org.apache.spark.sql.Row 的成员(t._2 + "_" + t._3 , t)^ 解决方案 当你将 DataFrame 转换
..
我正在研究 spark mllib 算法.我拥有的数据集是这种形式 Company":"XXXX","CurrentTitle":"XYZ","Edu_Title":"ABC","Exp_mnth":.(还有更多类似的值) 我正在尝试将字符串值原始编码为数字值.因此,我尝试使用 zipwithuniqueID 作为每个字符串值的唯一值.出于某种原因,我无法将修改后的数据集保存到磁盘.我
..
我想计算 DataFrame 中某一列的百分位数?我在 Spark 聚合函数中找不到任何 percentile_approx 函数. 例如在 Hive 中,我们有 percentile_approx,我们可以通过以下方式使用它 hiveContext.sql("select percentile_approx("Open_Rate",0.10) from myTable); 但出于性能原
..
我想了解 pyspark 代码中的分析. 如下:https://github.com/apache/spark/pull/2351 >>>sc._conf.set("spark.python.profile", "true")>>>rdd = sc.parallelize(range(100)).map(str)>>>rdd.count()100>>>sc.show_profiles()=
..