从单个线程修改哈希映射并从多个线程读取? [英] Modifying hash map from a single thread and reading from multiple threads?

查看:161
本文介绍了从单个线程修改哈希映射并从多个线程读取?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个类,我每30秒从一个后台线程填充一个地图 liveSocketsByDatacenter 然后我有一个方法 getNextSocket 将由多个读者线程调用以获得可用的实时套接字,该套接字使用相同的地图来获取此信息。

I have a class in which I am populating a map liveSocketsByDatacenter from a single background thread every 30 seconds and then I have a method getNextSocket which will be called by multiple reader threads to get a live socket available which uses the same map to get this info.

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new HashMap<>();
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    for (String address : addresses) {
      try {
        Socket client = ctx.createSocket(socketType);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.setTCPKeepAlive(1);
        client.setSendTimeOut(7);
        client.setLinger(0);
        client.connect(address);

        SocketHolder zmq = new SocketHolder(client, ctx, address, true);
        socketList.add(zmq);
      } catch (Exception ex) {
        // log error
      }
    }
    return socketList;
  }

  // this method will be called by multiple threads to get the next live socket
  public Optional<SocketHolder> getNextSocket() {
    Optional<SocketHolder> liveSocket = Optional.absent();
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
    for (Datacenters dc : dcs) {
      liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        break;
      }
    }
    return liveSocket;
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!CollectionUtils.isEmpty(listOfEndPoints)) {
      Collections.shuffle(listOfEndPoints);
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          return Optional.of(obj);
        }
      }
    }
    return Optional.absent();
  }

  private void updateLiveSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();

        boolean status = SendToSocket.getInstance().execute(3, holder, socket);
        boolean isLive = (status) ? true : false;
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), liveUpdatedSockets);
    }
  }
}

正如你在我看到的那样上面的类:

As you can see in my above class:


  • 从一个每30秒运行一次的后台线程中,我填充 liveSocketsByDatacenter 映射所有实时套接字。

  • 然后从多个线程,我调用 getNextSocket 方法给我实时套接字可用使用 liveSocketsByDatacenter map来获取所需信息。

  • From a single background thread which runs every 30 seconds, I populate liveSocketsByDatacenter map with all the live sockets.
  • And then from multiple threads, I call getNextSocket method to give me live socket available which uses liveSocketsByDatacenter map to get the required information.

我的上面的代码是否正确安全,所有读者线程都会准确地看到 liveSocketsByDatacenter ?由于我正在修改 liveSocketsByDatacenter 从单个后台线程每30秒映射一次,然后从很多读者线程映射,我调用 getNextSocket 方法所以我不确定我在这里做错了什么。

Is my above code thread safe and all the reader threads will see liveSocketsByDatacenter accurately? Since I am modifying liveSocketsByDatacenter map every 30 seconds from a single background thread and then from a lot of reader threads, I am calling getNextSocket method so I am not sure if I did anything wrong here.

看起来我的getLiveSocket方法中可能存在线程安全问题从地图中获取共享的 ArrayList 并将其洗牌?而且可能还有一些我可能错过的地方。在我的代码中解决这些线程安全问题的最佳方法是什么?

It looks like there might be a thread safety issue in my "getLiveSocket" method as every read gets a shared ArrayList out of the map and shuffles it? And there might be few more places as well which I might have missed. What is the best way to fix these thread safety issues in my code?

如果有更好的方法可以重写这个,那么我也是开放的。

If there is any better way to rewrite this, then I am open for that as well.

推荐答案

为了保证线程安全,您的代码必须同步对所有共享可变状态的任何访问。

To be thread-safe, your code must synchronize any access to all shared mutable state.

在这里你共享 liveSocketsByDatacenter ,一个 HashMap的实例可以并发读取的 Map 非线程安全实现(通过 updateLiveSockets getNextSocket )并修改(通过 connectToZMQSockets updateLiveSockets )没有同步任何已经足以使您的代码非线程安全的访问。此外,此 Map 的值是 ArrayList 非线程安全实现的实例还可以同时读取的 List (通过 getNextSocket updateLiveSockets )并修改(通过 getLiveSocket 更准确地说是 Collections.shuffle )。

Here you share liveSocketsByDatacenter, an instance of HashMap a non thread-safe implementation of a Map that can potentially be concurrently read (by updateLiveSockets and getNextSocket) and modified (by connectToZMQSockets and updateLiveSockets) without synchronizing any access which is already enough to make your code non thread safe. Moreover, the values of this Map are instances of ArrayList a non thread-safe implementation of a List that can also potentially be concurrently read (by getNextSocket and updateLiveSockets) and modified (by getLiveSocket more precisely by Collections.shuffle).

解决2线程安全问题的简单方法可能是:

The simple way to fix your 2 thread safety issues could be to:


  1. 使用 ConcurrentHashMap 而不是 HashMap 为您的变量 liveSocketsByDatacenter ,因为它本身就是线程安全实现地图

  2. 放置不可修改的版本使用实例作为地图的值。 util.List-rel =nofollow noreferrer> Collections.unmodifiableList(List<?extends T>列表) ,您的列表将是不可变的,因此线程安全。

  1. use a ConcurrentHashMap instead of a HashMap for your variable liveSocketsByDatacenter as it is a natively thread safe implementation of a Map.
  2. put the unmodifiable version of your ArrayList instances as value of your map using Collections.unmodifiableList(List<? extends T> list), your lists would then be immutable so thread safe.

例如:

liveSocketsByDatacenter.put(
    entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)
);`




  1. 重写你的方法 getLiveSocket 以避免调用 Collections.shuffle 直接在你的列表上,你可以例如只播放实时套接字列表而不是所有套接字或使用列表的副本(例如 new ArrayList<>(listOfEndPoints))而不是列表本身。

  1. rewrite your method getLiveSocket to avoid calling Collections.shuffle directly on your list, you could for example shuffle only the list of live sockets instead of all sockets or use a copy of your list (with for example new ArrayList<>(listOfEndPoints)) instead of the list itself.

例如:

private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
            if (obj.isLive()) {
                liveOnly.add(obj);
            }
        }
        if (!liveOnly.isEmpty()) {
            // The list is not empty so we shuffle it an return the first element
            Collections.shuffle(liveOnly);
            return Optional.of(liveOnly.get(0));
        }
    }
    return Optional.absent();
}






对于#1为你似乎经常阅读,很少(每30秒只修改一次)修改你的地图,你可以考虑重建你的地图然后分享它的不可变版本(使用 Collections.unmodifiableMap(Map<?extends K,?extends V> m) )每30秒,这种方法在大多数读取方案中非常有效,因为您不再为访问地图内容的任何同步机制付出代价。


For #1 as you seem to frequently read and rarely (only once every 30 seconds) modify your map, you could consider to rebuild your map then share its immutable version (using Collections.unmodifiableMap(Map<? extends K,? extends V> m)) every 30 seconds, this approach is very efficient in mostly read scenario as you no longer pay the price of any synchronization mechanism to access to the content of your map.

您的代码将是:

// Your variable is no more final, it is now volatile to ensure that all 
// threads will see the same thing at all time by getting it from
// the main memory instead of the CPU cache
private volatile Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter 
    = Collections.unmodifiableMap(new HashMap<>());

private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : 
        socketsByDatacenter.entrySet()) {

        List<SocketHolder> addedColoSockets = connect(
            entry.getKey(), entry.getValue(), ZMQ.PUSH
        );
        liveSockets.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets));
    }
    // Set the new content of my map as an unmodifiable map
    this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSockets);
}

public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
        this.liveSocketsByDatacenter;
    ...
}
...
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
        new HashMap<>(this.liveSocketsByDatacenter);
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
        ...
        liveSockets.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
    // Set the new content of my map as an unmodifiable map
    this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSocketsByDatacenter);
}

您的字段 liveSocketsByDatacenter 也可以是类型 AtomicReference< Map< Datacenters,List< SocketHolder>>> ,然后它将是 final ,您的地图仍将存储在 volatile 变量中但在类 AtomicReference 中。

Your field liveSocketsByDatacenter could also be of type AtomicReference<Map<Datacenters, List<SocketHolder>>> , it would then be final, your map will still be stored in a volatile variable but within the class AtomicReference.

之前的代码将是:

private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter 
    = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));

...

private void connectToZMQSockets() {
    ...
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSockets));
}

public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
        this.liveSocketsByDatacenter.get();
    ...
}

// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
        new HashMap<>(this.liveSocketsByDatacenter.get());
    ...
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}

这篇关于从单个线程修改哈希映射并从多个线程读取?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆