Concurrently reading a Map while a single background thread regularly modifies it


Problem description

I have a class that populates a map, liveSocketsByDatacenter, from a single background thread every 30 seconds inside the updateLiveSockets() method. A second method, getNextSocket(), is called by multiple reader threads to get an available live socket, and it uses the same map to find one.

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
      new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 30, 30, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
    // The map in which I put all the live sockets
    Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
      updatedLiveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(addedColoSockets));
    }
    // Update the map content
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
  }

  private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    for (String address : addresses) {
      try {
        Socket client = ctx.createSocket(socketType);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.setTCPKeepAlive(1);
        client.setSendTimeOut(7);
        client.setLinger(0);
        client.connect(address);

        SocketHolder zmq = new SocketHolder(client, ctx, address, true);
        socketList.add(zmq);
      } catch (Exception ex) {
        // log error
      }
    }
    return socketList;
  }

  // this method will be called by multiple threads to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    // For the sake of consistency make sure to use the same map instance
    // in the whole implementation of my method by getting my entries
    // from the local variable instead of the member variable
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        this.liveSocketsByDatacenter.get();
    Optional<SocketHolder> liveSocket = Optional.absent();
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
    for (Datacenters dc : dcs) {
      liveSocket = getLiveSocketX(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        break;
      }
    }
    return liveSocket;
  }

  // is there any concurrency or thread safety issue or race condition here?
  private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
    if (!CollectionUtils.isEmpty(endpoints)) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
      for (SocketHolder obj : endpoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty, so we shuffle it and return the first element
        Collections.shuffle(liveOnly);
        return Optional.of(liveOnly.get(0));
      }
    }
    return Optional.absent();
  }

  // Added the synchronized modifier to prevent concurrent modification.
  // It is needed because to build the new map we first need to get the
  // old one, so both must be done atomically to prevent consistency issues
  private synchronized void updateLiveSockets() {
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

    // Initialize my new map with the current map content
    Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
        new HashMap<>(this.liveSocketsByDatacenter.get());

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) { // LINE A
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
        boolean isLive = (status) ? true : false;
        // is there any problem the way I am using `SocketHolder` class?
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(),
          Collections.unmodifiableList(liveUpdatedSockets));
    }
    this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
  }
}

As you can see in my class:

  • From a single background thread that runs every 30 seconds, I populate the liveSocketsByDatacenter map with all the live sockets in the updateLiveSockets() method.
  • Then, from multiple threads, I call the getNextSocket() method to get an available live socket; it uses the liveSocketsByDatacenter map to find the required information.

My code works fine without any issues, and I wanted to see if there is a better or more efficient way to write it. I also wanted an opinion on thread safety issues or race conditions, if there are any. So far I haven't seen any, but I could be wrong.

I am mostly worried about the updateLiveSockets() and getLiveSocketX() methods. At LINE A I iterate over liveSockets, which is a List of SocketHolder, then create a new SocketHolder object for each element and add it to another new list. Is this OK here?

Note: SocketHolder is an immutable class. You can ignore the ZeroMQ details.

Solution

You use the following synchronization techniques:

  1. The map with live socket data is behind an atomic reference, which allows the map to be switched safely.
  2. The updateLiveSockets() method is synchronized (implicitly on this), which prevents two threads from switching the map simultaneously.
  3. You take a local reference to the map when using it, to avoid mix-ups if the switch happens during the getNextSocket() method.

Is it thread safe, as it is now?

Thread safety always hinges on whether there is proper synchronization on shared mutable data. In this case the shared mutable data is the map of datacenters to their list of SocketHolders.

Keeping the map in an AtomicReference and taking a local reference to it before use is enough synchronization for the map. Your methods take one version of the map and work with that; switching versions is thread safe due to the nature of AtomicReference. The same could be achieved by simply making the member field for the map volatile, as all you do is update the reference (you don't do any check-then-act operations on it).
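As a minimal sketch of the volatile alternative mentioned above: the class name VolatileSnapshotRegistry and the String keys and values are assumptions standing in for SocketManager, Datacenters and SocketHolder, which are not available here. The single writer builds a fresh unmodifiable map and assigns it to the volatile field; each reader copies the field into a local variable once and works only with that reference.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the map handling in SocketManager.
class VolatileSnapshotRegistry {
    // volatile makes every newly assigned map visible to all reader threads
    private volatile Map<String, List<String>> snapshot = Collections.emptyMap();

    // Writer side: build a defensive, unmodifiable copy, then publish it
    // with a single reference assignment.
    void publish(Map<String, List<String>> fresh) {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> e : fresh.entrySet()) {
            copy.put(e.getKey(),
                    Collections.unmodifiableList(new ArrayList<>(e.getValue())));
        }
        snapshot = Collections.unmodifiableMap(copy);
    }

    // Reader side: read the field once so the whole lookup uses one version.
    List<String> lookup(String key) {
        Map<String, List<String>> local = snapshot;
        List<String> result = local.get(key);
        return result == null ? Collections.<String>emptyList() : result;
    }
}
```

This only works because the writer never needs to compare the old reference with the new one; if an update depended on the current value being unchanged, AtomicReference.compareAndSet would be the better fit.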

As scheduleAtFixedRate() guarantees that the passed Runnable will not be run concurrently with itself, the synchronized on updateLiveSockets() is not needed. It also doesn't do any real harm, however.
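The non-overlap guarantee can be observed with a small experiment. The sketch below is illustrative only: NoOverlapDemo, the 10 ms period and the 30 ms sleep are made-up values, and a pool of four threads is used on purpose to show that spare threads do not cause the same task to overlap with itself.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NoOverlapDemo {
    // Schedules a task that takes longer than its period and returns the
    // highest number of simultaneous executions that was observed.
    static int maxConcurrentRuns() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            int now = running.incrementAndGet();
            maxSeen.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(30); // longer than the 10 ms period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(200); // let several executions happen
            scheduler.shutdownNow();
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxSeen.get();
    }
}
```

Late executions are simply delayed rather than run in parallel, which is why updateLiveSockets() does not need its own lock when it is only ever invoked from this one scheduled task.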

So yes, this class is thread safe as it stands.

However, it's not entirely clear whether a SocketHolder can be used by multiple threads simultaneously. As it is, this class only tries to minimize concurrent use of SocketHolders by picking a random live one (there is no need to shuffle the entire list just to pick one random index, though). It does nothing to actually prevent concurrent use.
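Picking one random live element without shuffling could look like the sketch below. It is an illustration under assumptions: RandomPick and its isLive predicate are hypothetical, java.util.Optional is used instead of the Optional type with absent() that appears in the question, and ThreadLocalRandom is swapped in for the shared Random so that reader threads do not contend on a single generator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;

class RandomPick {
    // Filters the candidates with isLive and returns one of the survivors,
    // chosen uniformly at random, or an empty Optional if none qualifies.
    static <T> Optional<T> pickRandom(List<T> candidates, Predicate<T> isLive) {
        if (candidates == null || candidates.isEmpty()) {
            return Optional.empty();
        }
        List<T> live = new ArrayList<>(candidates.size());
        for (T candidate : candidates) {
            if (isLive.test(candidate)) {
                live.add(candidate);
            }
        }
        if (live.isEmpty()) {
            return Optional.empty();
        }
        // One random index replaces a full Collections.shuffle of the list
        int index = ThreadLocalRandom.current().nextInt(live.size());
        return Optional.of(live.get(index));
    }
}
```

As in the original, this only spreads the load across live sockets; two threads can still receive the same element at the same time.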

Can it be made more efficient?

I believe it can. Looking at the updateLiveSockets() method, it seems to build the exact same map, except that the SocketHolders may have different values for the isLive flag. This leads me to conclude that, rather than switching the entire map, I just want to switch each of the lists in the map. And for changing entries in a map in a thread safe manner, I can just use ConcurrentHashMap.

If I use a ConcurrentHashMap and switch the values in the map rather than the map itself, I can get rid of the AtomicReference.

To change a mapping I can just build the new list and put it straight into the map. This is more efficient, as I publish data sooner and create fewer objects, while my synchronization builds on ready-made components, which benefits readability.
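Reduced to its core, the per-key replacement described above can be sketched as follows. PerKeyRegistry and the String keys and values are hypothetical stand-ins for SocketManager, Datacenters and SocketHolder. Each list is published as an unmodifiable copy, so a reader that already fetched a list keeps a consistent view even if the writer replaces the entry afterwards.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the map handling in SocketManager.
class PerKeyRegistry {
    private final Map<String, List<String>> liveByKey = new ConcurrentHashMap<>();

    // Writer side: swap in a complete new list for one key.
    void replace(String key, List<String> fresh) {
        liveByKey.put(key, Collections.unmodifiableList(new ArrayList<>(fresh)));
    }

    // Reader side: a single get() returns the latest list for that key.
    List<String> lookup(String key) {
        List<String> current = liveByKey.get(key);
        return current == null ? Collections.<String>emptyList() : current;
    }
}
```

One trade-off of this design is that there is no longer a snapshot across keys: while an update is in progress, a reader may see the new list for one key and the old list for another. That is acceptable when each key is looked up independently, as in getNextSocket().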

Here's my version (some less relevant parts are omitted for brevity):

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
    private final ZContext ctx = new ZContext();

    // ...

    private SocketManager() {
      connectToZMQSockets();
      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
    }

    // during startup, making a connection and populate once
    private void connectToZMQSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
        liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
      }
    }

    // ...      

    // this method will be called by multiple threads to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional<SocketHolder> getNextSocket() {
      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
        Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy; ConcurrentHashMap makes sure I get the latest mapped List<SocketHolder>
        if (liveSocket.isPresent()) {
          return liveSocket;
        }
      }
      return Optional.absent();
    }

    // is there any concurrency or thread safety issue or race condition here?
    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
      if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
          if (obj.isLive()) {
            liveOnly.add(obj);
          }
        }
        if (!liveOnly.isEmpty()) {
          // The list is not empty, so we return one randomly chosen element
          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
        }
      }
      return Optional.absent();
    }

    // no need to make this synchronized
    private void updateLiveSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
        List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
        for (SocketHolder liveSocket : liveSockets) { // LINE A
          Socket socket = liveSocket.getSocket();
          String endpoint = liveSocket.getEndpoint();
          Map<byte[], byte[]> holder = populateMap();
          Message message = new Message(holder, Partition.COMMAND);

          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
          boolean isLive = (status) ? true : false;

          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
          liveUpdatedSockets.add(zmq);
        }
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straight into the map; the mapping will be updated in a thread safe manner
      }
    }

}
