pyspark相关内容

循环访问数据库中的文件失败

继续Managing huge zip files in dataBricks 数据库在30个文件后挂起。怎么办? 我已经将巨大的32 GB Zip分成了100个独立的部分。我已经从文件中分离了头文件,因此可以像处理任何CSV文件一样处理它。我需要根据列过滤数据。文件位于Azure Data Lake存储Gen1中,并且必须存储在那里。 在工作约30分钟后,尝试一次读取单个文件(或所 ..

在Kubernetes上使用Spark写入输出时出现chmod错误

我正在使用AKS(Azure Kubernetes Service,Azure Kubernetes Service)来设置Spark集群,以便使用Kubernetes进行资源管理。我正在使用Spark-Submit以集群模式向K8提交PSPARK应用程序,我已经成功地让应用程序正常运行。 我设置了Azure文件共享来存储应用程序脚本和Persistent Volume,并设置了一个指向此文 ..
发布时间:2022-09-01 11:25:09 其他开发

PYSpark没有打印Kafka流中的任何数据,也没有失败

我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。 代码: from pyspark.sql.functions import col kaf ..

如何使用PYSPARK从Spark获得批次行

我有一个包含60多亿行数据的Spark RDD,我想使用Train_on_Batch来训练深度学习模型。我不能将所有行都放入内存中,所以我希望一次获得10K左右的内存,以批处理成64或128个的块(取决于型号大小)。我目前使用的是rdd.Sample(),但我认为这不能保证我会得到所有行。有没有更好的方法来划分数据,使其更易于管理,这样我就可以编写一个生成器函数来获取批处理?我的代码如下: ..
发布时间:2022-07-15 23:08:08 Python

Pandas UDF在PySpark中的改进

我必须在Pyspark中的滑动窗口内执行聚合。特别是,我必须执行以下操作: 一次考虑100天的数据 组按ID的给定列 取聚合的最后一个值 求和并返回结果 这些任务必须在滑动窗口中使用.rangeBetween(-100 days, 0) 进行计算 我可以很容易地通过构造一个Pandas UDF来实现这个结果,该UDF接受Pyspark DF的一些列作为输入,将它们转换为Pan ..

ApacheSpark中的高效字符串匹配

我使用OCR工具从截图中提取文本(每个截图大约1-5句)。但是,在手动验证提取的文本时,我注意到不时会出现几个错误。 考虑到文字“你好星火!我真的很喜欢😊❤️!”,我注意到: 1)字母“i”、“!”和“l”被替换为“|”。 2)表情符号未正确提取并被其他字符替换或被省略。 3)不时删除空格。 结果,我可能会得到这样的字符串:“Hello here 7l|Real|y ..
发布时间:2022-06-21 13:23:33 Python

查找最终的父代

我正在努力寻找有Dir pandas 的终极父母。但这项任务有一个特长,那就是图表不太适合,或者我只是不知道如何正确使用它。 输入: 子项 父级 类 1001 8888 A 1001 1002 D 1001 1002 C 1001 1003 C 1003 6666 G 1002 9999 H 输出: 子项 旗舰_父级 类 连接 1001 8888 A 直接 100 ..
发布时间:2022-06-20 17:53:28 Python

更改SPARK_TEMPORY目录路径

是否可以更改Spark在写入前保存其临时文件的_temporary目录? 具体地说,因为我正在写入表的单个分区,所以我希望临时文件夹位于分区文件夹中。 可能吗? 文件输出委员会无法使用默认的${mapred.output.dir}/_temporary 由于其实现方式,文件输出委员会会创建一个推荐答案子目录来写入文件,并在提交后移到${mapred.output.dir}。 ..
发布时间:2022-06-10 20:49:29 其他开发