Join two JSON files in Google Cloud Platform with Dataflow


Problem description

I want to find only the female employees across the two different JSON files below, keep only the fields we are interested in, and write the output to another JSON file.

I am also trying to implement this on Google Cloud Platform using Dataflow. Can someone provide sample Java code that achieves this result?

Employee JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}

Department JSON

{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}

The expected output JSON file should look like this:

{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}

Answer

You can do this using CoGroupByKey (where a shuffle will be used), or using side inputs if your departments collection is significantly smaller.

I will give you the code in Python, but you can use the same pipeline in Java.
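The snippets below use `LoadDepts` / `LoadEmps` transforms that the answer never defines. In Beam these would typically be a `ReadFromText` followed by a `Map` over `json.loads`, since the inputs are newline-delimited JSON. The per-line parsing step itself is just (a hypothetical sketch, names are assumptions):

```python
import json

def parse_json_line(line):
    # Each input line is one self-contained JSON record.
    return json.loads(line)

dept_line = '{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}'
print(parse_json_line(dept_line)['dept_name'])  # Account
```

In a pipeline this would appear as something like `p | ReadFromText(path) | beam.Map(parse_json_line)`.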

With side inputs, you would:

  1. Convert your departments PCollection into a dictionary that maps dept_id to the department JSON dictionary.

Then you take the employees PCollection as the main input, where you can use the department id to look up each department's JSON in the side-input dictionary.

Like so:

departments = (p | LoadDepts()
                 | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

deps_si = beam.pvalue.AsDict(departments)

employees = (p | LoadEmps())

def join_emp_dept(employee, dept_dict):
  # The employee records in the question store the department reference in
  # 'emp_dept', and dict.update() returns None, so update in place and
  # return the merged dict explicitly.
  employee.update(dept_dict[employee['emp_dept']])
  return employee

joined_dicts = employees | beam.Map(join_emp_dept, dept_dict=deps_si)
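Outside Beam, this side-input join reduces to a dictionary lookup plus a merge. A minimal plain-Python sketch of the same logic, using the sample records from the question (note the employee records carry the foreign key in `emp_dept`):

```python
# Plain-Python sketch of the side-input join logic (no Beam required).
departments = [
    {"dept_id": "OrgDept#1", "dept_name": "Account", "dept_start_year": "1950"},
    {"dept_id": "OrgDept#3", "dept_name": "HR", "dept_start_year": "1950"},
]
# This dict plays the role of the AsDict side input.
dept_dict = {d["dept_id"]: d for d in departments}

def join_emp_dept(employee, dept_dict):
    # The question's employee records hold the foreign key in 'emp_dept'.
    employee.update(dept_dict[employee["emp_dept"]])
    return employee

adam = {"emp_id": "OrgEmp#1", "emp_name": "Adam", "emp_dept": "OrgDept#1",
        "emp_gender": "female", "emp_salary": "$100000"}
print(join_emp_dept(adam, dept_dict)["dept_name"])  # Account
```

The side-input variant avoids a shuffle, which is why it is preferable when the departments collection fits comfortably in a worker's memory.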

---

With CoGroupByKey, you can use the department id as a key to group both collections. This results in a PCollection of key-value pairs where the key is the department id and the value is a pair of iterables: the departments with that id, and the employees in that department.

departments = (p | LoadDepts()
               | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

employees = (p | LoadEmps()
               # The employee records reference the department via 'emp_dept'.
               | 'key_emp' >> beam.Map(lambda emp: (emp['emp_dept'], emp)))

import itertools

def join_lists(kv):
  # Python 3 removed tuple unpacking in function signatures, so unpack here.
  k, v = kv
  # Pair each employee with each matching department (usually exactly one)
  # and return the pairs so FlatMap can flatten them.
  return itertools.product(v['employees'], v['departments'])

def merge_dicts(pair):
  emp_dict, dept_dict = pair
  emp_dict.update(dept_dict)  # update() returns None, so return explicitly
  return emp_dict

joined_dicts = (
    {'employees': employees, 'departments': departments}
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergedicts' >> beam.Map(merge_dicts)
    | 'filterfields' >> beam.Map(filter_fields)
)
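The pipeline above references `filter_fields` without defining it, and the question also asks for female employees only. A hypothetical definition (the helper names and the field list are assumptions, chosen to match the expected output in the question) could be:

```python
# Hypothetical helpers; the answer references filter_fields but never defines it.
WANTED_FIELDS = ("emp_id", "emp_name", "dept_name", "emp_salary")

def is_female(employee):
    # Could be applied with beam.Filter(is_female), before or after the join.
    return employee.get("emp_gender") == "female"

def filter_fields(record):
    # Keep only the fields the expected output asks for.
    return {k: record[k] for k in WANTED_FIELDS if k in record}

merged = {"emp_id": "OrgEmp#1", "emp_name": "Adam", "emp_dept": "OrgDept#1",
          "emp_gender": "female", "emp_salary": "$100000",
          "dept_id": "OrgDept#1", "dept_name": "Account", "dept_start_year": "1950"}
print(filter_fields(merged))
# {'emp_id': 'OrgEmp#1', 'emp_name': 'Adam', 'dept_name': 'Account', 'emp_salary': '$100000'}
```

Filtering before the join would be slightly cheaper, since fewer records then reach the shuffle.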
