How to process events in batches with Elixir Flow

Question

I have a csv_file in which a.) first, each row needs to be converted to XML and b.) second, the converted XML is sent to the Rails side for some database write operations.

Below is my Flow code:

flow = csv_rows
 |> Flow.from_enumerable()
 |> Flow.partition()
 |> Flow.map(&CSV.generate_xml/1)
 |> Flow.map(&CSV.save_to_rails_database/1)
 |> Flow.run()

Everything works fine for a small csv file, but when the csv_file is very large (say, 20,000 records), the second operation (writing to the database on the Rails side) tries to insert too many records at the same time. Because Elixir sends too many requests to the Rails side at once, the database reaches its peak limit.

Would it be good to process the events in batches of 50, and would min_demand and max_demand be useful in this case?

Solution

You can use Flow.map_state/2 to receive the whole state of a particular stage (in your case, since you are mapping, the state will be the events in that batch).

You will want to use three parameters here, all given to from_enumerable (see the sketch after this list):

  • min_demand: this will effectively be the batch size
  • max_demand: the maximum number of rows that will be in flux between stages
  • stages: the number of concurrent stages processing the data; in your case, how many batches are processed at the same time
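
Putting those together, here is a minimal sketch (untested). The concrete values (batches of roughly 50 rows, at most 100 rows in flux, 4 stages) are assumptions to tune against your database, and note that later Flow versions replaced map_state/2 with on_trigger/2:

csv_rows
|> Flow.from_enumerable(min_demand: 50, max_demand: 100, stages: 4)
|> Flow.map(&CSV.generate_xml/1)
|> Flow.map_state(fn xmls ->
  # One batch write per group of generated XML documents,
  # instead of one request per row.
  CSV.save_to_batch_rails_database(xmls)
  # Nothing further to emit downstream.
  []
end)
|> Flow.run()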

A few other considerations:

  • You don't need partitioning, since you are not doing any grouping
  • Consider using NimbleCSV, which allows the CSV to be consumed as a stream (see the sketch after this list); this helps with memory usage if the CSV is very large
  • You likely don't need Flow at all in this example; Task.async_stream/3 should suffice
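
For the NimbleCSV route, a sketch (untested; the file path is a placeholder, and NimbleCSV also ships a predefined RFC 4180 parser as NimbleCSV.RFC4180):

# Defines a CSV parser module at compile time.
NimbleCSV.define(MyParser, separator: ",", escape: "\"")

# Lazily stream the file and parse it row by row;
# parse_stream skips the header line by default.
csv_rows =
  "rows.csv"
  |> File.stream!()
  |> MyParser.parse_stream()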

When we worked on Flow, we were able to take some of its lessons and apply them back to Elixir. One of those lessons resulted in Task.async_stream/3, which is useful when you want to map over a collection without a reduce stage, exactly what you have:

batch_size = 100

# 8 tasks running at the same time and we don't care about the results order
async_options = [max_concurrency: 8, ordered: false]

csv_rows
|> Stream.chunk_every(batch_size)
|> Task.async_stream(fn batch -> 
  batch
  |> Enum.map(&CSV.generate_xml/1)
  |> CSV.save_to_batch_rails_database()
end, async_options)
|> Stream.run()

I haven't tested the code but it should provide enough guidance. It should be as fast as Flow but without an extra dependency.
