pyspark-sql相关内容
我有一个简单的连接,我限制了两边.在解释计划中我看到在执行限制之前有一个 ExchangeSingle 操作,确实我看到在这个阶段集群中只有一个任务在运行. 这当然会显着影响性能(消除限制会消除单个任务瓶颈,但会延长连接,因为它适用于更大的数据集). limit 真的不可并行吗?如果是这样 - 是否有解决方法? 我在 Databricks 集群上使用 spark. 编辑:关
..
我有一个 pyspark 数据框,其中列偶尔会有与另一列匹配的错误值.它看起来像这样: |日期 |纬度 ||2017-01-01 |43.4553 ||2017-01-02 |42.9399 ||2017-01-03 |43.0091 ||2017-01-04 |2017-01-04 | 显然,最后的纬度值不正确.我需要删除任何和所有这样的行.我想过使用 .isin() 但我似乎无法让它工作.
..
我想将一列随机值添加到我正在测试的数据帧(每行都有一个 id).我正在努力在 Spark 会话中获得可重现的结果 - 每行 id 的随机值相同.我可以使用 重现结果 from pyspark.sql.functions import randnew_df = my_df.withColumn("rand_index", rand(seed = 7)) 但它仅在我在同一个 Spark 会话中运
..
我有以下 PySpark 输入数据框: +-------+------------+|索引 |值列表|+-------+--------------+|1.0 |[10,20,30] ||2.0 |[11,21,31] ||0.0 |[14,12,15] |+-------+--------------+ 地点: 索引:类型为 Double 值列表:输入矢量.(它是非数组) 从上
..
我想根据特定条件从大型 DataFrame 生成分层的 TFrecord 文件,为此我使用 write.partitionBy().我也在 SPARK 中使用了 tensorflow-connector,但这显然不能与 write.partitionBy() 操作一起使用.因此,除了尝试分两步工作之外,我还没有找到其他方法: 根据我的情况,使用 partitionBy() 重新分区数据帧,并
..
我在使用广播提示时遇到问题(可能是缺乏 SQL 知识). 我有一个类似的查询 SELECT */* 广播(a) */从一个内连接 b在 ....内连接 c在 .... 我想做 SELECT */* 广播(a) */从一个内连接 b在 ....内部连接 c/* 广播(AjoinedwithB)*/在 .... 我的意思是,我想强制广播加入(我宁愿避免更改火花参数以在任何地方强制它)
..
我的目标是在 Pandas DataFrame 中添加一个新列,但我遇到了一个奇怪的错误. 新列应该是现有列的转换,可以在字典/哈希图中进行查找. # 加载数据df = sqlContext.read.format(...).load(train_df_path)# 实例化地图some_map = {'一':0,'b': 1,'c': 1,}# 使用地图创建一个新列df['new_colu
..
我是 pyspark 的新手.我试图了解如何访问具有多层嵌套结构和数组的镶木地板文件.我需要用空值替换数据帧(带有嵌套模式)中的一些值,我已经看到了这个 解决方案 它适用于结构,但不确定它如何适用于数组. 我的架构是这样的 |-- unitOfMeasure: 结构体||-- 原始:结构|||-- id: 字符串|||-- codingSystemId: 字符串|||-- 显示:字符串||
..
我尝试使用在 Spark DataFrame 之前定义的常规 Spark 映射操作,如下所示: businessJSON = os.path.join(targetDir, 'business.json')businessDF = sqlContext.read.json(businessJSON)reviewJSON = os.path.join(targetDir, 'review.json
..
我有一个数据框 df.我已经对数据框执行了决策树分类算法.两列是执行算法时的标签和特征.该模型称为dtc.如何在 pyspark 中创建混淆矩阵? dtc = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label')dtcModel = dtc.fit(train)预测 = dtcModel.transform(te
..
是否可以使用 select 语句在 spark 上创建表? 我执行以下操作 import findsparkfindspark.init()导入pyspark从 pyspark.sql 导入 SQLContextsc = pyspark.SparkContext()sqlCtx = SQLContext(sc)spark_df = sqlCtx.read.format('com.datab
..
我正在尝试做一些非常简单的事情,但我遇到了一些非常愚蠢的斗争.我认为这一定与对 Spark 正在做什么的根本误解有关.我将不胜感激任何帮助或解释. 我有一个非常大(~3 TB、~300MM 行、25k 分区)的表,在 s3 中保存为镶木地板,我想给某人一个小样本作为单个镶木地板文件.不幸的是,这需要很长时间才能完成,我不明白为什么.我尝试了以下方法: tiny = spark.sql("S
..
我的 pyspark 数据框中有 500 列……有些是字符串类型,有些是 int 类型,有些是 boolean(100 个布尔列).现在,所有布尔列都有两个不同的级别 - Yes 和 No,我想将它们转换为 1/0 对于字符串,我有三个值 - 通过、失败和空.如何用 0 替换这些空值?fillna(0) 仅适用于整数 c1|c2 |c3 |c4|c5..... |c500是|是|通过|4
..
所以我想从目录中读取 csv 文件,作为 pyspark 数据帧,然后将它们附加到单个数据帧中.在 pyspark 中没有得到替代方案,就像我们在 Pandas 中所做的那样. 例如在 Pandas 中,我们这样做: files=glob.glob(path +'*.csv')df=pd.DataFrame()对于文件中的 f:dff=pd.read_csv(f,delimiter=','
..
假设我有一个具有此架构的 DataFrame x: xSchema = StructType([ \StructField("a", DoubleType(), True), \StructField("b", DoubleType(), True), \StructField("c", DoubleType(), True)]) 然后我有数据帧: DataFrame[a :double,
..
数据工程师您好! 我正在尝试使用名为 星界 这里是udf: def time_from_solar_noon(d, y):noon = astral.Astral().solar_noon_utc时间 = 中午(d,y)回程时间SolarNoon = F.udf(lambda d, y: time_from_solar_noon(d,y), TimestampType()) 按照我
..
用于创建查询的 Spark SQL 类似于 这个 - CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name[(col_name1 col_type1 [COMMENT col_comment1], ...)]使用数据源[选项 (key1=val1, key2=val2, ...)][PARTITIONED BY (col_
..
我正在尝试通过 pyspark 构建 sql 来实现这一点.目标是将多行合并成单行例子:我想转换这个 +-----+----+----+-----+|col1|col2|col3|col4|+-----+----+----+-----+|x |是 |z |13::1||x |是 |z |10::2|+-----+----+----+-----+ 到 +-----+----+----+----
..
我是 pyspark 的新手 我有一个数据集看起来像(只是几列的快照) 我想按键对我的数据进行分组.我的钥匙是 CONCAT(a.div_nbr,a.cust_nbr) 我的最终目标是将数据转换成JSON,格式如下 k1[{v1,v2,....},{v1,v2,....}], k2[{v1,v2,....},{v1,v2,....}],.... 例如 248138339 [{
..
在 Python 中学习 Spark 时,我无法理解 alias 方法的目的及其用法.文档 显示它被用于创建使用新名称的现有 DataFrame,然后将它们连接在一起: >>>从 pyspark.sql.functions 导入 *>>>df_as1 = df.alias("df_as1")>>>df_as2 = df.alias("df_as2")>>>join_df = df_as1.joi
..