Flink Kafka:类型应为PojoTypeInfo [英] Flink Kafka : Expecting type to be a PojoTypeInfo

查看:0
本文介绍了Flink Kafka:类型应为PojoTypeInfo的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的Customer类已使用maven-avro插件创建。当我尝试运行此程序时,收到的错误为Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo

[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.example.Customer does not contain a setter for field first_name

<2-2]> 我正在使用Java 8

我从maven Avro插件创建的Customer类具有特定的记录类型

请帮帮我我已经花了5天时间解决这个问题

我尝试了3种不同的方法,我提到它们是方法1、方法2。下面

package com.example
import com.typesafe.config.ConfigException.Generic
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecord
import org.apache.flink.api.scala.createTypeInformation
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema


import java.util.Properties
object flink_kafka_avro extends App  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    properties.put("group.id", "customer-consumer-group-v1")
    properties.put("auto.commit.enable", "false")
    properties.put("auto.offset.reset", "earliest")
    

    import org.apache.avro.Schema
    import org.apache.avro.reflect.ReflectData


    val schema = ReflectData.get.getSchema(classOf[Customer])
    // Method 1 not working
    //val ss = new FlinkKafkaConsumer[Customer]("customer-avro", AvroDeserializationSchema.forSpecific(classOf[Customer]),properties)
    val schemaRegistryUrl = "http://localhost:8081"
    //Method 2
    val userKafkaReaderResult = env.addSource(new FlinkKafkaConsumer[Customer]("customer-avro",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[Customer],schemaRegistryUrl), properties).setStartFromEarliest())
    userKafkaReaderResult.print()
    //Method 3

    // I tried like this it is not working even

    //val strenew = FlinkKafkaConsumer[Customer]("test_topic", AvroDeserializationSchema.forSpecific(classOf[Customer]), properties).setStartFromEarliest
    //env.addSource(ss).print()
    env.execute()


}

我的POM文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <groupId>org.example</groupId>
    <artifactId>kafkaavrov1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <modelVersion>4.0.0</modelVersion>



    <properties>
        <avro.version>1.8.2</avro.version>
        <kafka.version>0.11.0.1</kafka.version>
        <confluent.version>3.3.1</confluent.version>
    </properties>

    <!--necessary to resolve confluent dependencies-->
    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>


    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>

        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-confluent-registry</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.13.2</version>
        </dependency>



        <!--Only dependency needed for the avro part-->
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!--dependencies needed for the kafka part-->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-runner</artifactId>
            <version>1.2.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-params</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>dd
                    <target>1.8</target>
                </configuration>
            </plugin>



            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <stringType>String</stringType>
                            <createSetters>false</createSetters>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                            <fieldVisibility>private</fieldVisibility>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>target/generated-sources/avro</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

推荐答案

除非Customer扩展org.apache.avro.specific.SpecificRecordBase,否则Flink不会将其视为avro类型,并将尝试使用其POJO序列化程序对其进行序列化。如果失败(就像这里一样),它将退回到将其视为泛型类型,并将使用Kryo。

这篇关于Flink Kafka:类型应为PojoTypeInfo的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆