Integrate key-value database with Spark


Problem description





I'm having trouble understanding how Spark interacts with storage.

I would like to make a Spark cluster that fetches data from a RocksDB database (or any other key-value store). However, at this moment, the best I can do is fetch the whole dataset from the database into memory in each of the cluster nodes (into a map for example) and build an RDD from that object.

What do I have to do to fetch only the necessary data (the way Spark does with HDFS)? I've read about Hadoop InputFormats and RecordReaders, but I'm not completely grasping what I should implement.

I know this is kind of a broad question, but I would really appreciate some help to get me started. Thank you in advance.

Solution

Here is one possible solution. I assume you have a client library for the key-value store (RocksDB in your case) that you want to access.
KeyValuePair is a bean class representing one key-value pair from your key-value store.
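
The answer does not show KeyValuePair itself. A minimal sketch, assuming byte[] keys and values (which is what most key-value clients, including RocksDB, return), could look like this; it must implement Serializable so Spark can ship instances between the driver and the executors:

import java.io.Serializable;

/*Minimal bean for one key-value pair; the byte[] field types are an assumption*/
class KeyValuePair implements Serializable {
    private byte[] key;
    private byte[] value;

    public KeyValuePair() { }                        //no-arg constructor for the dummy instance
    public KeyValuePair(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    public byte[] getKey()   { return key; }
    public byte[] getValue() { return value; }
}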

Classes

import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapFunction;

/*Lazy iterator to read from the KeyValue store*/
class KeyValueIterator implements Iterator<KeyValuePair> {
    public KeyValueIterator() {
        //TODO initialize your custom reader using the store's Java client library
    }

    @Override
    public boolean hasNext() {
        //TODO return true while the store has more pairs to read
        return false;                        //placeholder so the stub compiles
    }

    @Override
    public KeyValuePair next() {
        //TODO read and return the next key-value pair
        throw new UnsupportedOperationException("reader not implemented yet");
    }
}

/*Runs on an executor; the dummy input element is ignored*/
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair> {
    @Override
    public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
        //ignore the empty 'keyValuePair' object
        return new KeyValueIterator();
    }
}
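
For RocksDB in particular, the lazy iterator might wrap a RocksIterator from the org.rocksdb Java client. The sketch below is one possible shape, assuming every worker node can reach the database files at dbPath (for example, a local replica on each node); the path and error handling are illustrative, not part of the original answer:

import java.util.Iterator;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

/*Sketch: KeyValueIterator backed by a read-only RocksDB handle*/
class RocksDbKeyValueIterator implements Iterator<KeyValuePair> {
    private RocksIterator it;

    public RocksDbKeyValueIterator(String dbPath) {
        RocksDB.loadLibrary();
        try {
            //open read-only so multiple tasks can share the same on-disk database
            RocksDB db = RocksDB.openReadOnly(new Options(), dbPath);
            it = db.newIterator();
            it.seekToFirst();                //start scanning from the first key
        } catch (RocksDBException e) {
            throw new RuntimeException("could not open RocksDB at " + dbPath, e);
        }
    }

    @Override
    public boolean hasNext() {
        return it.isValid();                 //valid while the cursor points at an entry
    }

    @Override
    public KeyValuePair next() {
        KeyValuePair pair = new KeyValuePair(it.key(), it.value());
        it.next();                           //advance the cursor
        return pair;
    }
}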

Create KeyValue RDD

/*list with a dummy KeyValuePair instance; it exists only so the flatMap has one element to run on*/
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
keyValuePairs.add(new KeyValuePair());
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);
/*Read one key-value pair at a time lazily*/
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());
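
The javaSparkContext used above is assumed to already exist. A minimal driver setup (the application name and master URL are placeholders) might look like:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("rocksdb-to-spark")    //placeholder application name
        .setMaster("local[*]");            //placeholder; normally set by spark-submit on a cluster
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

//...build keyValuePairRDD as shown above, then force evaluation with an action:
long pairCount = keyValuePairRDD.count();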

Note:

The above solution creates an RDD with two partitions by default (one of them will be empty). Increase the number of partitions before applying any transformation on keyValuePairRDD so that the processing is distributed across the executors. Different ways to increase partitions:

keyValuePairRDD.repartition(partitionCounts)
//OR (only after converting to a JavaPairRDD, since partitionBy exists only on pair RDDs):
keyValuePairRDD.partitionBy(...)
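
As one concrete (assumed) shape of the partitionBy route: map into a JavaPairRDD first and hand it a HashPartitioner. The String conversion of the key and the partition count of 8 are illustrative; raw byte[] keys hash by object identity, which makes them poor partition keys:

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

JavaPairRDD<String, KeyValuePair> partitioned = keyValuePairRDD
        .mapToPair(kv -> new Tuple2<>(new String(kv.getKey()), kv))  //key by a String form of the key
        .partitionBy(new HashPartitioner(8));                        //8 is an illustrative count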
