Flink streaming example that generates its own data

Question

Earlier I asked about a simple hello world example for Flink. This gave me some good examples!

However, I would like a more ‘streaming’ example, where an input value is generated every second. This would ideally be random, but even just the same value each time would be fine.

The objective is to get a stream that ‘moves’ with no/minimal external touch.

Hence my question:

How to show Flink actually streaming data without external dependencies?

I found how to show this by generating data externally and writing it to Kafka, or by listening to a public source. However, I am trying to solve it with minimal dependencies (like starting with GenerateFlowFile in NiFi).

Solution

Here's an example. It was constructed to show how to make your sources and sinks pluggable. The idea is that in development you might use a random source and print the results, in tests you might use a hardwired list of input events and collect the results in a list, and in production you'd use the real sources and sinks.

Here's the job:

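/*
 * Example showing how to make sources and sinks pluggable in your application code so
 * you can inject special test sources and test sinks in your tests.
 */

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class TestableStreamingJob {
    private final SourceFunction<Long> source;
    private final SinkFunction<Long> sink;

    public TestableStreamingJob(SourceFunction<Long> source, SinkFunction<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The source is only known as a SourceFunction<Long>, so state the element type explicitly.
        DataStream<Long> longStream =
                env.addSource(source)
                        .returns(TypeInformation.of(Long.class));

        longStream
                .map(new IncrementMapFunction())
                .addSink(sink);

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        // Development wiring: random input, results printed to stdout.
        TestableStreamingJob job = new TestableStreamingJob(new RandomLongSource(), new PrintSinkFunction<>());
        job.execute();
    }

    // While it's tempting for something this simple, avoid using anonymous classes or lambdas
    // for any business logic you might want to unit test.
    // Declared static so the function does not hold a reference to the enclosing job,
    // which is not Serializable.
    public static class IncrementMapFunction implements MapFunction<Long, Long> {

        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

}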
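
To make the pluggable idea concrete without needing Flink on the classpath, here is a minimal plain-Java sketch of the same constructor-injection pattern. It is only an analogy: `PluggablePipeline` is a hypothetical name, `Iterator<Long>` stands in for `SourceFunction<Long>`, and `Consumer<Long>` stands in for `SinkFunction<Long>`. None of these are Flink APIs. In a real Flink test you would pass Flink source and sink implementations to `TestableStreamingJob` instead.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Flink-free analogy of TestableStreamingJob: the source and sink
// are injected, so a test can swap in fixed input and a collecting sink.
class PluggablePipeline {
    private final Iterator<Long> source;  // stands in for SourceFunction<Long>
    private final Consumer<Long> sink;    // stands in for SinkFunction<Long>

    PluggablePipeline(Iterator<Long> source, Consumer<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    // Same business logic as IncrementMapFunction in the job above.
    static long increment(long record) {
        return record + 1;
    }

    // Drains the source, applies the logic, and hands each result to the sink.
    void execute() {
        while (source.hasNext()) {
            sink.accept(increment(source.next()));
        }
    }

    public static void main(String[] args) {
        // "Test" wiring: hardwired input, results collected in a list.
        List<Long> results = new ArrayList<>();
        new PluggablePipeline(List.of(1L, 2L, 3L).iterator(), results::add).execute();
        System.out.println(results); // prints [2, 3, 4]

        // "Development" wiring: same pipeline, but each result is printed.
        new PluggablePipeline(List.of(10L, 20L).iterator(), System.out::println).execute();
    }
}
```

The business logic never changes between the two wirings; only the constructor arguments do, which is what makes the job easy to test.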

Here's the RandomLongSource:

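import java.util.Random;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RandomLongSource extends RichParallelSourceFunction<Long> {

    // volatile because cancel() is called from a different thread than run()
    private volatile boolean cancelled = false;
    private Random random;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        random = new Random();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // Emits values as fast as possible until the job is cancelled.
        while (!cancelled) {
            long nextLong = random.nextLong();
            // Emit under the checkpoint lock so emission does not interleave with a checkpoint.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextLong);
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}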
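
Note that the source above emits values as fast as it can, whereas the question asked for one value per second. One way to get that, which is my assumption rather than something the answer states, is to pause inside the `while (!cancelled)` loop, for example with `Thread.sleep(1000)` placed outside the synchronized block. The plain-Java sketch below shows that throttled, cancellable loop on its own, without Flink. `ThrottledLongEmitter` and `intervalMillis` are hypothetical names, and `Consumer<Long>` stands in for `ctx.collect`.

```java
import java.util.Random;
import java.util.function.Consumer;

// Hypothetical, Flink-free sketch of a throttled, cancellable emit loop.
class ThrottledLongEmitter {
    // volatile because cancel() is called from another thread than run()
    private volatile boolean cancelled = false;
    private final Random random = new Random();
    private final long intervalMillis;

    ThrottledLongEmitter(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Emits one random long, then sleeps for the interval, until cancelled.
    void run(Consumer<Long> out) throws InterruptedException {
        while (!cancelled) {
            out.accept(random.nextLong());
            Thread.sleep(intervalMillis);
        }
    }

    void cancel() {
        cancelled = true;
    }

    public static void main(String[] args) throws Exception {
        ThrottledLongEmitter emitter = new ThrottledLongEmitter(1000);
        Thread worker = new Thread(() -> {
            try {
                emitter.run(System.out::println);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(3500);  // let it emit for a few seconds
        emitter.cancel();    // the loop exits after its current sleep
        worker.join();
    }
}
```

Because the flag is only checked once per iteration, cancellation can take up to one interval to take effect. That is usually acceptable for a demo source.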
