创建支持&QUOT一个ConcurrentHashMap的;快照" [英] Creating a ConcurrentHashMap that supports "snapshots"

查看:178
本文介绍了创建支持&QUOT一个ConcurrentHashMap的;快照"的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图创建一个的ConcurrentHashMap 支持快照,以提供一致的迭代器,并想知道如果有一个更有效的方法来做到这一点。问题是,如果两个迭代器在同一时间产生那么他们需要阅读相同的价值观,以及并发的哈希表的弱一致的迭代器的定义并不能保证这是这种情况。我也想避免锁如果可能的话:有几千值的地图和处理每个项目都需要花费几十毫秒,而我不希望有在这段时间,因为这阻止作家可能会导致作家阻塞一分钟或更长的时间。

I'm attempting to create a ConcurrentHashMap that supports "snapshots" in order to provide consistent iterators, and am wondering if there's a more efficient way to do this. The problem is that if two iterators are created at the same time then they need to read the same values, and the definition of the concurrent hash map's weakly consistent iterators does not guarantee this to be the case. I'd also like to avoid locks if possible: there are several thousand values in the map and processing each item takes several dozen milliseconds, and I don't want to have to block writers during this time as this could result in writers blocking for a minute or longer.

我到目前为止有:

  1. 的ConcurrentHashMap的键是字符串,其值 ConcurrentSkipListMap&LT的实例;龙,T>
  2. 当一个元素被添加到HashMap中使用的putIfAbsent ,然后一个新的skiplist分配,对象是通过添加skipList.put( System.nanoTime(),T)
  3. 要查询的地图,我用 map.get(键).lastEntry()的getValue()返回最近的值。若要查询快照(如使用迭代器),我用 map.get(键).lowerEntry(iteratorTimestamp).getValue(),其中 iteratorTimestamp 的结果是 System.nanoTime()调用的时候,迭代器初始化。
  4. 如果一个对象被删除,我用 map.get(键)。把(时间戳,SnapShotMap.DELETED),其中删除的一个静态的最终目标。
  1. The ConcurrentHashMap's keys are Strings, and its values are instances of ConcurrentSkipListMap<Long, T>
  2. When an element is added to the hashmap with putIfAbsent, then a new skiplist is allocated, and the object is added via skipList.put(System.nanoTime(), t).
  3. To query the map, I use map.get(key).lastEntry().getValue() to return the most recent value. To query a snapshot (e.g. with an iterator), I use map.get(key).lowerEntry(iteratorTimestamp).getValue(), where iteratorTimestamp is the result of System.nanoTime() called when the iterator was initialized.
  4. If an object is deleted, I use map.get(key).put(timestamp, SnapShotMap.DELETED), where DELETED is a static final object.

问题:

  1. 有一个已经实现了这个库?或限制,是否有一个数据结构,会比的ConcurrentHashMap ConcurrentSkipListMap 比较合适?我的钥匙是可比的,所以也许某种并发树将更好地支持快照不是并发的哈希表。
  2. 我如何prevent这事不断增长?我可以删除所有的跳跃列表项的钥匙小于X(除​​了在地图上的最后一个键),在X或之前初始化已经完成了所有迭代之后,但我不知道的一个很好的方式,以确定何时这已经发生了:我可以标志一个迭代完成时的规则hasNext 方法返回false,但不是所有的迭代器都一定要去完成运行;我可以保持一个的WeakReference 来迭代器,这样我可以发现,当它被垃圾回收,但我想不出一个好办法来检测这个以外使用线程,通过弱引用的集合迭代,然后睡几分钟 - 理想的线程将阻止对的WeakReference 当包裹参考GC'd通知,但我不认为这是一个选项。

  1. Is there a library that already implements this? Or barring that, is there a data structure that would be more appropriate than the ConcurrentHashMap and the ConcurrentSkipListMap? My keys are comparable, so maybe some sort of concurrent tree would better support snapshots than a concurrent hash table.
  2. How do I prevent this thing from continually growing? I can delete all of the skip list entries with keys less than X (except for the last key in the map) after all iterators that were initialized on or before X have completed, but I don't know of a good way to determine when this has happened: I can flag that an iterator has completed when its hasNext method returns false, but not all iterators are necessarily going to run to completion; I can keep a WeakReference to an iterator so that I can detect when it's been garbage collected, but I can't think of a good way to detect this other than by using a thread that iterates through the collection of weak references and then sleeps for several minutes - ideally the thread would block on the WeakReference and be notified when the wrapped reference is GC'd, but I don't think this is an option.

ConcurrentSkipListMap<Long, WeakReference<Iterator>> iteratorMap;
while(true) {
    long latestGC = 0;
    for(Map.Entry<Long, WeakReference<Iterator>> entry : iteratorMap.entrySet()) {
        if(entry.getValue().get() == null) {
            iteratorMap.remove(entry.getKey());
            latestGC = entry.getKey();
        } else break;
    }
    // remove ConcurrentHashMap entries with timestamps less than `latestGC`
    Thread.sleep(300000); // five minutes
}

编辑::要澄清一些混乱的答案和意见,我目前通过弱一致的迭代器code写的在公司另一个部门,他们都问我增加迭代器一致性的实力。他们已经知道的事实,这是不可行的,我做100%一致的迭代器,他们只是想在我的部分尽最大努力。他们更关心的吞吐量比迭代器的一致性,使粗粒度的锁是不是一种选择。

To clear up some confusion in the answers and comments, I'm currently passing weakly consistent iterators to code written by another division in the company, and they have asked me to increase the strength of the iterators' consistency. They are already aware of the fact that it is infeasible for me to make 100% consistent iterators, they just want a best effort on my part. They care more about throughput than iterator consistency, so coarse-grained locks are not an option.

推荐答案

什么是您的实际使用情况,需要特殊的实现?从的ConcurrentHashMap 的Javadoc中(重点补充):

What is your actual use case that requires a special implementation? From the Javadoc of ConcurrentHashMap (emphasis added):

反演反映拿着在他们的发病最近完成的更新操作的结果。 ... 迭代器和枚举返回的元素将反映在或自创建迭代器/枚举的哈希表在某些时候的状态。他们不抛出ConcurrentModificationException。然而,迭代器被设计成在一个时间使用仅由一个线程

Retrievals reflect the results of the most recently completed update operations holding upon their onset. ... Iterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration. They do not throw ConcurrentModificationException. However, iterators are designed to be used by only one thread at a time.

所以经常 ConcurrentHashMap.values​​()。迭代器()会给你一个一致的迭代,但仅限于一次性使用一个线程。如果你需要和/或多个线程使用相同的快照多次,我建议做地图的副本。

So the regular ConcurrentHashMap.values().iterator() will give you a "consistent" iterator, but only for one-time use by a single thread. If you need to use the same "snapshot" multiple times and/or by multiple threads, I suggest making a copy of the map.

编辑:随着新的信息和坚持了坚决一致的迭代,我提供这个解决方案。请注意,使用一个ReadWriteLock中的具有以下含义:

With the new information and the insistence for a "strongly consistent" iterator, I offer this solution. Please note that the use of a ReadWriteLock has the following implications:

  • 写入将被序列化的(只有一个作家在一个时间),所以写性能可能会受到影响。
  • 并行读取允许的,只要有正在进行中没有写,所以读取性能的影响应该很小。
  • 活动的读者块作家的,但只只要它需要检索参考当前的快照。一旦一个线程具有快照,它不再阻止作家,不管需要多长时间来处理快照的信息。
  • 读者被封锁的,而任何写处于活动状态;一旦写入完成,然后所有的读者将有机会获得新的快照,直到一个新的写替换它。
  • Writes will be serialized (only one writer at a time) so write performance may be impacted.
  • Concurrent reads are allowed as long as there is no write in progress, so read performance impact should be minimal.
  • Active readers block writers but only as long as it takes to retrieve the reference to the current "snapshot". Once a thread has the snapshot, it no longer blocks writers no matter how long it takes to process the information in the snapshot.
  • Readers are blocked while any write is active; once the write finishes then all readers will have access to the new snapshot until a new write replaces it.

一致性是通过序列化写入和制作的复制的对电流值来实现的每个写的。持有的引用陈旧快照读者可以继续使用旧的快照,而不必担心修改和垃圾收集器将尽快回收旧的快照,因为没有一个是更多的使用它。据推测,存在的不要求的一种读卡器,从较早的时间点请求的快照。

Consistency is achieved by serializing the writes and making a copy of the current values on each and every write. Readers that hold a reference to a "stale" snapshot can continue to use the old snapshot without worrying about modification, and the garbage collector will reclaim old snapshots as soon as no one is using it any more. It is assumed that there is no requirement for a reader to request a snapshot from an earlier point in time.

由于快照多个并发线程之间潜在的共享,快照是只读的,不能修改。此限制也适用于删除快照创建的任何迭代实例()方法。

Because snapshots are potentially shared among multiple concurrent threads, the snapshots are read-only and cannot be modified. This restriction also applies to the remove() method of any Iterator instances created from the snapshot.

import java.util.*;
import java.util.concurrent.locks.*;

public class StackOverflow16600019 <K, V> {
    private final ReadWriteLock locks = new ReentrantReadWriteLock();
    private final HashMap<K,V> map = new HashMap<>();
    private Collection<V> valueSnapshot = Collections.emptyList();

    public V put(K key, V value) {
        locks.writeLock().lock();
        try {
            V oldValue = map.put(key, value);
            updateSnapshot();
            return oldValue;
        } finally {
            locks.writeLock().unlock();
        }
    }

    public V remove(K key) {
        locks.writeLock().lock();
        try {
            V removed = map.remove(key);
            updateSnapshot();
            return removed;
        } finally {
            locks.writeLock().unlock();
        }
    }

    public Collection<V> values() {
        locks.readLock().lock();
        try {
            return valueSnapshot; // read-only!
        } finally {
            locks.readLock().unlock();
        }
    }

    /** Callers MUST hold the WRITE LOCK. */
    private void updateSnapshot() {
        valueSnapshot = Collections.unmodifiableCollection(
            new ArrayList<V>(map.values())); // copy
    }
}

这篇关于创建支持&QUOT一个ConcurrentHashMap的;快照&QUOT;的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆