Apache Flink:如何从 Cassandra 读取数据流/数据集? [英] Apache Flink: How can I read a DataStream/DataSet from Cassandra?

查看:30
本文介绍了Apache Flink:如何从 Cassandra 读取数据流/数据集?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试使用以下链接中提供的信息将 Cassandra 视为 Flink 中的数据源:

I tried to treat Cassandra as the source of data in Flink with the information provided in the following links:

我在运行任务时遇到了 AsyncWaitOperator 异常.根据第一个链接,出现这个异常是因为网络问题.然而,奇怪的是我在本地 VM 上运行 Cassandra,目标表中只有 10 行数据.

I got the AsyncWaitOperator exception when I run the task. According the the first link, this exception occurs due to network problem. However, the strange thing is that I am running Cassandra on my local VM with only 10 rows of data in the target table.

@Jicaar 在第一个链接中也提到从 RichAsyncFunction 切换到 RichMapFunction 可以避免 AsyncWaitOperator 异常,有类似经验的人可以分享如何在 RichMapFunction 中完成?

@Jicaar in first link also mentions that switching from RichAsyncFunction to RichMapFunction can avoid the AsyncWaitOperator exception, can someone with similar experience share how to do it in RichMapFunction?

AsyncWaitOperator 异常跟踪 -->

02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1) (2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
  at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 2 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader)
contextClassLoader (java.lang.Thread)
threads (java.lang.ThreadGroup)
groups (java.lang.ThreadGroup)
threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
val$backingThreadFactory (com.google.common.util.concurrent.ThreadFactoryBuilder$1)
threadFactory (java.util.concurrent.ThreadPoolExecutor)
delegate (com.google.common.util.concurrent.MoreExecutors$ListeningDecorator)
blockingExecutor (com.datastax.driver.core.Cluster$Manager)
manager (com.datastax.driver.core.Host)
triedHosts (com.datastax.driver.core.ExecutionInfo)
info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:82) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505) ~[kryo-2.24.0.jar:na]
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:182) ~[flink-core-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 10 common frames omitted
Caused by: java.util.ConcurrentModificationException: null
  at java.util.Vector$Itr.checkForComodification(Vector.java:1184) ~[na:1.8.0_60]
  at java.util.Vector$Itr.next(Vector.java:1137) ~[na:1.8.0_60]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  ... 68 common frames omitted

推荐答案

以下代码应该适用于从 Cassandra 读取以在 Flink 中进行批处理.

The below code should work for reading from Cassandra for batch processing in Flink.

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

ClusterBuilder clusterBuilder = new ClusterBuilder() {

        @Override
        public Cluster buildCluster(Cluster.Builder builder) {

            return builder.addContactPoint(<cassandraHost>))
                  .withPort(9042)
                  .withCredentials(<cassandraUserName>,<cassandraPassword>)
                  .build();
        } 
    };  

DataSet<Tuple3<String,String,String>> inputRecords = env
            .createInput    
            (new CassandraInputFormat<Tuple3<String,String,String>>(<select query>,clusterBuilder)          
            ,TupleTypeInfo.of(new TypeHint<Tuple3<String,String,String>>() {}));    

DataSet 的数据类型(示例中的 Tuple3 由三个字符串组成)将根据选择查询返回的字段类型和数量而有所不同.

The data type of the DataSet (Tuple3 consisting of three strings in the example) will vary according to the type and number of fields returned by your select query.

这篇关于Apache Flink:如何从 Cassandra 读取数据流/数据集?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆