谁能在Scala中分享Flink Kafka示例? [英] Can anyone share a Flink Kafka example in Scala?
本文介绍了谁能在Scala中分享Flink Kafka示例?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
有人可以在Scala中分享Flink Kafka(主要是从Kafka接收消息)的工作示例吗?我知道有一个 KafkaWordCount 示例.我只需要在Flink中打印出Kafka消息即可.真的很有帮助.
Can anyone share a working example of Flink Kafka (mainly receiving messages from Kafka) in Scala? I know there is a KafkaWordCount example in Spark. I just need to print out Kafka message in Flink. It would be really helpful.
推荐答案
以下代码显示了如何使用Flink的Scala DataStream API从Kafka主题中读取内容:
The following code shows how to read from a Kafka topic using Flink's Scala DataStream API:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object Main {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val stream = env
.addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
.print
env.execute("Flink Kafka Example")
}
}
这篇关于谁能在Scala中分享Flink Kafka示例?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文