Kafka 流过滤问题 [英] Issue with Kafka stream filtering

查看:24
本文介绍了Kafka 流过滤问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试运行以下示例中的基本应用程序:

https://github.com/confluentinc/examples/blob/3.3.x/kafka-streams/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala>

但是我在这一行遇到了一个例外:

//变体 1:使用 `mapValues`val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())

<块引用>

错误:(33, 25) 缺少扩展函数的参数类型 ((x$1) =>x$1.toUpperCase())textLines.mapValues(_.toUpperCase())

如果将光标悬停在代码上,则会出现错误:

<块引用>

类型不匹配,预期:ValueMapper[_>: String, _ <: NotInferedVR],实际:(Any) => Any 无法将符号解析为大写

我的 sbt 文件的内容:

name := "untitled1"版本:=0.1"ScalaVersion := "2.11.11"//https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"//https://mvnrepository.com/artifact/org.apache.kafka/kafka-clientslibraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"//https://mvnrepository.com/artifact/org.apache.kafka/kafka-streamslibraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.0"//https://mvnrepository.com/artifact/org.apache.kafka/connect-apilibraryDependencies += "org.apache.kafka" % "connect-api" % "0.11.0.0"

我真的不知道如何继续,因为我对 Scala 很陌生.我想知道是什么问题以及如何解决.

解决方案

来自 http://docs.confluent.io/current/streams/faq.html#scala-compile-error-无类型参数-java-defined-trait-is-invariant-in-type-t

<块引用>

这个问题的根本原因是 Scala-Java 互操作性——Kafka Streams API 是用 Java 实现的,但您的应用程序是用 Scala 编写的.值得注意的是,这个问题是由 Java 和 Scala 的类型系统如何交互引起的.例如,Java 中的通用通配符通常会导致此类 Scala 问题.

要解决此问题,您需要在 Scala 应用程序中显式声明类型,以便代码编译.例如,您可能需要将将多个 DSL 操作链接到多个语句的单个语句分解,其中每个语句显式声明各自的返回类型.StreamToTableJoinScalaIntegrationTest 演示了如何显式声明返回变量的类型.

更新

Kafka 2.0(将于 6 月发布)包含一个适当的 Scala API,可以避免这些问题.比较 https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams

I'm trying to run a basic app from the following example:

https://github.com/confluentinc/examples/blob/3.3.x/kafka-streams/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala

However I'm getting an exception at this line:

// Variant 1: using `mapValues`
val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())

Error:(33, 25) missing parameter type for expanded function ((x$1) => x$1.toUpperCase()) textLines.mapValues(_.toUpperCase())

Error I'm getting if I hover cursor over the code:

Type mismatch, expected: ValueMapper[_ >: String, _ <: NotInferedVR], actual: (Any) => Any Cannot resolve symbol toUpperCase

Contents of my sbt file:

name := "untitled1"

version := "0.1"

scalaVersion := "2.11.11"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/connect-api
libraryDependencies += "org.apache.kafka" % "connect-api" % "0.11.0.0"

I'm really not sure how to proceed with that as I'm quite new to Scala. I'd like to know what's the issue and how to fix it.

解决方案

From http://docs.confluent.io/current/streams/faq.html#scala-compile-error-no-type-parameter-java-defined-trait-is-invariant-in-type-t

The root cause of this problem is Scala-Java interoperability – the Kafka Streams API is implemented in Java, but your application is written in Scala. Notably, this problem is caused by how the type systems of Java and Scala interact. Generic wildcards in Java, for example, are often causing such Scala issues.

To fix the problem you would need to declare types explicitly in your Scala application in order for the code to compile. For example, you may need to break a single statement that chains multiple DSL operations into multiple statements, where each statement explicitly declares the respective return types. The StreamToTableJoinScalaIntegrationTest demonstrates how the types of return variables are explicitly declared.

Update

Kafka 2.0 (will be released in June) contains a proper Scala API that avoid those issues. Compare https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams

这篇关于Kafka 流过滤问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆