I want to write ORC files using Flink's Streaming File Sink, but it doesn't write the files correctly


Problem description

I am reading data from Kafka and trying to write it to the HDFS file system in ORC format. I followed the reference below from the official Flink website. But I can see that Flink writes the exact same content for all the data, produces a large number of files, and every file is 103 KB:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#orc-format

Please find my code below.

// Imports added for completeness; json4s-jackson and Typesafe Config are
// assumed for the JSON parsing and config handling used below.
import java.nio.charset.StandardCharsets
import java.util.Properties

import com.typesafe.config.Config
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.Path
import org.apache.flink.orc.vector.Vectorizer
import org.apache.flink.orc.writer.OrcBulkWriterFactory
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector, VectorizedRowBatch}
import org.json4s.jackson.JsonMethods.{compact, parse, render}

object BeaconBatchIngest extends StreamingBase {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  def getTopicConfig(configs: List[Config]): Map[String, String]  = (for (config: Config <- configs) yield (config.getString("sourceTopic"), config.getString("destinationTopic"))).toMap

  def setKafkaConfig(): Unit = {
    val kafkaParams = new Properties()
    kafkaParams.setProperty("bootstrap.servers","")
    kafkaParams.setProperty("zookeeper.connect","")
    kafkaParams.setProperty("group.id", DEFAULT_KAFKA_GROUP_ID)
    kafkaParams.setProperty("auto.offset.reset", "latest")
    
    val kafka_consumer: FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("sourceTopics", new SimpleStringSchema(), kafkaParams)
    kafka_consumer.setStartFromLatest()
    val stream: DataStream[DataParse] = env.addSource(kafka_consumer).map(new temp)
    val schema: String = "struct<_col0:string,_col1:bigint,_col2:string,_col3:string,_col4:string>"
    val writerProperties = new Properties()

    writerProperties.setProperty("orc.compress", "ZLIB")
    val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema), writerProperties, new org.apache.hadoop.conf.Configuration)
    val sink: StreamingFileSink[DataParse] = StreamingFileSink
          .forBulkFormat(new Path("hdfs://warehousestore/hive/warehouse/metrics_test.db/upp_raw_prod/hour=1/"), writerFactory)
          .build()
    stream.addSink(sink)
  }


  def main(args: Array[String]): Unit = {
    setKafkaConfig()
    env.enableCheckpointing(5000)
    env.execute("Kafka_Flink_HIVE")
  }
}
class temp extends MapFunction[String, DataParse] {

  override def map(record: String): DataParse = {
    new DataParse(record)
  }
}

class DataParse(data: String) {
  val parsedJson = parse(data)
  val timestamp = compact(render(parsedJson \ "timestamp")).replaceAll("\"", "").toLong
  val event = compact(render(parsedJson \ "event")).replaceAll("\"", "")
  val source_id = compact(render(parsedJson \ "source_id")).replaceAll("\"", "")
  val app = compact(render(parsedJson \ "app")).replaceAll("\"", "")
  val json = data
}
class PersonVectorizer(schema: String) extends Vectorizer[DataParse](schema) {

  override def vectorize(element: DataParse, batch: VectorizedRowBatch): Unit = {
    val eventColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
    val timeColVector = batch.cols(1).asInstanceOf[LongColumnVector]
    val sourceIdColVector = batch.cols(2).asInstanceOf[BytesColumnVector]
    val appColVector = batch.cols(3).asInstanceOf[BytesColumnVector]
    val jsonColVector = batch.cols(4).asInstanceOf[BytesColumnVector]
    timeColVector.vector(batch.size + 1) = element.timestamp
    eventColVector.setVal(batch.size + 1, element.event.getBytes(StandardCharsets.UTF_8))
    sourceIdColVector.setVal(batch.size + 1, element.source_id.getBytes(StandardCharsets.UTF_8))
    appColVector.setVal(batch.size + 1, element.app.getBytes(StandardCharsets.UTF_8))
    jsonColVector.setVal(batch.size + 1, element.json.getBytes(StandardCharsets.UTF_8))
  }

}

Answer

With bulk formats (such as ORC), the StreamingFileSink rolls over to a new file on every checkpoint. If you increase the checkpointing interval (currently 5 seconds), it won't write so many files.
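For example, a minimal sketch (the 60-second interval is an arbitrary illustrative value, not a recommendation from the original answer):

// Bulk formats can only be combined with OnCheckpointRollingPolicy, which
// rolls the in-progress part file on every checkpoint, so a longer
// checkpoint interval directly means fewer, larger ORC files.
env.enableCheckpointing(60000) // roll a new file every 60 s instead of every 5 s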

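The identical file contents are likely a separate issue: vectorize writes every element at the fixed index batch.size + 1 and never increments batch.size, so ORC never sees the rows that were added, which would explain every file coming out with the same content and size. The pattern in the Flink ORC documentation is to claim the current batch.size as the row index and then increment it. A sketch of PersonVectorizer rewritten that way, as a drop-in replacement for the class above:

class PersonVectorizer(schema: String) extends Vectorizer[DataParse](schema) {

  override def vectorize(element: DataParse, batch: VectorizedRowBatch): Unit = {
    // Claim the next free row, then grow the batch so each element lands
    // in its own slot (the original wrote to batch.size + 1 without ever
    // incrementing batch.size, so the batch stayed empty).
    val row = batch.size
    batch.size += 1

    batch.cols(0).asInstanceOf[BytesColumnVector].setVal(row, element.event.getBytes(StandardCharsets.UTF_8))
    batch.cols(1).asInstanceOf[LongColumnVector].vector(row) = element.timestamp
    batch.cols(2).asInstanceOf[BytesColumnVector].setVal(row, element.source_id.getBytes(StandardCharsets.UTF_8))
    batch.cols(3).asInstanceOf[BytesColumnVector].setVal(row, element.app.getBytes(StandardCharsets.UTF_8))
    batch.cols(4).asInstanceOf[BytesColumnVector].setVal(row, element.json.getBytes(StandardCharsets.UTF_8))
  }
}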