Spark Streaming and connection pool implementation

Question

The Spark Streaming programming guide at https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams shows the following code:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

I have tried to implement this using org.apache.commons.pool2, but running the application fails with the expected java.io.NotSerializableException:

15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
 ...

I am wondering how realistic it is to implement a connection pool that is serializable. Has anyone succeeded in doing this?

Thanks.

Answer

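To address this "local resource" problem, what's needed is a singleton object, i.e. an object that's guaranteed to be instantiated once and only once per JVM. Luckily, a Scala object provides this functionality out of the box: tasks refer to it statically instead of capturing it in their closures, so it is never serialized, and each executor JVM initializes its own instance on first use.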
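
Why the singleton sidesteps the NotSerializableException can be checked without Spark. The sketch below (plain Scala, 2.12 or later assumed; FakePool, FakePoolHolder and ClosureSerializationDemo are hypothetical names) Java-serializes two closures, roughly as Spark does when it ships a task: one captures a locally created, non-serializable pool stand-in, the other only refers to a singleton object.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for GenericObjectPool: deliberately NOT Serializable
class FakePool {
  def borrow(): String = "connection"
}

// Hypothetical singleton holder: created lazily, once per JVM, and reached through a
// static reference, so closures that use it do not capture (or serialize) the pool
object FakePoolHolder {
  lazy val pool: FakePool = new FakePool
}

object ClosureSerializationDemo {
  // Java-serializes a value, which is roughly what Spark does with task closures
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Returns (closure capturing a local pool serializes?, closure using the singleton serializes?)
  def results(): (Boolean, Boolean) = {
    val localPool = new FakePool // like a pool created on the driver
    val capturing: Int => String = _ => localPool.borrow()
    val viaSingleton: Int => String = _ => FakePoolHolder.pool.borrow()
    (serializes(capturing), serializes(viaSingleton))
  }

  def main(args: Array[String]): Unit =
    println(results()) // (false,true)
}
```

Only the first closure drags the pool into the serialized bytes; the second just names FakePoolHolder, which each JVM resolves and initializes on its own.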

The second thing to consider is that this singleton will serve all tasks running in the JVM that hosts it, so it MUST take care of concurrency and resource management.
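
Much of the concurrency requirement comes down to making "look up or create" atomic. A minimal sketch, assuming a hypothetical ResourceRegistry keyed by (host, port) and a Java thread pool standing in for concurrent Spark tasks on one executor:

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Hypothetical registry creating one resource per (host, port). The lock makes the
// check-then-create step atomic, so concurrent tasks never build duplicates.
object ResourceRegistry {
  val created = new AtomicInteger(0)
  private val resources = mutable.Map.empty[(String, Int), AnyRef]

  def get(host: String, port: Int): AnyRef = resources.synchronized {
    resources.getOrElseUpdate((host, port), {
      created.incrementAndGet()
      new Object // placeholder for an expensive resource such as a connection pool
    })
  }
}

object RegistryDemo {
  // Simulates many tasks on one executor JVM asking for the same resource at once
  def simulate(tasks: Int): Int = {
    val executor = Executors.newFixedThreadPool(8)
    (1 to tasks).foreach { _ =>
      executor.submit(new Runnable {
        def run(): Unit = { ResourceRegistry.get("localhost", 9999); () }
      })
    }
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
    ResourceRegistry.created.get
  }

  def main(args: Array[String]): Unit =
    println(simulate(100)) // 1
}
```

Without the synchronized block, two threads could both miss the key and each build a resource, leaking one of them.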

Let's try to sketch(*) such a service:

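import java.net.Socket
import org.apache.commons.pool2.{BasePooledObjectFactory, ObjectPool, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import scala.collection.mutable

// Wraps a borrowed socket together with the pool it must be returned to
class ManagedSocket(private val pool: ObjectPool[Socket], val socket: Socket) {
    def release(): Unit = pool.returnObject(socket)
}

// Hypothetical helper: tells commons-pool2 how to open and close sockets for one (host, port)
class SocketFactory(host: String, port: Int) extends BasePooledObjectFactory[Socket] {
    override def create(): Socket = new Socket(host, port)
    override def wrap(s: Socket): PooledObject[Socket] = new DefaultPooledObject(s)
    override def destroyObject(p: PooledObject[Socket]): Unit = p.getObject.close()
}

// singleton object: initialized once per JVM, never serialized with the task closure
object SocketPool {
    private val hostPortPool = mutable.Map.empty[(String, Int), ObjectPool[Socket]]
    sys.addShutdownHook {
        hostPortPool.synchronized { hostPortPool.values.foreach(_.close()) } // terminate each pool
    }

    // factory method: the lock ensures only one pool is ever created per (host, port)
    def apply(host: String, port: Int): ManagedSocket = {
        val pool = hostPortPool.synchronized {
            hostPortPool.getOrElseUpdate((host, port),
                new GenericObjectPool[Socket](new SocketFactory(host, port)))
        }
        new ManagedSocket(pool, pool.borrowObject())
    }
}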

Then usage becomes:

val host = ???
val port = ???
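stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
        // SocketPool is a singleton, so it is looked up on the executor, not serialized
        val mSocket = SocketPool(host, port)
        try {
            val os = mSocket.socket.getOutputStream()
            partition.foreach { elem =>
                // do stuff with os + elem
            }
        } finally {
            mSocket.release() // return the socket to the pool even if a write fails
        }
    }
}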
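
The borrow/return cycle that SocketPool relies on can be exercised outside Spark. This sketch assumes Apache Commons Pool 2 on the classpath and uses a hypothetical FakeConnection in place of a Socket, so no server is needed:

```scala
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}

// Hypothetical resource standing in for a Socket
class FakeConnection(val id: Int)

// Hypothetical factory: numbers each new connection so reuse is visible
class FakeConnectionFactory extends BasePooledObjectFactory[FakeConnection] {
  private var nextId = 0
  override def create(): FakeConnection = synchronized { nextId += 1; new FakeConnection(nextId) }
  override def wrap(c: FakeConnection): PooledObject[FakeConnection] = new DefaultPooledObject(c)
}

object PoolReuseDemo {
  // Returns (active after borrow, idle after return, was the idle object reused?)
  def run(): (Int, Int, Boolean) = {
    val pool = new GenericObjectPool[FakeConnection](new FakeConnectionFactory)
    try {
      val first = pool.borrowObject()
      val activeAfterBorrow = pool.getNumActive
      pool.returnObject(first)
      val idleAfterReturn = pool.getNumIdle
      val second = pool.borrowObject()
      pool.returnObject(second)
      (activeAfterBorrow, idleAfterReturn, second eq first)
    } finally pool.close()
  }

  def main(args: Array[String]): Unit =
    println(run()) // (1,1,true)
}
```

With the default (LIFO) configuration, the second borrow hands back the connection that was just returned instead of creating a new one, which is the reuse the Spark guide's ConnectionPool comment refers to.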

I'm assuming that the GenericObjectPool used in the question takes care of concurrency. Otherwise, access to each pool instance needs to be guarded with some form of synchronization.
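
If a pool implementation is not thread-safe, that guarding can be as simple as routing every access through one lock. A minimal sketch with a hypothetical SynchronizedPool (not part of Commons Pool), exercised by 8 threads:

```scala
import java.util.ArrayDeque
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical minimal pool: ArrayDeque is not thread-safe on its own,
// so every access goes through the pool's monitor (synchronized)
class SynchronizedPool[T <: AnyRef](create: () => T) {
  private val idle = new ArrayDeque[T]()
  def borrow(): T = synchronized { if (idle.isEmpty) create() else idle.pop() }
  def giveBack(obj: T): Unit = synchronized { idle.push(obj) }
  def idleCount: Int = synchronized { idle.size }
}

object SynchronizedPoolDemo {
  // 8 threads repeatedly borrow and return; returns (objects created, objects idle at the end)
  def simulate(): (Int, Int) = {
    val created = new AtomicInteger(0)
    val pool = new SynchronizedPool[Object](() => { created.incrementAndGet(); new Object })
    val executor = Executors.newFixedThreadPool(8)
    (1 to 8).foreach { _ =>
      executor.submit(new Runnable {
        def run(): Unit = (1 to 1000).foreach { _ => pool.giveBack(pool.borrow()) }
      })
    }
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
    (created.get, pool.idleCount)
  }
}
```

A new object is created only when every existing one is borrowed, so 8 threads never need more than 8 objects. Holding the lock while creating is fine here but would serialize slow socket connects in a real pool.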

(*) Code provided to illustrate how to design such an object; it needs additional effort to be converted into a working version.
