使用sparklyr :: spark_read_json时添加文件名 [英] Adding name of file when using sparklyr::spark_read_json

查看:96
本文介绍了使用sparklyr :: spark_read_json时添加文件名的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有数百万个json文件,其中每个文件包含相同数量的列,可以说 x y .请注意,单个文件的 x y 的长度相等,但是在比较两个不同的文件时可能会有所不同.

I have millions of json-files, where each of the files contains the same number of columns, lets say x and y. Note that the length of x and y is equal for a single file, but could be different when comparing two different files.

问题在于,分隔数据的唯一内容是文件名.因此,在合并文件时,我希望将文件名包含在第三列中.是否可以使用 sparklyr :: spark_read_json ,即使用通配符时?

The problem is that the only thing that separates the data is the name of the file. So when combining the files I'd like to have the name of the file included as a third column. Is this possible using sparklyr::spark_read_json, i.e. when using wildcards?

MWE:

library(sparklyr)

## Spark connection
sc <- spark_connect(master = "local", version = "2.1.0")

## Create data
data_dir <- tempdir()
tbl_json1 <- data.frame(x = 1:3, y = 1:3)
tbl_json2 <- data.frame(x = 1:10, y = 1:10)

## Write data to disk
write(jsonlite::toJSON(tbl_json1), sprintf("%s/tab1.json", data_dir))
write(jsonlite::toJSON(tbl_json2), sprintf("%s/tab2.json", data_dir))

## Read both files using wildcard 
combined_table <- spark_read_json(
    sc, 
    name = "combined_table", 
    path = sprintf("%s/*.json", data_dir)
)

## Tranfer results to R
library(dplyr)
dt <- combined_table %>% collect

# # A tibble: 13 x 2
#       x     y
#     <dbl> <dbl>
#  1    1.    1.
#  2    2.    2.
#  3    3.    3.
#  4    4.    4.
#  5    5.    5.
#  6    6.    6.
#  7    7.    7.
#  8    8.    8.
#  9    9.    9.
# 10   10.   10.
# 11    1.    1.
# 12    2.    2.
# 13    3.    3.

想要的输出

# # A tibble: 13 x 2
#       x     y     id
#     <dbl> <dbl> <chr>
#  1    1.    1.    tab2
#  2    2.    2.    tab2
#  3    3.    3.    tab2
#  4    4.    4.    tab2
#  5    5.    5.    tab2
#  6    6.    6.    tab2
#  7    7.    7.    tab2
#  8    8.    8.    tab2
#  9    9.    9.    tab2
# 10   10.   10.    tab2
# 11    1.    1.    tab1
# 12    2.    2.    tab1
# 13    3.    3.    tab1

推荐答案

您可以禁用快速缓存(无论如何,您还是应该这样做):

You can disable eager caching (you really should anyway):

combined_table <- spark_read_json(
  sc, 
  name = "combined_table", 
  path = sprintf("%s/*.json", data_dir),
  memory=FALSE
)

并使用

# Source:   lazy query [?? x 3]
# Database: spark_connection
       x     y id                              
   <dbl> <dbl> <chr>                           
 1     1     1 file:///tmp/RtmpnIAUek/tab2.json
 2     2     2 file:///tmp/RtmpnIAUek/tab2.json
 3     3     3 file:///tmp/RtmpnIAUek/tab2.json
 4     4     4 file:///tmp/RtmpnIAUek/tab2.json
 5     5     5 file:///tmp/RtmpnIAUek/tab2.json
 6     6     6 file:///tmp/RtmpnIAUek/tab2.json
 7     7     7 file:///tmp/RtmpnIAUek/tab2.json
 8     8     8 file:///tmp/RtmpnIAUek/tab2.json
 9     9     9 file:///tmp/RtmpnIAUek/tab2.json
10    10    10 file:///tmp/RtmpnIAUek/tab2.json
# ... with more rows

如果需要,它可以与 Hive的 parse_url > UDF :

If needed it can be combined with Hive's parse_url UDF:

combined_table %>% mutate(id = parse_url(input_file_name(), "FILE"))

# Source:   lazy query [?? x 3]
# Database: spark_connection
       x     y id                       
   <dbl> <dbl> <chr>                    
 1     1     1 /tmp/RtmpnIAUek/tab2.json
 2     2     2 /tmp/RtmpnIAUek/tab2.json
 3     3     3 /tmp/RtmpnIAUek/tab2.json
 4     4     4 /tmp/RtmpnIAUek/tab2.json
 5     5     5 /tmp/RtmpnIAUek/tab2.json
 6     6     6 /tmp/RtmpnIAUek/tab2.json
 7     7     7 /tmp/RtmpnIAUek/tab2.json
 8     8     8 /tmp/RtmpnIAUek/tab2.json
 9     9     9 /tmp/RtmpnIAUek/tab2.json
10    10    10 /tmp/RtmpnIAUek/tab2.json
# ... with more rows

,您可以使用其他字符串处理功能来提取单个信息位.

and you can use other string processing function to extract individual bits of information.

这篇关于使用sparklyr :: spark_read_json时添加文件名的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆