如何在小捆绑的流式管道中按 N 个元素进行批处理? [英] How to Batch By N Elements in Streaming Pipeline With Small Bundles?
问题描述
我已经按照这个答案中的描述实现了 N 个元素的批处理:谷歌数据流管道中的数据存储输入是否可以一次处理 N 个条目的批次?
<前><代码>包 com.example.dataflow.transform;导入 com.example.dataflow.event.ClickEvent;导入 org.apache.beam.sdk.transforms.DoFn;导入 org.apache.beam.sdk.transforms.windowing.GlobalWindow;导入 org.joda.time.Instant;导入 java.util.ArrayList;导入 java.util.List;公共类 ClickToClicksPack 扩展了 DoFn> {公共静态最终 int BATCH_SIZE = 10;私人列表累加器;@StartBundle公共无效 startBundle() {累加器 = 新 ArrayList(BATCH_SIZE);}@ProcessElementpublic void processElement(ProcessContext c) {ClickEvent clickEvent = c.element();accumulator.add(clickEvent);如果(累加器.大小()> = BATCH_SIZE){c.输出(累加器);累加器 = 新 ArrayList(BATCH_SIZE);}}@FinishBundle公共无效finishBundle(FinishBundleContext c){如果(累加器大小()> 0){ClickEvent clickEvent = accumulator.get(0);长时间 = clickEvent.getClickTimestamp().getTime();c.output(accumulator, new Instant(time), GlobalWindow.INSTANCE);}}}但是当我在流模式下运行管道时,有很多批次只有 1 或 2 个元素.据我了解,这是因为小包的大小.运行一天后,批处理元素的平均数量大约为 4.我真的需要它更接近 10,以便更好地执行后续步骤.
有没有办法控制捆绑包的大小?或者我应该为此目的使用GroupIntoBatches"转换.在这种情况下,我不清楚应该选择什么作为键.
更新:使用 java 线程 ID 或 VM 主机名作为应用GroupIntoBatches"转换的键是个好主意吗?
我最终在内部使用GroupIntoBatches"进行复合变换.以下答案包含有关密钥选择的建议:https://stackoverflow.com/a/44956702/4888849
在我当前的实现中,我使用随机键来实现并行性,并且我正在窗口化事件以便定期发出结果,即使一个键的事件少于 BATCH_SIZE 也是如此.
<前><代码>包 com.example.dataflow.transform;导入 com.example.dataflow.event.ClickEvent;导入 org.apache.beam.sdk.transforms.DoFn;导入 org.apache.beam.sdk.transforms.GroupIntoBatches;导入 org.apache.beam.sdk.transforms.PTransform;导入 org.apache.beam.sdk.transforms.ParDo;导入 org.apache.beam.sdk.transforms.windowing.FixedWindows;导入 org.apache.beam.sdk.transforms.windowing.Window;导入 org.apache.beam.sdk.values.KV;导入 org.apache.beam.sdk.values.PCollection;导入 org.joda.time.Duration;导入 java.util.Random;/*** 批量点击成 BATCH_SIZE 大小的包*/公共类 ClickToClicksPack 扩展了 PTransform、PCollection>> {公共静态最终 int BATCH_SIZE = 10;//定义窗口持续时间.//在窗口结束后 - 即使元素少于 BATCH_SIZE 元素也会被发射public static final int WINDOW_DURATION_SECONDS = 1;私有静态最终 int DEFAULT_SHARDS_NUMBER = 20;//确定可能的并行级别私有 int shardsNumber = DEFAULT_SHARDS_NUMBER;公共 ClickToClicksPack() {极好的();}公共 ClickToClicksPack(int shardsNumber) {极好的();this.shardsNumber = shardsNumber;}@覆盖公共 PCollection> 展开(PCollection 输入){返回输入//分配键,因为GroupIntoBatches"仅适用于键值对.apply(ParDo.of(new AssignRandomKeys(shardsNumber))).apply(Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_DURATION_SECONDS)))).apply(GroupIntoBatches.ofSize(BATCH_SIZE)).apply(ParDo.of(new ExtractValues()));}/*** 分配给点击次数介于零和 shardsNumber 之间的随机整数*/私有静态类 AssignRandomKeys 扩展了 DoFn> {私人 int shardsNumber;private Random 随机;AssignRandomKeys(int shardsNumber) {极好的();this.shardsNumber = shardsNumber;}@设置公共无效设置(){随机=新随机();}@ProcessElementpublic void processElement(ProcessContext c) {ClickEvent clickEvent = c.element();KV kv = KV.of(random.nextInt(shardsNumber), clickEvent);c.输出(千伏);}}/*** 从 KV 中提取值*/私有静态类 ExtractValues extends DoFn>, Iterable> {@ProcessElementpublic void processElement(ProcessContext c) {kv> kv = c.element();c.output(kv.getValue());}}}I've implemented batching by N elements as described in this answer: Can datastore input in google dataflow pipeline be processed in a batch of N entries at a time?
package com.example.dataflow.transform;
import com.example.dataflow.event.ClickEvent;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;
import java.util.ArrayList;
import java.util.List;
public class ClickToClicksPack extends DoFn> {
public static final int BATCH_SIZE = 10;
private List accumulator;
@StartBundle
public void startBundle() {
accumulator = new ArrayList(BATCH_SIZE);
}
@ProcessElement
public void processElement(ProcessContext c) {
ClickEvent clickEvent = c.element();
accumulator.add(clickEvent);
if (accumulator.size() >= BATCH_SIZE) {
c.output(accumulator);
accumulator = new ArrayList(BATCH_SIZE);
}
}
@FinishBundle
public void finishBundle(FinishBundleContext c) {
if (accumulator.size() > 0) {
ClickEvent clickEvent = accumulator.get(0);
long time = clickEvent.getClickTimestamp().getTime();
c.output(accumulator, new Instant(time), GlobalWindow.INSTANCE);
}
}
}
But when I run pipeline in streaming mode there are a lot of batches with just 1 or 2 elements. As I understand it's because of small bundles size. After running for a day average number of elements in batch is roughly 4. I really need it to be closer to 10 for better performance of the next steps.
Is there a way to control bundles size? Or should I use "GroupIntoBatches" transform for this purpose. In this case it's not clear for me, what should be selected as a key.
UPDATE: is it a good idea to use java thread id or VM hostname for a key to apply "GroupIntoBatches" transform?
I've ended up doing composite transform with "GroupIntoBatches" inside. The following answer contains recommendations regarding key selection: https://stackoverflow.com/a/44956702/4888849
In my current implementation I'm using random keys to achieve parallelism and I'm windowing events in order to emit results regularly even if there are less then BATCH_SIZE events by one key.
package com.example.dataflow.transform;
import com.example.dataflow.event.ClickEvent;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import java.util.Random;
/**
* Batch clicks into packs of BATCH_SIZE size
*/
public class ClickToClicksPack extends PTransform, PCollection>> {
public static final int BATCH_SIZE = 10;
// Define window duration.
// After window's end - elements are emitted even if there are less then BATCH_SIZE elements
public static final int WINDOW_DURATION_SECONDS = 1;
private static final int DEFAULT_SHARDS_NUMBER = 20;
// Determine possible parallelism level
private int shardsNumber = DEFAULT_SHARDS_NUMBER;
public ClickToClicksPack() {
super();
}
public ClickToClicksPack(int shardsNumber) {
super();
this.shardsNumber = shardsNumber;
}
@Override
public PCollection> expand(PCollection input) {
return input
// assign keys, as "GroupIntoBatches" works only with key-value pairs
.apply(ParDo.of(new AssignRandomKeys(shardsNumber)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(WINDOW_DURATION_SECONDS))))
.apply(GroupIntoBatches.ofSize(BATCH_SIZE))
.apply(ParDo.of(new ExtractValues()));
}
/**
* Assigns to clicks random integer between zero and shardsNumber
*/
private static class AssignRandomKeys extends DoFn> {
private int shardsNumber;
private Random random;
AssignRandomKeys(int shardsNumber) {
super();
this.shardsNumber = shardsNumber;
}
@Setup
public void setup() {
random = new Random();
}
@ProcessElement
public void processElement(ProcessContext c) {
ClickEvent clickEvent = c.element();
KV kv = KV.of(random.nextInt(shardsNumber), clickEvent);
c.output(kv);
}
}
/**
* Extract values from KV
*/
private static class ExtractValues extends DoFn>, Iterable> {
@ProcessElement
public void processElement(ProcessContext c) {
KV> kv = c.element();
c.output(kv.getValue());
}
}
}
这篇关于如何在小捆绑的流式管道中按 N 个元素进行批处理?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!