apache-spark相关内容

循环访问数据库中的文件失败

继续Managing huge zip files in dataBricks 数据库在30个文件后挂起。怎么办? 我已经将巨大的32 GB Zip分成了100个独立的部分。我已经从文件中分离了头文件,因此可以像处理任何CSV文件一样处理它。我需要根据列过滤数据。文件位于Azure Data Lake存储Gen1中,并且必须存储在那里。 在工作约30分钟后,尝试一次读取单个文件(或所 ..

在Spaklyr中创建虚拟变量?

我正在尝试扩展我的一些ML管道,我喜欢Spaklyr打开的RStudio、Spark和H2O的组合(http://spark.rstudio.com/) 我想弄明白的一件事是如何使用Spaklyr从数据帧中的字符向量创建伪COL。 我已经尝试了下面的方法,但我认为它可能混合了一些Spaklyr尚未实现的功能。 library(sparklyr) library(dplyr) s ..
发布时间:2022-09-03 16:22:44 其他开发

在Kubernetes上使用Spark写入输出时出现chmod错误

我正在使用AKS(Azure Kubernetes Service,Azure Kubernetes Service)来设置Spark集群,以便使用Kubernetes进行资源管理。我正在使用Spark-Submit以集群模式向K8提交PSPARK应用程序,我已经成功地让应用程序正常运行。 我设置了Azure文件共享来存储应用程序脚本和Persistent Volume,并设置了一个指向此文 ..
发布时间:2022-09-01 11:25:09 其他开发

Spark Have SQL返回空数据帧

我正在使用胶水作为我的母公司元存储。我有一个每小时向注册分区写入文件的每小时作业。 表定义: CREATE EXTERNAL TABLE table_name ( column_1 STRING, column_2 STRING ) PARTITIONED BY (process_date DATE) STORED AS PARQUET LOCATION "s3://bucket/ta ..
发布时间:2022-08-16 19:22:46 其他开发

PYSpark没有打印Kafka流中的任何数据,也没有失败

我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。 代码: from pyspark.sql.functions import col kaf ..

在集群模式下与Spark-Submit共享配置文件

我在开发期间一直在“客户端”模式下运行我的Spark作业。我使用“--file”与执行器共享配置文件。驱动程序正在本地读取配置文件。现在,我想在“集群”模式下部署作业。我现在无法与驱动程序共享配置文件。 例如,我将配置文件名作为Extra Java Options传递给驱动程序和执行器。我正在使用SparkFiles.get()读取文件 val configFile = org.a ..
发布时间:2022-08-08 17:34:31 其他开发

如何将气流调度器部署到AWS EC2?

我正在尝试使用Airflow在AWS上建立一条简单的数据管道。 我已经创建了一个DAG,它每天将数据抓取到S3,然后使用在EMR上运行的Spark作业进行处理。 我当前在本地笔记本电脑上运行气流计划程序,但我当然知道这不是一个好的长期解决方案。 所以我想了解一些关于将调度程序部署到EC2的提示(实例大小、部署进程或任何其他有用的信息) 推荐答案 在本地运行通常不是可行的后期 ..

Azure BLOB商店是否支持拼花板柱投影和下推过滤器/谓词

IV‘我已经阅读了一些关于镶木地板格式以及Spark如何与其集成的内容。 作为列式存储,parquet really shines只要Spark可以与底层存储协作,就可以执行投影,而不必加载所有数据,并指示存储根据各种统计数据加载特定的列块(当涉及筛选器时)。 我看到lecture on youtube(21:54)警告对象存储不支持下推过滤器(特别是以Amazon S3为例)。 ..
发布时间:2022-07-19 22:57:40 其他开发

如何使用PYSPARK从Spark获得批次行

我有一个包含60多亿行数据的Spark RDD,我想使用Train_on_Batch来训练深度学习模型。我不能将所有行都放入内存中,所以我希望一次获得10K左右的内存,以批处理成64或128个的块(取决于型号大小)。我目前使用的是rdd.Sample(),但我认为这不能保证我会得到所有行。有没有更好的方法来划分数据,使其更易于管理,这样我就可以编写一个生成器函数来获取批处理?我的代码如下: ..
发布时间:2022-07-15 23:08:08 Python

Pandas UDF在PySpark中的改进

我必须在Pyspark中的滑动窗口内执行聚合。特别是,我必须执行以下操作: 一次考虑100天的数据 组按ID的给定列 取聚合的最后一个值 求和并返回结果 这些任务必须在滑动窗口中使用.rangeBetween(-100 days, 0) 进行计算 我可以很容易地通过构造一个Pandas UDF来实现这个结果,该UDF接受Pyspark DF的一些列作为输入,将它们转换为Pan ..