Flink streaming example that generates its own data


Question

Earlier I asked about a simple hello world example for Flink. This gave me some good examples!

However I would like to ask for a more ‘streaming’ example where we generate an input value every second. This would ideally be random, but even just the same value each time would be fine.

The objective is to get a stream that ‘moves’ with no/minimal external touch.

Hence my question:

I found how to show this with generating data externally and writing to Kafka, or listening to a public source, however I am trying to solve it with minimal dependence (like starting with GenerateFlowFile in Nifi).

Answer

Here's an example. This was constructed as an example of how to make your sources and sinks pluggable. The idea being that in development you might use a random source and print the results, for tests you might use a hardwired list of input events and collect the results in a list, and in production you'd use the real sources and sinks.

Here's the job:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/*
 * Example showing how to make sources and sinks pluggable in your application code so
 * you can inject special test sources and test sinks in your tests.
 */
public class TestableStreamingJob {
    private SourceFunction<Long> source;
    private SinkFunction<Long> sink;

    public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> longStream =
                env.addSource(source)
                        .returns(TypeInformation.of(Long.class));

        longStream
                .map(new IncrementMapFunction())
                .addSink(sink);

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
        job.execute();
    }

    // While it's tempting for something this simple, avoid using anonymous classes or lambdas
    // for any business logic you might want to unit test. Declared static so Flink can
    // serialize it without dragging the enclosing job instance along.
    public static class IncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }
}
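To make the pluggability above concrete, here is a rough sketch of the "test" configuration: a hardwired list of inputs and a sink that collects the results in a list. The names ListSource, CollectSink, and the JUnit 4 test are my own illustration (they are not part of the job code above) and assume JUnit is on the classpath:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class TestableStreamingJobTest {

    // Emits a fixed list of values and then returns, so the test job terminates.
    public static class ListSource implements SourceFunction<Long> {
        private volatile boolean cancelled = false;

        @Override
        public void run(SourceContext<Long> ctx) {
            for (long value : Arrays.asList(1L, 10L, 100L)) {
                if (cancelled) {
                    break;
                }
                ctx.collect(value);
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    // Collects results into a static list so the test can inspect them after the job
    // finishes (sink instances themselves are serialized when the job is deployed).
    public static class CollectSink implements SinkFunction<Long> {
        static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, Context context) {
            values.add(value);
        }
    }

    @Test
    public void incrementsEachElement() throws Exception {
        CollectSink.values.clear();

        new TestableStreamingJob(new ListSource(), new CollectSink()).execute();

        // Sort before asserting, since element order is not guaranteed with parallelism > 1.
        List<Long> results = new ArrayList<>(CollectSink.values);
        Collections.sort(results);
        assertEquals(Arrays.asList(2L, 11L, 101L), results);
    }
}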

And here is the RandomLongSource:

import java.util.Random;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RandomLongSource extends RichParallelSourceFunction<Long> {

    private volatile boolean cancelled = false;
    private Random random;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // open() runs once per parallel subtask, so each instance gets its own Random.
        random = new Random();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (!cancelled) {
            Long nextLong = random.nextLong();
            // Hold the checkpoint lock while emitting so records don't race with checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextLong);
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}
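Note that the run() loop above emits new values as fast as it can. To get the roughly one-value-per-second pacing asked for in the question, a minimal tweak (just a sketch using a plain Thread.sleep, nothing Flink-specific) is to pause between emissions:

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (!cancelled) {
            Long nextLong = random.nextLong();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextLong);
            }
            // Pause about one second so the stream "moves" at the requested pace.
            Thread.sleep(1000);
        }
    }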
