Kafka network Processor error in producer program (ArrayIndexOutOfBoundsException: 18)

Problem description

I have the Kafka producer API program below, and I am new to Kafka. The code fetches data from a REST API and sends each line of the response as a message to a Kafka topic.

package kafka_Demo;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HttpBasicAuth {

    public static void main(String[] args) {
        try {

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Not a producer setting: the Java producer only uses bootstrap.servers
            // and logs a warning about this unknown config.
            props.put("zookeeper.connect", "localhost:2181");
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            Json_read count = new Json_read();
            URL url = new URL("https://alm.sysbiz.org/rest/api/2/search?jql=project=ALG&maxResults=0");
            long total_ticket = count.ticketCount(url);
            Alm_authentication alm = new Alm_authentication();
            for (long i = 0; i <=total_ticket ; i = i + 20) {
                url = new URL("https://alm.sysbiz.org/rest/api/2/search?jql=project=ALG&expand=changelog&startAt=" + i+"&maxResults=20");
                InputStream content = (InputStream) alm.performAuth(url);
                if (content != null) {
                    // try-with-resources closes the reader and the underlying stream,
                    // and avoids calling close() on a null stream.
                    try (BufferedReader in = new BufferedReader(new InputStreamReader(content))) {
                        String line;
                        while ((line = in.readLine()) != null) {
                            // get() blocks until the broker acknowledges the record
                            producer.send(new ProducerRecord<String, String>("test", "a11", line)).get();
                        }
                    }
                }
                // Send a newline record after each page of results
                producer.send(new ProducerRecord<String, String>("test", "a11", "\n"));

            }

            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }


    }

}

When I run the code above, I get the following error on the Kafka broker:

ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
    at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
    at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
    at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
    at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.network.Processor.run(SocketServer.scala:421)
    at java.lang.Thread.run(Thread.java:748)

I have searched extensively but could not find a solution. Can anyone help me resolve this? I also see the following error in my Eclipse IDE:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1124)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:648)
    at kafka_Demo.HttpBasicAuth.main(HttpBasicAuth.java:40)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.  

Recommended answer

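This happens because of a Kafka version mismatch: the kafka-clients jar used by the producer is not compatible with the Kafka broker it connects to. The broker-side stack trace fails in ApiKeys.forId with index 18, which means the broker received a request type it does not know. In the Kafka protocol, API key 18 is the ApiVersions request, which newer clients send and which brokers older than 0.10.0 do not recognize, so the most likely situation is a newer client talking to an older broker. Because the broker never answers, the producer fails with "Failed to update metadata after 60000 ms". Make sure the kafka-clients jar version matches the Kafka server version installed on your machine, or upgrade the broker.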
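To make the broker-side failure concrete, here is a minimal sketch of how an unchecked array lookup by request id produces exactly this exception. It is a simplified model, not the actual Kafka source: the class name ApiKeyLookupSketch, the table OLD_BROKER_APIS, and the helper brokerUnderstands are hypothetical, and the table simply assumes an older broker that knows request ids 0 through 16.

```java
// Simplified model of a broker that maps request ids to request types
// through a fixed-size array. All names here are illustrative assumptions.
class ApiKeyLookupSketch {

    // Request types assumed to be known to an "old" broker: ids 0..16.
    static final String[] OLD_BROKER_APIS = {
        "Produce", "Fetch", "ListOffsets", "Metadata", "LeaderAndIsr",
        "StopReplica", "UpdateMetadata", "ControlledShutdown", "OffsetCommit",
        "OffsetFetch", "GroupCoordinator", "JoinGroup", "Heartbeat",
        "LeaveGroup", "SyncGroup", "DescribeGroups", "ListGroups"
    };

    // Unchecked lookup: an id beyond the table throws
    // ArrayIndexOutOfBoundsException, as in the broker stack trace.
    static String forId(String[] table, int id) {
        return table[id];
    }

    // Bounds check that a caller could use to detect an unknown request id.
    static boolean brokerUnderstands(String[] table, int id) {
        return id >= 0 && id < table.length;
    }

    public static void main(String[] args) {
        int metadata = 3;      // request id known to the old broker
        int apiVersions = 18;  // request id sent by newer clients

        System.out.println("id " + metadata + " -> " + forId(OLD_BROKER_APIS, metadata));
        System.out.println("id " + apiVersions + " understood: "
                + brokerUnderstands(OLD_BROKER_APIS, apiVersions));
        try {
            forId(OLD_BROKER_APIS, apiVersions);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("unknown request id " + apiVersions + " from a newer client");
        }
    }
}
```

In this model the client keeps waiting for a response that never arrives, which matches the metadata timeout seen in the producer. Aligning the client and broker versions removes the unknown request id.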