pyspark-sql相关内容
我在Databricks笔记本中制作了多选小工具。 dbutils.widgets.multiselect("Scenario", "Actual", [str(x) for x in scenario_type]) 但我想使用选定的值来更新我拥有的表。 一旦只选择了一个项目,它就会起作用。 display(ur.filter((ur.scenario == getArgum
..
我觉得这里肯定漏掉了一些明显的东西,但我似乎无法在Spark SQL中动态设置变量值。 假设我有两个表tableSrc和tableBuilder,并且我正在创建tableDest。 我一直在尝试 上的变体 SET myVar FLOAT = NULL SELECT myVar = avg(myCol) FROM tableSrc; CREATE TABLE tableD
..
我正在尝试使用以下代码将CSV读取到pyspark控制台中: from pyspark.sql import SQLContext import pyspark sql_c = SQLContext(sc) df = sql_c.read.csv('join_rows_no_prepended_new_line.csv') 但是,当我有144 GB的空闲空间时,我收到一个关于内存使用的很
..
我有一个如下所示的DataFrame ID Date Amount 10001 2019-07-01 50 10001 2019-05-01 15 10001 2019-06-25 10 10001 2019-05-27 20 10002 2019-06-29 25 10002 2019-07-18
..
我正在尝试使用 Spark 结构化流从 Kafka 主题中读取 XML 数据. 我尝试使用 Databricks spark-xml 包,但我收到一条错误消息,指出此包不支持流式读取.有什么方法可以使用结构化流从 Kafka 主题中提取 XML 数据? 我当前的代码: df = spark \.readStream \.format(“卡夫卡")\.format('com.datab
..
我使用 Spark 2.0. 我想执行以下 SQL 查询: val sqlText = """选择f.ID 作为 TID,f.BldgID 作为 TBldgID,f.LeaseID 作为 TLeaseID,f.Period 作为 TPeriod,合并((选择f 电荷量从Fact_CMCharges f在哪里f.BldgID = Fact_CMCharges.BldgID限制 1),0) 作
..
我需要在我的 spark sql 表中实现一个自动递增列,我该怎么做.请指导我.我正在使用 pyspark 2.0 谢谢卡利安 解决方案 我会编写/重用 stateful Hive udf 并注册到 pySpark,因为 Spark SQL 确实对 Hive 有很好的支持. 在下面的代码中检查这一行 @UDFType(deterministic = false, statefu
..
来自像 这样的 PySpark SQL 数据帧 name 年龄城市abc 20定义 30 B 如何获取最后一行.(就像通过 df.limit(1) 我可以将数据帧的第一行放入新的数据帧中). 以及如何通过 index.like 行号访问数据帧行.12 或 200 . 在熊猫中我可以做到 df.tail(1) # 最后一行df.ix[rowno or index] # 按索引df
..
我是 Python 新手,如果我的方法有任何错误,请道歉 我有一个场景,客户端可以提供其自定义 Python 函数,并希望将它们注册为 PySpark 中的 UDF. 根据我的初步理解,我期待一个函数,它返回函数名称和函数定义的字典,从导入模块并在运行时调用此方法. 随机自定义函数类示例 class CustomFuntions():def reverse_statement
..
我在与其他用户共享的集群上使用 Spark.因此,仅根据运行时间来判断我的哪个代码运行效率更高是不可靠的.因为当我运行更高效的代码时,其他人可能会运行大量数据并使我的代码执行更长时间. 我可以在这里问两个问题吗: 我正在使用 join 函数来加入 2 个 RDD 并且我在使用 之前尝试使用 groupByKey()加入,像这样: rdd1.groupByKey().join(rdd2
..
我用pyspark配置了eclipse 我使用的是最新版本的 SPARK 和 PYTHON. 当我尝试编写代码并运行时.我得到以下错误. java.io.IOException:无法运行程序“python":CreateProcess error=2,系统找不到指定的文件 我写的代码如下 '''创建于 2017 年 12 月 23 日@作者:联想'''从 pyspark 导入 S
..
我有一个包含两列的数据框,listA 存储为 Seq[String] 和 valB 存储为 String>.我想创建第三列 valC,它将是 Int 类型,其值为 如果 valB 存在于 listA 中则为 1 否则为 0 我尝试执行以下操作: val dfWithAdditionalColumn = df.withColumn("valC", when($"listA".contain
..
我在 spark 中有这个数据集, val sales = Seq((“华沙",2016,“脸书",“分享",100),(“华沙",2017,“脸书",“喜欢",200),(“波士顿",2015,“推特",“分享",50),(“波士顿",2016 年,“facebook",“分享",150),(“多伦多",2017,“推特",“喜欢",50)).toDF("city", "year","medi
..
我在 Databricks 中有一个名为 customerDetails 的数据框. +--------------------+-----------+|客户姓名|客户 ID|+--------------------+------------+|约翰·史密斯 |0001||简伯恩斯|0002||弗兰克·琼斯 |0003|+--------------------+------------+
..
以下是 PySpark ETL 代码的最后两行: df_writer = DataFrameWriter(usage_fact)df_writer.partitionBy("data_date", "data_product").saveAsTable(usageWideFactTable, format=fileFormat,mode=writeMode,path=usageWideFactp
..
我正在为 Spark 作业编写单元测试,其中一些输出被命名为元组:pyspark.sql.Row 我如何断言他们的平等? actual = get_data(df)预期 = 行(总计 = 4,unique_ids = 2)self.assertEqual(实际,预期) 当我这样做时,这些值会以我无法确定的顺序重新排列. 解决方案 您的代码应该按编写的方式工作,因为根据 文档:
..
我正在使用 pyspark 2.3.1 开发 Spark 2.3、Python 3.6 我有一个 Spark DataFrame,其中每个条目都是一个工作步骤,我想将一些行合并到一个工作会话中.这应该在下面的函数 getSessions 中完成.我相信它有效. 我进一步创建了一个包含我想要的所有信息的 RDD - 每个条目都是一个带有所需列的 Row 对象,看起来类型很好(一些数据被伪
..
我在 Horton 沙箱上运行 pyspark-sql 代码 18/08/11 17:02:22 信息 spark.SparkContext:运行 Spark 1.6.3 版 # 代码从 pyspark.sql 导入 *从 pyspark.sql.types 导入 *rdd1 = sc.textFile ("/user/maria_dev/spark_data/products.csv")
..
我有一个包含两列的数据框,listA 存储为 Seq[String] 和 valB 存储为 String>.我想创建第三列 valC,它将是 Int 类型,其值为 如果 valB 存在于 listA 中则为 1 否则为 0 我尝试执行以下操作: val dfWithAdditionalColumn = df.withColumn("valC", when($"listA".contain
..
我用pyspark配置了eclipse 我使用的是最新版本的 SPARK 和 PYTHON. 当我尝试编写代码并运行时.我得到以下错误. java.io.IOException:无法运行程序“python":CreateProcess error=2,系统找不到指定的文件 我写的代码如下 '''创建于 2017 年 12 月 23 日@作者:联想'''从 pyspark 导入 S
..