How to push datastream to kafka topic by retaining the order using string method in Flink Kafka


Problem Description

I am trying to create a JSON dataset every 500 ms and want to push it to a Kafka topic so that I can set up some windows downstream and perform computations. Below is my code:

package KafkaAsSource

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper


import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
import java.util.{Optional, Properties}

object PushingDataToKafka {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setMaxParallelism(256)
    env.enableCheckpointing(5000)
    val stream: DataStream[String] = env.fromElements(createData())

    stream.addSink(sendToTopic(stream))
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    return properties
  }

  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 1000
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{
  "id":"" + a + "",
  "Category":"Flink",
  "eventTime":"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + ""
  
}"
      println(jsonData)
      Thread.sleep(500)
    }
    return jsonData
  }

  def sendToTopic(): Properties = {
    val producer = new FlinkKafkaProducer[String](
      "topic"
      ,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema())
      ,
      getProperties(),
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    )
    return producer
  }
}

It gives the following error:

type mismatch;
 found   : Any
 required: org.apache.flink.streaming.api.functions.sink.SinkFunction[String]
    stream.addSink(sendToTopic())
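
The mismatch comes from sendToTopic: it is declared to return Properties but actually builds a FlinkKafkaProducer, so the compiler infers Any, while addSink expects a SinkFunction[String]. As a minimal sketch (reusing the topic name and properties from the code above), declaring the real sink type and calling it without arguments would satisfy the compiler:

def sendToTopic(): FlinkKafkaProducer[String] = {
  // FlinkKafkaProducer is a SinkFunction[String], so addSink can accept it
  new FlinkKafkaProducer[String](
    "topic",
    new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
    getProperties(),
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE
  )
}

// called as: stream.addSink(sendToTopic())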

Modified code:

object FlinkTest {

  def main(ars: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setMaxParallelism(256)
    var stream = env.fromElements("")
    //env.enableCheckpointing(5000)
    //val stream: DataStream[String] = env.fromElements("hey mc", "1")

    val myProducer = new FlinkKafkaProducer[String](
      "maddy", // target topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
      getProperties(), // producer config
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{
  "id":"" + a + "",
  "Category":"Flink",
  "eventTime":"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + ""
  
}"
      println(a)
      Thread.sleep(500)
      stream = env.fromElements(jsonData)
      println(jsonData)
      stream.addSink(myProducer)
    }

    env.execute("hey")
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    return properties
  }
  /*
  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{
  "id":"" + a + "",
  "Category":"Flink",
  "eventTime":"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + ""
  
}"
      Thread.sleep(500)
    }
    return jsonData
  }
  */

}

The modified code gets data into the Kafka topic, but it does not retain the order. What am I doing wrong in the loop? Also, I had to change the Flink version from 1.13.5 to 1.12.2.

I was initially using Flink 1.13.5 with the connectors and Scala. What exactly am I missing?

Recommended Answer

A few things about this loop:

for (a <- minRange to maxRange) {
    jsonData = "{\n" +
      "  \"id\":\"" + a + "\",\n" +
      "  \"Category\":\"Flink\",\n" +
      "  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n" +
      "}"
    println(a)
    Thread.sleep(500)
    stream = env.fromElements(jsonData)
    println(jsonData)
    stream.addSink(myProducer)
}
  • The sleeping happens in the Flink client and only affects how long the client takes to assemble the job graph before submitting it to the cluster. It has no effect on how the job runs.

  • This loop creates 10 separate pipelines that will run independently and in parallel, all producing to the same Kafka topic. Those pipelines will race against each other.


To get the behavior you're looking for (global ordering across a single pipeline), you need to produce all of the events from a single source (in order, of course) and run the job with a parallelism of 1. Something like this will do it:

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object FlinkTest {

  def main(ars: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setParallelism(1)

    val myProducer = ...
    val jsonData = (i: Long) => ...

    env.fromSequence(0, 9)
      .map(i => jsonData(i))
      .addSink(myProducer)

    env.execute()
  }
}
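
For completeness, here is one way the elided pieces might be filled in, reusing the topic name, serialization schema, and JSON layout from the question; the bootstrap server, checkpoint interval, and EXACTLY_ONCE setting are assumptions carried over from the question's code, not prescribed by the answer:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

object FlinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)         // single pipeline, so global order is preserved
    env.enableCheckpointing(5000) // EXACTLY_ONCE transactions are committed on checkpoints

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")

    val myProducer = new FlinkKafkaProducer[String](
      "maddy",                    // topic name taken from the question
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

    // Same JSON payload as the question, parameterized by the sequence number
    val jsonData = (i: Long) =>
      "{\n" +
        "  \"id\":\"" + i + "\",\n" +
        "  \"Category\":\"Flink\",\n" +
        "  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n" +
        "}"

    env.fromSequence(0, 9)
      .map(i => jsonData(i))
      .addSink(myProducer)

    env.execute()
  }
}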

You can leave the max parallelism at 256 (or at its default of 128); it isn't particularly relevant here. The max parallelism is the number of hash buckets into which keyBy hashes the keys, and it defines an upper bound on the job's scalability.
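
As a hypothetical illustration of that relationship (the numbers and the keying function are made up for the example, and the Flink Scala API imports from the snippet above are assumed):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128) // number of key groups; parallelism can never exceed this
env.setParallelism(4)      // actual operator parallelism, must be <= the max parallelism

env.fromSequence(0, 999)
  .keyBy(i => i % 10)      // keys are hashed into the 128 key groups, spread over the 4 subtasks
  .map(i => i * 2)
  .print()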

