每个JVM中的Spark Streaming连接池 [英] Spark Streaming connection pool in each JVM

查看:291
本文介绍了每个JVM中的Spark Streaming连接池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的Spark Streaming应用程序中,我有很多I/O操作,例如codis,hbase等.我想确保每个执行器中只有一个连接池,如何才能做到这一点呢? 现在,我分散地实现了一些静态类,这对管理不利.如何将它们集中到一个类中,例如xxContext,一些集中在SparkContext中,需要广播吗?我知道广播大型只读数据集会很好,但是这些连接池又如何呢? Java或scala都可以接受.

In my spark streaming app, I have many I/O operations, such as codis, hbase, etc. I want to make sure exactly one connection pool in each executor, how can I do this elegantly? Now, I implement some static class dispersedly, this is not good for management. How about centralize them into one class like xxContext, some what like SparkContext, and need I broadcast it? I know it's good to broadcast large read-only dataset, but how about these connection pools? Java or scala are both acceptable.

推荐答案

foreachPartition最合适

示例代码段

foreachPartition is best fit

Sample code snippet to it

val dstream = ...

dstream.foreachRDD { rdd =>

  //loop through each parttion in rdd
  rdd.foreachPartition { partitionOfRecords =>

    //1. Create Connection object/pool for Codis, HBase

    // Use it if you want record level control in rdd or partion
    partitionOfRecords.foreach { record =>
      // 2. Write each record to external client 
    }
    
    // 3. Batch insert if connector supports from an RDD to external source
  }

  //Use 2 or 3 to write data as per your requirement 
}

针对类似用例的另一种答案

检查此内容:使用foreachRDD的设计模式

这篇关于每个JVM中的Spark Streaming连接池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆