apache-spark相关内容
我正在尝试将数组类型的列乘以标量。此标量也是同一个PySpark数据帧中的值。 例如,我有这样的数据帧: df = sc.parallelize([([1, 2],3)]).toDF(["l","factor"]) +------+------+ | l|factor| +------+------+ |[1, 2]| 3| +------+------+ 我想要实
..
我正在尝试使用Spark读取VCF文件。 Spark 3.0 spark.read.format("com.databricks.vcf").load("vcfFilePath") 错误: java.lang.ClassNotFoundException: Failed to find data source: com.databricks.vcf. Please find
..
继续Managing huge zip files in dataBricks 数据库在30个文件后挂起。怎么办? 我已经将巨大的32 GB Zip分成了100个独立的部分。我已经从文件中分离了头文件,因此可以像处理任何CSV文件一样处理它。我需要根据列过滤数据。文件位于Azure Data Lake存储Gen1中,并且必须存储在那里。 在工作约30分钟后,尝试一次读取单个文件(或所
..
我正在尝试扩展我的一些ML管道,我喜欢Spaklyr打开的RStudio、Spark和H2O的组合(http://spark.rstudio.com/) 我想弄明白的一件事是如何使用Spaklyr从数据帧中的字符向量创建伪COL。 我已经尝试了下面的方法,但我认为它可能混合了一些Spaklyr尚未实现的功能。 library(sparklyr) library(dplyr) s
..
我正在使用AKS(Azure Kubernetes Service,Azure Kubernetes Service)来设置Spark集群,以便使用Kubernetes进行资源管理。我正在使用Spark-Submit以集群模式向K8提交PSPARK应用程序,我已经成功地让应用程序正常运行。 我设置了Azure文件共享来存储应用程序脚本和Persistent Volume,并设置了一个指向此文
..
通过SCI-KIT学习,我们可以根据累积方差图确定希望保留的功能数量,如下所示 from sklearn.decomposition import PCA pca = PCA() # init pca pca.fit(dataset) # fit the dataset into pca model pca.explained_variance_ratio # this attribut
..
我正在使用胶水作为我的母公司元存储。我有一个每小时向注册分区写入文件的每小时作业。 表定义: CREATE EXTERNAL TABLE table_name ( column_1 STRING, column_2 STRING ) PARTITIONED BY (process_date DATE) STORED AS PARQUET LOCATION "s3://bucket/ta
..
在EMR Spark上,通过数据帧向S3写入RDD[String]。 rddString .toDF() .coalesce(16) .write .option("compression", "gzip") .mode(SaveMode.Overwrite) .json(s"s3n://my-bucket/some/new/path") 保存模式为Overwri
..
我正在运行EMR笔记本中的所有代码。 SPEK.VERSION '3.0.1-amzn-0' temp_df.printSchema() root |-- dt: string (nullable = true) |-- AverageTemperature: double (nullable = true) |-- AverageTemperatureUncertai
..
Spark解释镶木地板柱子的方式有一些问题。 我有一个具有确认架构(df.schema()方法)的Oracle源代码: root |-- LM_PERSON_ID: decimal(15,0) (nullable = true) |-- LM_BIRTHDATE: timestamp (nullable = true) |-- LM_COMM_METHOD: string
..
我正在尝试创建一个函数来检查数据的质量(nans/null等) 我在一个PySpark DataFrame上运行了以下代码 df.select([f.count(f.when((f.isnan(c) | f.col(c).isNull()), c)).alias(c) for c in cols_check]).show() 只要要检查的列是字符串/整数,我就没有问题。但是,当我检查数据
..
我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。 代码: from pyspark.sql.functions import col kaf
..
我觉得这里肯定漏掉了一些明显的东西,但我似乎无法在Spark SQL中动态设置变量值。 假设我有两个表tableSrc和tableBuilder,并且我正在创建tableDest。 我一直在尝试 上的变体 SET myVar FLOAT = NULL SELECT myVar = avg(myCol) FROM tableSrc; CREATE TABLE tableD
..
我在开发期间一直在“客户端”模式下运行我的Spark作业。我使用“--file”与执行器共享配置文件。驱动程序正在本地读取配置文件。现在,我想在“集群”模式下部署作业。我现在无法与驱动程序共享配置文件。 例如,我将配置文件名作为Extra Java Options传递给驱动程序和执行器。我正在使用SparkFiles.get()读取文件 val configFile = org.a
..
我正在尝试使用Airflow在AWS上建立一条简单的数据管道。 我已经创建了一个DAG,它每天将数据抓取到S3,然后使用在EMR上运行的Spark作业进行处理。 我当前在本地笔记本电脑上运行气流计划程序,但我当然知道这不是一个好的长期解决方案。 所以我想了解一些关于将调度程序部署到EC2的提示(实例大小、部署进程或任何其他有用的信息) 推荐答案 在本地运行通常不是可行的后期
..
IV‘我已经阅读了一些关于镶木地板格式以及Spark如何与其集成的内容。 作为列式存储,parquet really shines只要Spark可以与底层存储协作,就可以执行投影,而不必加载所有数据,并指示存储根据各种统计数据加载特定的列块(当涉及筛选器时)。 我看到lecture on youtube(21:54)警告对象存储不支持下推过滤器(特别是以Amazon S3为例)。
..
我有一个包含60多亿行数据的Spark RDD,我想使用Train_on_Batch来训练深度学习模型。我不能将所有行都放入内存中,所以我希望一次获得10K左右的内存,以批处理成64或128个的块(取决于型号大小)。我目前使用的是rdd.Sample(),但我认为这不能保证我会得到所有行。有没有更好的方法来划分数据,使其更易于管理,这样我就可以编写一个生成器函数来获取批处理?我的代码如下:
..
我必须在Pyspark中的滑动窗口内执行聚合。特别是,我必须执行以下操作: 一次考虑100天的数据 组按ID的给定列 取聚合的最后一个值 求和并返回结果 这些任务必须在滑动窗口中使用.rangeBetween(-100 days, 0) 进行计算 我可以很容易地通过构造一个Pandas UDF来实现这个结果,该UDF接受Pyspark DF的一些列作为输入,将它们转换为Pan
..
我有一个方案,其中我将XML数据放在DataFrame列中。 性别 更新时间 访问者 F 1574264158 <;?xml版本=&qot;1.0;编码=";utf-8 我想使用UDF将访问者列-嵌套的XML字段解析为Dataframe中的列 XML格式
..
我想从火花流到几个弹性搜索索引。 我创建了成对的,当我执行groupByKey时,结果是>的元组,但为了使用ElasticSearch-Spark插件保存到ElasticSearch,我需要JavaRDD的值。 我知道有一个可以从List创建Java RDD的SparkConext.p
..