Read data from Cassandra for processing in Flink


Question

I have to process data streams from Kafka using Flink as the streaming engine. To do the analysis on the data, I need to query some tables in Cassandra. What is the best way to do this? I have been looking for Scala examples of such cases but couldn't find any. How can data from Cassandra be read in Flink using Scala as the programming language? "Read & write data into cassandra using apache flink Java API" is another question along the same lines; it has multiple approaches mentioned in the answers, and I would like to know which approach is best in my case. Also, most of the available examples are in Java. I am looking for Scala examples.

Answer

I currently read from Cassandra using Async I/O in Flink 1.3. Here is the documentation on it:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html (where it has DatabaseClient, you will use com.datastax.driver.core.Cluster instead)

Let me know if you need a more in-depth example of using it to read from Cassandra specifically, but unfortunately I can only provide an example in Java.

Edit 1

Here is an example of the code I am using for reading from Cassandra with Flink's Async I/O. I am still working on identifying and fixing an issue where, for some reason (without going deep into it), when a single query returns a large amount of data, the async data stream's timeout is triggered even though Cassandra appears to return the result fine, well before the timeout. But assuming that is just a bug in other things I am doing and not in this code, this should work fine for you (and has worked fine for me for months as well):

import java.util.Collections;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

public class GenericCassandraReader extends RichAsyncFunction<CustomInputObject, ResultSet> {

    private final Properties props;
    private transient Session client;

    public GenericCassandraReader(Properties props) {
        super();
        this.props = props;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // One Cluster/Session per parallel task instance; the property keys are illustrative.
        client = Cluster.builder()
                .addContactPoint(props.getProperty("cassandra.url"))
                .withPort(Integer.parseInt(props.getProperty("cassandra.port")))
                .build()
                .connect(props.getProperty("cassandra.keyspace"));
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(final CustomInputObject customInputObject, final AsyncCollector<ResultSet> asyncCollector) throws Exception {

        // Note: concatenating the id into CQL is injection-prone; a PreparedStatement would be safer.
        String queryString = "select * from table where fieldToFilterBy='" + customInputObject.id() + "';";

        // executeAsync never blocks the task thread; the driver returns a ListenableFuture.
        ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(queryString);

        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {

            @Override
            public void onSuccess(ResultSet resultSet) {
                // Hand the result back to Flink, completing this element's async request.
                asyncCollector.collect(Collections.singleton(resultSet));
            }

            @Override
            public void onFailure(Throwable t) {
                // Reporting the Throwable lets Flink fail (or retry) this record.
                asyncCollector.collect(t);
            }
        });
    }
}
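
For completeness, the function then gets wired into a job with Flink 1.3's AsyncDataStream. Here is a minimal sketch, assuming the same GenericCassandraReader; the timeout and capacity values are illustrative, not the ones I actually run with:

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

import com.datastax.driver.core.ResultSet;

public class CassandraEnrichment {

    // Attach the async Cassandra lookup to an existing stream of input records.
    public static DataStream<ResultSet> withCassandraResults(
            DataStream<CustomInputObject> inputStream, Properties props) {
        return AsyncDataStream.unorderedWait(
                inputStream,
                new GenericCassandraReader(props),
                10_000, TimeUnit.MILLISECONDS, // illustrative per-request timeout
                100);                          // illustrative max in-flight async requests
    }
}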

Again, sorry for the delay. I was hoping to have the bug resolved so I could be certain, but figured at this point just having some reference would be better than nothing.

Edit 2

So we finally determined that the issue isn't with the code, but with the network throughput. With a lot of bytes trying to come through a pipe that isn't large enough to handle them, things start backing up; some trickle in, but (thanks to the DataStax Cassandra driver's QueryLogger, we could see this) the time it took to receive the result of each query started climbing to 4 seconds, then 6, then 8, and so on.
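
For anyone debugging something similar, here is a minimal sketch of registering the driver's QueryLogger (assuming the DataStax Java driver 3.x; the 3-second threshold is illustrative). Queries slower than the threshold are logged via SLF4J under the com.datastax.driver.core.QueryLogger.SLOW logger, which is how the climbing latencies became visible:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.QueryLogger;

public final class QueryLoggerSetup {

    // Build the Cluster with a QueryLogger attached so slow queries show up in the logs.
    public static Cluster clusterWithQueryLogger(Cluster.Builder builder) {
        Cluster cluster = builder.build();
        cluster.register(QueryLogger.builder()
                .withConstantThreshold(3000) // ms; queries slower than this are logged as SLOW
                .build());
        return cluster;
    }
}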

TL;DR: the code is fine; just be aware that if you see timeoutExceptions from Flink's asyncWaitOperator, it may be a network issue.

Edit 2.5

It's probably also worth mentioning that, because of the network latency issue, we ended up moving to a RichMapFunction that holds the data we were reading from Cassandra in state. So the job just keeps track of all the records that come through it, instead of having to read from the table to get everything in it each time a new record comes through.
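
A minimal sketch of that pattern, assuming a keyed stream so Flink's MapState can be used; the class name and the idea of keying on the record id are illustrative, not the exact job we run:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Hypothetical: accumulate every record seen per key in Flink state,
// so later records can be processed against it without a Cassandra query.
public class StatefulRecordTracker extends RichMapFunction<CustomInputObject, CustomInputObject> {

    private transient MapState<String, CustomInputObject> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen-records", String.class, CustomInputObject.class));
    }

    @Override
    public CustomInputObject map(CustomInputObject record) throws Exception {
        // Remember this record; downstream analysis can iterate seen.entries()
        // instead of issuing a Cassandra read per incoming record.
        seen.put(record.id(), record);
        return record;
    }
}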

