无锁堆栈实现想法-目前已中断 [英] Lock Free stack implementation idea - currently broken

查看:83
本文介绍了无锁堆栈实现想法-目前已中断的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想出了一个想法,我试图实现一个无锁堆栈,该堆栈不依赖引用计数来解决ABA问题,并且还可以正确处理内存回收.它在概念上与RCU相似,并且依赖于两个功能:将列表条目标记为已删除,以及跟踪遍历该列表的读者.前者很简单,它只使用指针的LSB.后者是我聪明"地尝试实现一种无界无锁堆栈的方法.

I came up with an idea I am trying to implement for a lock free stack that does not rely on reference counting to resolve the ABA problem, and also handles memory reclamation properly. It is similar in concept to RCU, and relies on two features: marking a list entry as removed, and tracking readers traversing the list. The former is simple, it just uses the LSB of the pointer. The latter is my "clever" attempt at an approach to implementing an unbounded lock free stack.

基本上,当任何线程尝试遍历列表时,一个原子计数器(list.entries)都会增加.遍历完成后,第二个计数器(list.exits)将增加.

Basically, when any thread attempts to traverse the list, one atomic counter (list.entries) is incremented. When the traversal is complete, a second counter (list.exits) is incremented.

节点分配由推处理,而释放则由pop处理.

Node allocation is handled by push, and deallocation is handled by pop.

push和pop操作与朴素的无锁堆栈实现非常相似,但是必须遍历标记为要删除的节点才能到达未标记的条目.因此,基本上,推送"非常类似于链接列表的插入.

The push and pop operations are fairly similar to the naive lock-free stack implementation, but the nodes marked for removal must be traversed to arrive at a non-marked entry. Push basically is therefore much like a linked list insertion.

pop操作类似地遍历列表,但是它使用atomic_fetch_or在遍历时将节点标记为已删除,直到到达未标记的节点.

The pop operation similarly traverses the list, but it uses atomic_fetch_or to mark the nodes as removed while traversing, until it reaches a non-marked node.

遍历0个或更多标记节点的列表后,正在弹出的线程将尝试将CAS的头部设为CAS.至少有一个并发弹出的线程将成功,并且此后所有进入堆栈的读取器将不再看到以前标记的节点.

After traversing the list of 0 or more marked nodes, a thread that is popping will attempt to CAS the head of the stack. At least one thread concurrently popping will succeed, and after this point all readers entering the stack will no longer see the formerly marked nodes.

成功更新列表的线程然后加载atomic list.entries,并基本上旋转加载atomic.exits,直到该计数器最终超过list.entries.这应该意味着该列表的旧"版本的所有读者都已完成.然后,该线程只需释放它从列表顶部交换过来的标记节点的列表即可.

The thread that successfully updates the list then loads the atomic list.entries, and basically spin-loads atomic.exits until that counter finally exceeds list.entries. This should imply that all readers of the "old" version of the list have completed. The thread then simply frees the the list of marked nodes that it swapped off the top of the list.

所以pop操作的含义应该是(我认为)不会有ABA问题,因为在所有使用它们的并发读取器都已完成之前,被释放的节点不会返回到可用的指针池中.出于相同的原因,也可以处理内存回收问题.

So the implications from the pop operation should be (I think) that there can be no ABA problem because the nodes that are freed are not returned to the usable pool of pointers until all concurrent readers using them have completed, and obviously the memory reclamation issue is handled as well, for the same reason.

所以无论如何,这是理论上的问题,但是我仍然会抓紧实现,因为它目前不起作用(在多线程情况下).似乎我在免费问题之后就得到了一些写作,但是我在发现问题时遇到了麻烦,或者我的假设有缺陷并且根本行不通.

So anyhow, that is theory, but I'm still scratching my head on the implementation, because it is currently not working (in the multithreaded case). It seems like I am getting some write after free issues among other things, but I'm having trouble spotting the issue, or maybe my assumptions are flawed and it just won't work.

无论是在概念上,还是在调试代码的方法上,任何见识都将不胜感激.

Any insights would be greatly appreciated, both on the concept, and on approaches to debugging the code.

这是我当前的(损坏的)代码(使用gcc -D_GNU_SOURCE -std = c11 -Wall -O0 -g -pthread -o list list.c进行编译):

Here is my current (broken) code (compile with gcc -D_GNU_SOURCE -std=c11 -Wall -O0 -g -pthread -o list list.c):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#include <sys/resource.h>

#include <stdio.h>
#include <unistd.h>

#define NUM_THREADS 8
#define NUM_OPS (1024 * 1024)

typedef uint64_t list_data_t;

typedef struct list_node_t {
    struct list_node_t * _Atomic next;
    list_data_t data;
} list_node_t;

typedef struct {
    list_node_t * _Atomic head;
    int64_t _Atomic size;
    uint64_t _Atomic entries;
    uint64_t _Atomic exits;
} list_t;

enum {
    NODE_IDLE    = (0x0),
    NODE_REMOVED = (0x1 << 0),
    NODE_FREED   = (0x1 << 1),
    NODE_FLAGS    = (0x3),
};

static __thread struct {
    uint64_t add_count;
    uint64_t remove_count;
    uint64_t added;
    uint64_t removed;
    uint64_t mallocd;
    uint64_t freed;
} stats;

#define NODE_IS_SET(p, f) (((uintptr_t)p & f) == f)
#define NODE_SET_FLAG(p, f) ((void *)((uintptr_t)p | f))
#define NODE_CLR_FLAG(p, f) ((void *)((uintptr_t)p & ~f))
#define NODE_POINTER(p) ((void *)((uintptr_t)p & ~NODE_FLAGS))

list_node_t * list_node_new(list_data_t data)
{
    list_node_t * new = malloc(sizeof(*new));
    new->data = data;
    stats.mallocd++;

    return new;
}

void list_node_free(list_node_t * node)
{
    free(node);
    stats.freed++;
}

static void list_add(list_t * list, list_data_t data)
{
    atomic_fetch_add_explicit(&list->entries, 1, memory_order_seq_cst);

    list_node_t * new = list_node_new(data);
    list_node_t * _Atomic * next = &list->head;
    list_node_t * current = atomic_load_explicit(next,  memory_order_seq_cst);
    do
    {
        stats.add_count++;
        while ((NODE_POINTER(current) != NULL) &&
                NODE_IS_SET(current, NODE_REMOVED))
        {
                stats.add_count++;
                current = NODE_POINTER(current);
                next = &current->next;
                current = atomic_load_explicit(next, memory_order_seq_cst);
        }
        atomic_store_explicit(&new->next, current, memory_order_seq_cst);
    }
    while(!atomic_compare_exchange_weak_explicit(
            next, &current, new,
            memory_order_seq_cst, memory_order_seq_cst));

    atomic_fetch_add_explicit(&list->exits, 1, memory_order_seq_cst);
    atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst);
    stats.added++;
}

static bool list_remove(list_t * list, list_data_t * pData)
{
    uint64_t entries = atomic_fetch_add_explicit(
            &list->entries, 1, memory_order_seq_cst);

    list_node_t * start = atomic_fetch_or_explicit(
            &list->head, NODE_REMOVED, memory_order_seq_cst);
    list_node_t * current = start;

    stats.remove_count++;
    while ((NODE_POINTER(current) != NULL) &&
            NODE_IS_SET(current, NODE_REMOVED))
    {
        stats.remove_count++;
        current = NODE_POINTER(current);
        current = atomic_fetch_or_explicit(&current->next,
                NODE_REMOVED, memory_order_seq_cst);
    }

    uint64_t exits = atomic_fetch_add_explicit(
            &list->exits, 1, memory_order_seq_cst) + 1;

    bool result = false;
    current = NODE_POINTER(current);
    if (current != NULL)
    {
        result = true;
        *pData = current->data;

        current = atomic_load_explicit(
                &current->next, memory_order_seq_cst);

        atomic_fetch_add_explicit(&list->size,
                -1, memory_order_seq_cst);

        stats.removed++;
    }

    start = NODE_SET_FLAG(start, NODE_REMOVED);
    if (atomic_compare_exchange_strong_explicit(
            &list->head, &start, current,
            memory_order_seq_cst, memory_order_seq_cst))
    {
        entries = atomic_load_explicit(&list->entries, memory_order_seq_cst);
        while ((int64_t)(entries - exits) > 0)
        {
            pthread_yield();
            exits = atomic_load_explicit(&list->exits, memory_order_seq_cst);
        }

        list_node_t * end = NODE_POINTER(current);
        list_node_t * current = NODE_POINTER(start);
        while (current != end)
        {
            list_node_t * tmp = current;
            current = atomic_load_explicit(&current->next, memory_order_seq_cst);
            list_node_free(tmp);
            current = NODE_POINTER(current);
        }
    }

    return result;
}

static list_t list;

pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;

void * thread_entry(void * arg)
{
    sleep(2);
    int id = *(int *)arg;

    for (int i = 0; i < NUM_OPS; i++)
    {
        bool insert = random() % 2;

        if (insert)
        {
            list_add(&list, i);
        }
        else
        {
            list_data_t data;
            list_remove(&list, &data);
        }
    }

    struct rusage u;
    getrusage(RUSAGE_THREAD, &u);

    pthread_mutex_lock(&ioLock);
    printf("Thread %d stats:\n", id);
    printf("\tadded = %lu\n", stats.added);
    printf("\tremoved = %lu\n", stats.removed);
    printf("\ttotal added = %ld\n", (int64_t)(stats.added - stats.removed));
    printf("\tadded count = %lu\n", stats.add_count);
    printf("\tremoved count = %lu\n", stats.remove_count);
    printf("\tadd average = %f\n", (float)stats.add_count / stats.added);
    printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed);
    printf("\tmallocd = %lu\n", stats.mallocd);
    printf("\tfreed = %lu\n", stats.freed);
    printf("\ttotal mallocd = %ld\n", (int64_t)(stats.mallocd - stats.freed));
    printf("\tutime = %f\n", u.ru_utime.tv_sec
            + u.ru_utime.tv_usec / 1000000.0f);
    printf("\tstime = %f\n", u.ru_stime.tv_sec
                    + u.ru_stime.tv_usec / 1000000.0f);
    pthread_mutex_unlock(&ioLock);

    return NULL;
}

int main(int argc, char ** argv)
{
    struct {
            pthread_t thread;
            int id;
    }
    threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++)
    {
        threads[i].id = i;
        pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id);
    }

    for (int i = 0; i < NUM_THREADS; i++)
    {
        pthread_join(threads[i].thread, NULL);
    }

    printf("Size = %ld\n", atomic_load(&list.size));

    uint32_t count = 0;

    list_data_t data;
    while(list_remove(&list, &data))
    {
        count++;
    }
    printf("Removed %u\n", count);
}

推荐答案

您提到您正在尝试解决ABA问题,但是描述和代码实际上是在尝试解决更难的问题:

You mention you are trying to solve the ABA problem, but the description and code is actually an attempt to solve a harder problem: the memory reclamation problem.

此问题通常出现在以无垃圾回收的语言实现的无锁集合的删除"功能中.核心问题是,从共享结构中删除节点的线程通常不知道何时可以安全地释放已删除的节点,因为其他读取可能仍会引用该节点.经常解决此问题的副作用是, 也解决了ABA问题:这特别是与CAS操作成功有关的,即使基础指针(和对象的状态)已被更改了至少两次.同时,以原始的 value 结束,但呈现出完全不同的状态.

This problem typically arises in the "deletion" functionality of lock-free collections implemented in languages without garbage collection. The core issue is that a thread removing a node from a shared structure often doesn't know when it is safe to free the removed node as because other reads may still have a reference to it. Solving this problem often, as a side effect, also solves the ABA problem: which is specifically about a CAS operation succeeding even though the underlying pointer (and state of the object) has been been changed at least twice in the meantime, ending up with the original value but presenting a totally different state.

就ABA问题而言,有几种直接解决方案,特别是不会导致解决内存回收"问题的解决方案,因此ABA问题更容易解决.从某种意义上说,可以检测到位置修改(例如使用LL/SC或事务性存储原语)的硬件可能根本不会出现此问题.

The ABA problem is easier in the sense that there are several straightforward solutions to the ABA problem specifically that don't lead to a solution to the "memory reclamation" problem. It is also easier in the sense that hardware that can detect the modification of the location, e.g., with LL/SC or transactional memory primitives, might not exhibit the problem at all.

也就是说,您正在寻找一种解决内存回收问题的方法,它还将避免ABA问题.

So that said, you are hunting for a solution to the memory reclamation problem, and it will also avoid the ABA problem.

问题的核心是以下陈述:

The core of your issue is this statement:

成功更新列表的线程然后加载原子 list.entries,并基本上旋转加载atomic.exits直到该计数器 最终超过了list.entries. 这意味着所有的读者 列表的旧"版本已经完成.然后,该线程简单地 释放从其顶部交换过来的标记节点的列表 列表.

The thread that successfully updates the list then loads the atomic list.entries, and basically spin-loads atomic.exits until that counter finally exceeds list.entries. This should imply that all readers of the "old" version of the list have completed. The thread then simply frees the the list of marked nodes that it swapped off the top of the list.

此逻辑不成立.等待list.exits(您说 atomic.exits ,但我认为这是一个错字,因为您仅在其他地方谈论过list.exits)大于list.entries仅告诉您现在已经有 条目总数> 的总退出数量.但是,这些退出可能是由新读者来来往往所引起的:这并不意味着您所声称的所有旧读者都已完成

This logic doesn't hold. Waiting for list.exits (you say atomic.exits but I think it's a typo as you only talk about list.exits elsewhere) to be greater than list.entries only tells you there have now been more total exits than there were entries at the point the mutating thread captured the entry count. However, these exits may have been generated by new readers coming and going: it doesn't at all imply that all the old readers have finished as you claim!

这是一个简单的例子.首先,写线程T1和读线程T2大约同时访问列表,因此list.entries为2,而list.exits为0.写线程弹出一个节点,并保存当前值(2) list.entries的值,并等待lists.exits大于2.现在又有三个读取线程T3T4T5到达并快速读取列表并离开.现在lists.exits为3,并且您的条件得到满足,并且T1释放了该节点. T2却什么也没走,因为它正在读取释放的节点而炸毁了!

Here's a simple example. First a writing thread T1 and a reading thread T2 access the list around the same time, so list.entries is 2 and list.exits is 0. The writing thread pops an node, and saves the current value (2) of list.entries and waits for lists.exits to be greater than 2. Now three more reading threads, T3, T4, T5 arrive and do a quick read of the list and leave. Now lists.exits is 3, and your condition is met and T1 frees the node. T2 hasn't gone anywhere though and blows up since it is reading a freed node!

您拥有的基本想法是可行的,但是您的两种对立方式绝对是行不通的.

The basic idea you have can work, but your two counter approach in particular definitely doesn't work.

这是一个经过充分研究的问题,因此您不必发明自己的算法(请参见上面的链接),甚至不必编写自己的代码,因为 concurrencykit 已经存在.

This is a well-studied problem, so you don't have to invent your own algorithm (see the link above), or even write your own code since things like librcu and concurrencykit already exist.

但是,如果您想要将其用于教育目的,一种方法是使用确保修改开始后进入的线程使用另一组list.entry/exit计数器.一种实现方式是生成计数器,当编写者想要修改列表时,它会增加生成计数器,这会使新的读者切换到另一组list.entry/exit计数器.

If you wanted to make this work for educational purposes though, one approach would be to use ensure that threads coming in after a modification have started use a different set of list.entry/exit counters. One way to do this would be a generation counter, and when the writer wants to modify the list, it increments the generation counter, which causes new readers to switch to a different set of list.entry/exit counters.

现在,编写者只需要等待list.entry[old] == list.exists[old],这意味着所有 old 读者都已离开.您也可以每代只使用一个计数器:您实际上并不需要两个entry/exit计数器(尽管这可能有助于减少争用).

Now the writer just has to wait for list.entry[old] == list.exists[old], which means all the old readers have left. You could also just get away with a single counter per generation: you don't really two entry/exit counters (although it might help reduce contention).

当然,您知道还有一个新问题,即需要管理每个世代的独立计数器列表……这看起来像是构建无锁列表的原始问题!但是,此问题要容易一些,因为您可以对运行中"的世代数进行合理的限制,然后将它们全部预先分配,或者可以实现有限类型的无锁列表,这更易于推理因为添加和删除仅发生在头部或尾部.

Of course, you know have a new problem of managing this list of separate counters per generation... which kind of looks like the original problem of building a lock-free list! This problem is a bit easier though because you might put some reasonable bound on the number of generations "in flight" and just allocate them all up-front, or you might implement a limited type of lock-free list that is easier to reason about because additions and deletions only occur at the head or tail.

这篇关于无锁堆栈实现想法-目前已中断的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆