How to use Flink's KafkaSource in Scala?


Question

I'm trying to run a simple test program with Flink's KafkaSource. I'm using the following:

  • Flink 0.9
  • Scala 2.10.4
  • Kafka 0.8.2.1

I followed the docs to test KafkaSource (added the dependency and bundled the Kafka connector flink-connector-kafka in the plugin) as described here and here.

Below is my simple test program:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

However, compilation always fails because KafkaSource cannot be found:

[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR]     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

What am I missing here?

Answer

I'm an sbt user, so I used the following build.sbt:

organization := "pl.japila.kafka"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
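A note on the exclude: the connector's published POM seems to reference Kafka through an unexpanded `kafka_${scala.binary.version}` placeholder that sbt cannot resolve, so that transitive dependency is excluded and a concrete kafka artifact is declared instead. The same build can be written with a single dependency sequence (a sketch, reusing the coordinates above):

```scala
// build.sbt — equivalent sketch of the dependencies above
organization := "pl.japila.kafka"
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  // Exclude the unresolvable literal "kafka_${scala.binary.version}"
  // coordinate from the connector's POM, then add Kafka explicitly.
  "org.apache.flink" % "flink-connector-kafka" % "0.9.0"
    exclude("org.apache.kafka", "kafka_${scala.binary.version}"),
  "org.apache.kafka" %% "kafka" % "0.8.2.1"
)
```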

This allowed me to run the program:

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
     .print
  }
}

Output:

[kafka-flink]> run
[info] Running TestKafka
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
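One caveat worth noting: the program above only builds the streaming topology; it never calls env.execute(), which is why sbt reports success after 0 seconds without consuming anything from Kafka. In Flink, a streaming job is only submitted and run when execute() is invoked. A minimal sketch (same topic and connector as above; the job name string is arbitrary):

```scala
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .print
    // Without this call the topology is only constructed, never executed.
    env.execute("TestKafka")
  }
}
```

With execute() in place, the job keeps running and prints messages published to the test topic, rather than exiting immediately.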
