Distributing socket data among Kafka cluster nodes

Problem description

I want to read data from a socket and write it to a Kafka topic, so that my Flink program can read the data from that topic and process it. I can do this on a single node. However, I want a Kafka cluster with at least three nodes (on different IP addresses), and I want the data polled from the socket to be distributed among those nodes. I do not know how to do this or how to change my code. My simple program is as follows:

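import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {

        kafka_test objKafka = new kafka_test();

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);
        int myport = 9999;
        String hostname = "localhost";

        // set up the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // read lines from the socket and write them to the Kafka topic "testFlink"
        DataStream<String> stream = env.socketTextStream(hostname, myport);
        stream.addSink(objKafka.createStringProducer("testFlink", "localhost:9092"));

        // read the same topic back and count words in 5-second windows
        DataStream<String> text = env.addSource(
                objKafka.createStringConsumerForTopic("testFlink", "localhost:9092", "test"));
        DataStream<Tuple2<String, Long>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                        // normalize and split the line
                        String[] words = value.toLowerCase().split("\\W+");

                        // emit the pairs
                        for (String word : words) {
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<String, Long>(word, 1L));
                            }
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount");
    } // main
}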

// Package-private so it can live in the same file as WordCount
// (a .java file may declare only one public top-level class).
class kafka_test {

    public FlinkKafkaConsumer<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup) {
        // ************************** KAFKA Properties **************************
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
                topic, new SimpleStringSchema(), props);
        // only read records written after the job starts
        myconsumer.setStartFromLatest();

        return myconsumer;
    }

    public FlinkKafkaProducer<String> createStringProducer(
            String topic, String kafkaAddress) {
        // kafkaAddress is the broker list, e.g. "host1:9092" or a comma-separated list
        return new FlinkKafkaProducer<>(kafkaAddress, topic, new SimpleStringSchema());
    }
}

Could you please explain how to distribute socket stream data among different Kafka nodes?

Any help would be appreciated.

Recommended answer

I think your code is correct. Kafka will take care of the "distribution" of the data. How the data is distributed among the Kafka brokers depends on the topic configuration, chiefly its number of partitions and its replication factor.
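
To make "Kafka will take care of the distribution" a little more concrete: a producer writes each record to one partition of the topic, and the partitions live on different brokers. The sketch below is a simplified, hypothetical model of how a partition could be chosen per record: keyed records go to hash(key) mod the partition count, unkeyed records rotate round-robin. It is not Kafka's actual partitioner (Kafka's default hashes the serialized key with murmur2, and since Kafka 2.4 it fills a batch for one partition before moving on for unkeyed records, the "sticky" strategy); the class name `PartitionerSketch` is illustrative only.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified partitioner (assumption): NOT Kafka's actual implementation.
//   keyed record   -> floorMod(key.hashCode(), numPartitions), so equal keys share a partition
//   unkeyed record -> next partition in round-robin order
class PartitionerSketch {
    private final int numPartitions;
    private int nextUnkeyed = 0; // round-robin cursor for records without a key

    PartitionerSketch(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    /** Picks the partition for one record; key may be null. */
    int partitionFor(String key) {
        if (key == null) {
            int p = nextUnkeyed;
            nextUnkeyed = (nextUnkeyed + 1) % numPartitions;
            return p;
        }
        // Math.floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        PartitionerSketch partitioner = new PartitionerSketch(3);
        Map<Integer, Integer> perPartition = new TreeMap<>();
        // six unkeyed lines (e.g. read from the socket) spread over partitions 0, 1, 2
        for (int i = 0; i < 6; i++) {
            perPartition.merge(partitioner.partitionFor(null), 1, Integer::sum);
        }
        System.out.println(perPartition); // prints {0=2, 1=2, 2=2}
        // the same key always maps to the same partition
        System.out.println(partitioner.partitionFor("kafka") == partitioner.partitionFor("kafka")); // prints true
    }
}
```

The point of the model is the design trade-off: hashing keeps all records for one key in order on one partition, while unkeyed records can be spread freely for load balancing.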

Check the answer here to better understand Kafka topics and partitions.

Let's say you have 3 Kafka brokers. Then, if you create your topic with 3 replicas and 3 partitions:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-topic

your topic will have 3 partitions, and each partition will be stored 3 times in your cluster. With 3 brokers, every broker stores a copy of all 3 partitions: it is typically the leader for 1 partition and holds follower replicas of the other 2.
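
The layout described above can be checked with a small model. This uses a simplified, hypothetical placement rule (replica `i` of partition `p` goes to broker `(p + i) % numBrokers`, and the first replica acts as the preferred leader). Kafka's real assignment also randomizes the starting broker and shifts follower placement, so actual broker ids will differ, but for 3 partitions, replication factor 3 and 3 brokers the outcome is the same: every broker holds 3 replicas and leads 1 partition. The class `ReplicaPlacementSketch` is illustrative only and does not talk to Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (assumption): replica i of partition p lives on broker (p + i) % numBrokers,
// and the first broker in each list is the preferred leader. Not Kafka's exact algorithm.
class ReplicaPlacementSketch {

    /** Returns, for each partition, the ordered list of broker ids holding its replicas. */
    static List<List<Integer>> assign(int numPartitions, int replicationFactor, int numBrokers) {
        if (replicationFactor > numBrokers) {
            throw new IllegalArgumentException("replication factor cannot exceed broker count");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int i = 0; i < replicationFactor; i++) {
                replicas.add((p + i) % numBrokers);
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    /** Counts how many replicas (leader or follower) each broker stores. */
    static int[] replicasPerBroker(List<List<Integer>> assignment, int numBrokers) {
        int[] counts = new int[numBrokers];
        for (List<Integer> replicas : assignment) {
            for (int broker : replicas) {
                counts[broker]++;
            }
        }
        return counts;
    }

    /** Counts how many partitions each broker leads (first replica of each list). */
    static int[] leadersPerBroker(List<List<Integer>> assignment, int numBrokers) {
        int[] counts = new int[numBrokers];
        for (List<Integer> replicas : assignment) {
            counts[replicas.get(0)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<List<Integer>> a = assign(3, 3, 3);
        for (int p = 0; p < a.size(); p++) {
            // e.g. "partition 0 -> brokers [0, 1, 2] (leader 0)"
            System.out.println("partition " + p + " -> brokers " + a.get(p) + " (leader " + a.get(p).get(0) + ")");
        }
    }
}
```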

Then you just have to create your Kafka sink, listing the brokers of the cluster:

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "broker1:9092,broker2:9092,broker3:9092",
        "my-topic",
        new SimpleStringSchema());

stream.addSink(myProducer);
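
One caveat worth checking against the Flink Kafka connector documentation for your version: the producer constructor taking (broker list, topic, schema) does not use Kafka's own partitioner but Flink's `FlinkFixedPartitioner`, which pins each parallel sink subtask to a single partition. With a sink parallelism of 1, every line from the socket would then land in one partition. The plain-Java sketch below models only that mapping (subtask `i` writes to `partitions[i % partitions.length]`); `FixedPartitionerSketch` is a hypothetical name and the code does not call Flink.

```java
// Model (assumption) of a "fixed" partitioning strategy like Flink's FlinkFixedPartitioner:
// sink subtask i always writes to partitions[i % partitions.length], whatever the record content.
class FixedPartitionerSketch {

    /** Partition id that a given sink subtask writes to. */
    static int partitionForSubtask(int subtaskIndex, int[] partitions) {
        return partitions[subtaskIndex % partitions.length];
    }

    /** How many of the topic's partitions receive any data for a given sink parallelism. */
    static int partitionsThatReceiveData(int sinkParallelism, int[] partitions) {
        if (partitions.length == 0) {
            return 0;
        }
        boolean[] used = new boolean[partitions.length];
        for (int subtask = 0; subtask < sinkParallelism; subtask++) {
            used[subtask % partitions.length] = true; // index of the partition this subtask pins to
        }
        int count = 0;
        for (boolean u : used) {
            if (u) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        for (int parallelism = 1; parallelism <= 4; parallelism++) {
            System.out.println("sink parallelism " + parallelism + " -> "
                    + partitionsThatReceiveData(parallelism, partitions) + " of "
                    + partitions.length + " partitions used");
        }
    }
}
```

If this matches your setup, two options are to give the sink at least as many parallel instances as the topic has partitions, e.g. `stream.addSink(myProducer).setParallelism(3);`, or to use a producer constructor that accepts an `Optional` custom partitioner and pass `Optional.empty()`, so that records are partitioned by Kafka instead (verify that constructor's exact signature for your connector version).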
