spark-dataframe相关内容

如何从字典中创建数据框,其中每个项目都是 PySpark 中的一列

我想从字典中创建一个新的数据框.字典包含列名作为键和列数据列表作为值.例如: col_dict = {'col1': [1, 2, 3],'col2': [4, 5, 6]} 我需要它作为一个看起来像这样的数据框: +------+------+|列 1 |col2 |+------+------+|1|4||2|5||3|6|+------+------+ 似乎没有一种简单的方法可以做到 ..
发布时间:2021-11-14 23:06:52 其他开发

Spark将列转换为存储在字符串中的sql类型

简单的请求是我需要帮助将一列添加到数据框中,但是,该列必须为空,其类型来自 ...spark.sql.types,并且必须从字符串中定义类型. > 我可能可以用 ifs 或 case 来做到这一点,但我正在寻找更优雅的东西.不需要为 org.apache.spark.sql.types 中的每种类型编写案例的东西 如果我这样做: df = df.withColumn("col_name ..
发布时间:2021-11-14 23:06:16 其他开发

如何使用 Spark 执行插入覆盖?

我正在尝试将我们的一个 ETL Hive 脚本转换为 Spark,其中 Hive ETL 脚本维护一个表,其中每晚需要在新同步之前删除部分数据.Hive ETL 使用插入覆盖将主表删除超过 3 天的数据.基本上创建一个临时表,其中的数据不超过三天,然后覆盖主表. 使用 Spark(使用 Scala)时,我不断收到此错误,无法写入相同的源.这是我的代码: spark.sql ("Select ..
发布时间:2021-11-14 23:05:23 其他开发

引发内存多次迭代

我有一个 spark 作业(在 spark 1.3.1 中运行)必须迭代多个键(大约 42 个)并处理该作业.这是程序的结构 从地图中获取密钥 从 hive(下面的 hadoop-yarn)中获取与键匹配的数据作为数据框 处理数据 将结果写入 hive 当我一键运行时,一切正常.当我使用 42 个键运行时,在第 12 次迭代时出现内存不足异常.有没有办法可以在每次迭代之间清理内存 ..
发布时间:2021-11-14 23:05:08 其他开发

使用 spark csv 包读取非常大的文件时出错

我们正在尝试使用 spark-csv 和 univocity 1.5.0 解析器读取一个 3 gb 文件,该文件的一个列中有多个换行符,但是该文件在某些​​行的多列中被拆分换行符.这种情况发生在大文件的情况下. 我们使用的是 spark 1.6.1 和 Scala 2.10 以下是我用来读取文件的代码: sqlContext.read.format("com.databricks.s ..
发布时间:2021-11-14 23:04:46 其他开发

Pyspark - 排名列保持联系

我正在寻找一种方法来对保留关系的数据帧的列进行排名.特别是对于这个例子,我有一个 pyspark 数据框,如下所示,我想在其中为 colA & 生成排名.colB(虽然我想支持能够对 N 个列进行排名) +--------+----------+-----+----+|实体|身份证|colA|colB|+--------------------+-----+----+|a|8589934652 ..
发布时间:2021-11-14 23:04:22 其他开发

无法从直线访问 Spark 2.0 临时表

在 Spark 1.5.1 中,我已经能够使用 Thrift Server 从 Beeline 访问 spark-shell 临时表.通过阅读 Stackoverflow 上相关问题的答案,我已经能够做到这一点. 但是,升级到 Spark 2.0 后,我无法再从 Beeline 看到临时表,这是我正在遵循的步骤. 我正在使用以下命令启动 spark-shell: ./bin/spar ..
发布时间:2021-11-14 23:03:58 其他开发

如何从字典中创建数据框,其中每个项目都是 PySpark 中的一列

我想从字典中创建一个新的数据框.字典包含列名作为键和列数据列表作为值.例如: col_dict = {'col1': [1, 2, 3],'col2': [4, 5, 6]} 我需要它作为一个看起来像这样的数据框: +------+------+|列 1 |col2 |+------+------+|1|4||2|5||3|6|+------+------+ 似乎没有一种简单的方法可以做到 ..
发布时间:2021-11-14 23:03:55 其他开发

Spark Dataframes 已成功创建但无法写入本地磁盘

我使用 IntelliJ IDE 在 Microsoft Windows 平台上执行 Spark Scala 代码. 我有四个 Spark 数据帧,每个数据帧大约有 30000 条记录,我尝试从这些数据帧中的每一个中取出一列作为我要求的一部分. 我使用了 Spark SQL 函数来完成它并成功执行.当我执行 DF.show() 或 DF.count() 方法时,我能够在屏幕上看到结果, ..
发布时间:2021-11-14 23:03:52 其他开发

如何合并由 SPARK 数据框创建的文件夹中的所有零件文件并在 Scala 中重命名为文件夹名称

嗨,我有我的 spark 数据框的输出,它创建了文件夹结构并创建了部分文件.现在我必须合并文件夹内的所有部分文件并将该文件重命名为文件夹路径名. 这就是我做分区的方式 df.write.partitionBy("DataPartition","PartitionYear").format("csv").option("nullValue", "").option("header", "tr ..
发布时间:2021-11-14 23:03:36 其他开发

在不停止进程的情况下刷新 Spark 实时流中的数据帧

在我的应用程序中,我从 Kafka 队列获得了一个帐户流(使用 Spark 流和 kafka) 而且我需要从 S3 获取与这些帐户相关的属性,因此我计划缓存 S3 结果数据帧,因为 S3 数据目前至少不会更新一天,将来可能会更改为 1 小时或 10 分钟.所以问题是如何在不停止进程的情况下定期刷新缓存的数据帧. **更新:我计划在 S3 中有更新时将事件发布到 kafka,使用 SNS ..

spark Dataframe 执行 UPDATE 语句

伙计们, 我需要使用 Apache Spark DataFrame 执行 jdbc 操作.基本上我有一个名为 Measures 的历史 jdbc 表,我必须在其中执行两个操作: 1.将旧度量记录的endTime 有效性属性设置为当前时间 2.插入新的小节记录设置 endTime 为 9999-12-31 谁能告诉我如何执行(如果可以的话)第一个操作的更新语句和第二个操作的插 ..
发布时间:2021-11-14 23:02:44 其他开发