Spark: Load multiple files, analyze individually, merge results, and save


Question

I'm new to Spark and not quite how to ask this (which terms to use, etc.), so here's a picture of what I'm conceptually trying to accomplish:

I have lots of small, individual .txt "ledger" files (e.g., line-delimited files with a timestamp and attribute values at that time).

I want to:

  1. Read each "ledger" file into individual data frames (read: NOT combining into one, big data frame);

  2. Perform some basic calculations on each individual data frame, which result in a row of new data values; and then

  3. Merge all the individual result rows into a final object & save it to disk in a line-delimited file.

It seems like nearly every answer I find (when googling related terms) is about loading multiple files into a single RDD or DataFrame, but I did find this Scala code:

val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename }

def doSomething(file: String) = {
  println(file)

  // your logic of processing a single file comes here
  val logData = sc.textFile(file)
  val numAs = logData.filter(line => line.contains("a")).count()
  println("Lines with a: %s".format(numAs))

  // save rdd of single file processed data to hdfs comes here
}

files.collect.foreach(filename => doSomething(filename))

...but:

A. I can't tell if this parallelizes the read/analyze operation, and

B. I don't think it provides for merging the results into a single object.

Any direction or recommendations are greatly appreciated!

Update

It seems like what I'm trying to do (run a script on multiple files in parallel and then combine results) might require something like thread pools (?).
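
For reference, here is a rough sketch of that thread-pool idea, assuming an existing `spark` session and locally listable file paths; `summarize_ledger`, the paths, and the placeholder result are illustrative stand-ins for the per-file calculation shown below:

from concurrent.futures import ThreadPoolExecutor
from glob import glob

def summarize_ledger(path):
    """Read one ledger file and reduce it to a single result dict."""
    df = spark.read.json(path)                     # one small DataFrame per file
    # ... the per-file calculation (see the example below) would go here ...
    return {"_id": path, "row_count": df.count()}  # placeholder result

# Assumes the ledgers are on a locally visible path; for HDFS/S3 you would
# build this list with the appropriate filesystem client instead.
paths = glob("/path/to/ledgers/*.txt")

# Spark accepts job submissions from multiple driver threads, so each file's
# job can run concurrently (subject to available cluster resources).
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(summarize_ledger, paths))

# Merge the per-file result rows into one DataFrame and save it line-delimited.
spark.createDataFrame(results).coalesce(1).write.mode("overwrite").json("/path/to/output")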

For clarity, here's an example of the calculation I'd like to perform on the DataFrame created by reading in the "ledger" file:

from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp

# Read "ledger" file
df = spark.read.json("/path/to/ledger-filename.txt")

# Convert string ==> timestamp & sort
df = df.withColumn("timestamp", to_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss')).sort('timestamp')

columns_with_age = ("location", "status")
columns_without_age = ("wh_id",)  # trailing comma keeps this a tuple rather than a plain string

# Collect the (small) ledger once and get the most-recent values from the last row
rows = df.collect()
row_count = len(rows)
last_row = rows[-1]

# Create an empty "final row" dictionary
final_row = {}

# For each column for which we want to calculate an age value ...
for c in columns_with_age:

    # Initialize loop values
    target_value = last_row[c]
    final_row[c] = target_value
    timestamp_at_lookback = last_row["timestamp"]
    look_back = 1
    different = False

    # Walk backwards through the ledger until the column's value changes
    while not different:
        previous_row = rows[row_count - 1 - look_back]
        if previous_row[c] == target_value:
            timestamp_at_lookback = previous_row["timestamp"]
            look_back += 1
        else:
            different = True

    # At this point, a difference has been found, so calculate the age
    final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days

Thus, a ledger like this:

+---------+------+-------------------+-----+
| location|status|          timestamp|wh_id|
+---------+------+-------------------+-----+
|  PUTAWAY|     I|2019-04-01 03:14:00|   20|
|PICKABLE1|     X|2019-04-01 04:24:00|   20|
|PICKABLE2|     X|2019-04-01 05:33:00|   20|
|PICKABLE2|     A|2019-04-01 06:42:00|   20|
|  HOTPICK|     A|2019-04-10 05:51:00|   20|
| ICEXCEPT|     A|2019-04-10 07:04:00|   20|
| ICEXCEPT|     X|2019-04-11 09:28:00|   20|
+---------+------+-------------------+-----+

Would reduce to (assuming the calculation was run on 2019-04-14):

{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }

Answer

Using wholeTextFiles is not recommended as it loads the full file into memory at once. If you really want to create an individual data frame per file, you can simply use the full path instead of a directory. However, this is not recommended and will most likely lead to poor resource utilisation. Instead, consider using input_file_name: https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/functions.html#input_file_name--

For example:

import org.apache.spark.sql.functions.{count, input_file_name}
import spark.implicits._

spark.read
  .textFile("path/to/files")
  .withColumn("file", input_file_name())
  .filter($"value" like "%a%")
  .groupBy($"file")
  .agg(count($"value"))
  .show(10, false)

+----------------------------+------------+
|file                        |count(value)|
+----------------------------+------------+
|path/to/files/1.txt         |2           |
|path/to/files/2.txt         |4           |
+----------------------------+------------+

So the files can be processed individually and then combined later.
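
Applied to the ledger example above, a rough PySpark equivalent might look like the sketch below: it reads every file in one pass, tags each row with its source file via input_file_name, and keeps only the most recent row per file. (The "days in" look-back would need extra window logic, but the per-file grouping pattern is the same.) Column names follow the sample ledger; the paths are illustrative.

from pyspark.sql import Window
from pyspark.sql import functions as F

# Read all ledger files in one pass and tag each row with its source file
df = (spark.read.json("/path/to/ledgers/*.txt")
        .withColumn("timestamp", F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"))
        .withColumn("_id", F.input_file_name()))

# Most recent row per source file
w = Window.partitionBy("_id").orderBy(F.col("timestamp").desc())
latest = (df.withColumn("rn", F.row_number().over(w))
            .filter(F.col("rn") == 1)
            .drop("rn"))

# One result row per input file, written as line-delimited JSON
latest.coalesce(1).write.mode("overwrite").json("/path/to/output")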
