pyspark-sql相关内容

如何从 Kafka 读取 XML 格式的流数据?

我正在尝试使用 Spark 结构化流从 Kafka 主题中读取 XML 数据. 我尝试使用 Databricks spark-xml 包,但我收到一条错误消息,指出此包不支持流式读取.有什么方法可以使用结构化流从 Kafka 主题中提取 XML 数据? 我当前的代码: df = spark \.readStream \.format(“卡夫卡")\.format('com.datab ..

如何选择最后一行以及如何通过索引访问 PySpark 数据框?

来自像 这样的 PySpark SQL 数据帧 name 年龄城市abc 20定义 30 B 如何获取最后一行.(就像通过 df.limit(1) 我可以将数据帧的第一行放入新的数据帧中). 以及如何通过 index.like 行号访问数据帧行.12 或 200 . 在熊猫中我可以做到 df.tail(1) # 最后一行df.ix[rowno or index] # 按索引df ..
发布时间:2021-12-22 21:29:32 Python

在 PySpark 中,有没有办法使用运行时给出的 Python 类的函数动态注册 UDF?

我是 Python 新手,如果我的方法有任何错误,请道歉 我有一个场景,客户端可以提供其自定义 Python 函数,并希望将它们注册为 PySpark 中的 UDF. 根据我的初步理解,我期待一个函数,它返回函数名称和函数定义的字典,从导入模块并在运行时调用此方法. 随机自定义函数类示例 class CustomFuntions():def reverse_statement ..
发布时间:2021-11-14 23:31:58 Python

Spark RDD groupByKey + join vs join 性能

我在与其他用户共享的集群上使用 Spark.因此,仅根据运行时间来判断我的哪个代码运行效率更高是不可靠的.因为当我运行更高效的代码时,其他人可能会运行大量数据并使我的代码执行更长时间. 我可以在这里问两个问题吗: 我正在使用 join 函数来加入 2 个 RDD 并且我在使用 之前尝试使用 groupByKey()加入,像这样: rdd1.groupByKey().join(rdd2 ..
发布时间:2021-11-14 23:30:47 其他开发

java.io.IOException:无法运行程序“python":CreateProcess error=2,系统找不到指定的文件

我用pyspark配置了eclipse 我使用的是最新版本的 SPARK 和 PYTHON. 当我尝试编写代码并运行时.我得到以下错误. java.io.IOException:无法运行程序“python":CreateProcess error=2,系统找不到指定的文件 我写的代码如下 '''创建于 2017 年 12 月 23 日@作者:联想'''从 pyspark 导入 S ..
发布时间:2021-11-14 23:30:19 Java开发

通过组合类型和子类型的 Apache Spark 组

我在 spark 中有这个数据集, val sales = Seq((“华沙",2016,“脸书",“分享",100),(“华沙",2017,“脸书",“喜欢",200),(“波士顿",2015,“推特",“分享",50),(“波士顿",2016 年,“facebook",“分享",150),(“多伦多",2017,“推特",“喜欢",50)).toDF("city", "year","medi ..
发布时间:2021-11-14 23:28:51 其他开发

检查两个 pyspark Rows 是否相等

我正在为 Spark 作业编写单元测试,其中一些输出被命名为元组:pyspark.sql.Row 我如何断言他们的平等? actual = get_data(df)预期 = 行(总计 = 4,unique_ids = 2)self.assertEqual(实际,预期) 当我这样做时,这些值会以我无法确定的顺序重新排列. 解决方案 您的代码应该按编写的方式工作,因为根据 文档: ..
发布时间:2021-11-14 23:28:39 Python

Pyspark 数据帧中的 Timedelta - TypeError

我正在使用 pyspark 2.3.1 开发 Spark 2.3、Python 3.6 我有一个 Spark DataFrame,其中每个条目都是一个工作步骤,我想将一些行合并到一个工作会话中.这应该在下面的函数 getSessions 中完成.我相信它有效. 我进一步创建了一个包含我想要的所有信息的 RDD - 每个条目都是一个带有所需列的 Row 对象,看起来类型很好(一些数据被伪 ..
发布时间:2021-11-14 23:28:08 其他开发

java.io.IOException:无法运行程序“python":CreateProcess error=2,系统找不到指定的文件

我用pyspark配置了eclipse 我使用的是最新版本的 SPARK 和 PYTHON. 当我尝试编写代码并运行时.我得到以下错误. java.io.IOException:无法运行程序“python":CreateProcess error=2,系统找不到指定的文件 我写的代码如下 '''创建于 2017 年 12 月 23 日@作者:联想'''从 pyspark 导入 S ..
发布时间:2021-11-14 23:26:42 Java开发