Distributing socket data among Kafka cluster nodes

Problem description

I want to read data from a socket and write it to a Kafka topic, so that my Flink program can consume the data from that topic and process it. I can do this on a single node, but I want to have a Kafka cluster with at least three different nodes (different IP addresses) and to distribute the data polled from the socket among those nodes. I do not know how to do this or how to change my code. My simple program is below:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {

        kafka_test objKafka = new kafka_test();

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);
        int myport = 9999;
        String hostname = "localhost";

        // set up the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // read raw lines from the socket and forward them to Kafka
        DataStream<String> stream = env.socketTextStream(hostname, myport);
        stream.addSink(objKafka.createStringProducer("testFlink",
                "localhost:9092"));

        // read the lines back from the Kafka topic for processing
        DataStream<String> text =
                env.addSource(objKafka.createStringConsumerForTopic("testFlink",
                        "localhost:9092", "test"));

        DataStream<Tuple2<String, Long>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                        // normalize and split the line
                        String[] words = value.toLowerCase().split("\\W+");

                        // emit the pairs
                        for (String word : words) {
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<String, Long>(word, 1L));
                            }
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount");
    } // main
}

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class kafka_test {

    public FlinkKafkaConsumer<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup) {
        // ************************** KAFKA Properties ******
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
                topic, new SimpleStringSchema(), props);
        myconsumer.setStartFromLatest();

        return myconsumer;
    }

    public FlinkKafkaProducer<String> createStringProducer(
            String topic, String kafkaAddress) {
        return new FlinkKafkaProducer<>(kafkaAddress,
                topic, new SimpleStringSchema());
    }
}

Would you please guide me on how to broadcast the socket stream data across different Kafka nodes?

Any help would be appreciated.

Recommended answer

I think your code is correct. Kafka will take care of the "distribution" of the data. How the data is distributed among the Kafka brokers depends on the topic configuration.

Check the answer here to better understand Kafka topics and partitions.

Let's say you have 3 Kafka brokers. If you then create your topic with 3 replicas and 3 partitions:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic

Your topic will then have 3 partitions, and each partition will be stored 3 times across your cluster. With 3 brokers, each broker will be the leader for 1 partition and hold replicas of the other 2.
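To verify the assignment, you can describe the topic (shown here with the same --zookeeper style flag as the create command above, which applies to older Kafka versions; newer versions take --bootstrap-server instead):

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic

The output shows, for each of the 3 partitions, which broker is the leader, the full replica list, and the in-sync replica set (ISR).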

Then you just have to create your Kafka sink:

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "broker1:9092,broker2:9092,broker3:9092",
        "my-topic",
        new SimpleStringSchema());

stream.addSink(myProducer);
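One caveat: with the constructor above, the Flink Kafka producer falls back to its default FlinkKafkaPartitioner (FlinkFixedPartitioner), which pins each parallel sink subtask to a single Kafka partition; if the sink runs with parallelism 1, as it typically does downstream of a socket source, all records land in one partition and therefore on one broker's leader. Below is a minimal sketch, assuming the flink-connector-kafka-0.11 dependency, that passes an empty custom partitioner so records without a key are spread across partitions by the Kafka producer's own default partitioning:

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers",
        "broker1:9092,broker2:9092,broker3:9092");

// Passing Optional.empty() disables FlinkFixedPartitioner, so the Kafka
// producer's default partitioning decides the target partition per record.
FlinkKafkaProducer011<String> spreadProducer = new FlinkKafkaProducer011<>(
        "my-topic",
        new SimpleStringSchema(),
        producerProps,
        Optional.<FlinkKafkaPartitioner<String>>empty());

stream.addSink(spreadProducer);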
