前段时间关注到disruptor,一个高并发框架。能够在无锁(lock-free)的情况下处理多生产者消费者的并发问题。它可以看作一个消息队列,通过CAS而不是锁来处理并发。
因此实现了一个C++版本的disruptor,基于ring buffer,实现一个发送缓冲(多生产者,单消费者)。
写入缓冲
某个生产者要写入数据时,先申请所需空间(需要共享当前分配位置),然后直接执行写入,最后提交写入结果(需要共享当前写入位置)。整个写入过程由两个关键共享变量: atomic_ullong _alloc_count
和atomic_ullong _write_count
。前者负责管理和同步当前分配的空间,后者负责同步当前已经写入的空间。也就是说,整个过程分为三步:申请,写入,提交。
比如,有两个生产者P1和P2。P1申请到大小为50的空间,假设此时_alloc_count=10,那么P1将得到可写入位置10,此时_alloc_count更新为60。P1此时可以执行写入(无需上锁)。这个时候P2开始申请大小为10的空间,它将得到写入位置60,_alloc_count更新为70。因此实际上P1和P2是可以并发写的。如果P2比P1先写完,它会尝试提交,此时由于P1还没有提交它的写入结果,因此P2会自旋等待(不断尝试CAS操作)。直到P1提交写入结果后,P2才能提交。通过CAS可以保证这种提交顺序。提交操作会更新_write_count变量,提交之后的数据便可以被消费者读取使用。
上面的描述并没有提到缓冲区不够的问题,为了判断缓冲区当前可写空间,还需要一个变量 atomic_ullong _idle_count
用于记录当前缓冲区空闲大小。该变量在生产者申请空间后减小,在消费者使用数据后变大。初始等于整个ring buffer的大小。
核心代码
1 | SendBuffer::SendBuffer(size_t capacity /* = 65536 */) |
代码看起来不多,理解起来也不难。主要有以下三点:
1. 对原子变量的访问
对原子变量的使用要特别小心,由于没有锁的保护,对原子变量的每一次访问都要考虑到它的值已经改变。比如在Push函数的申请空间操作中,你不能通过
1 | if(_idle_count > len) |
来判断空闲空间是否足够,因为在if中它可能大于len,但是当你执行_idle_count.fetch_sub(len)
时,它的值可能就改变了,不再满足 > len。同理以下代码也是错的:
1 | _idle_count.fetch_sub(len); |
对原子变量的访问应该做到”原子性”,即每次逻辑上使用,都只访问一次。这也是和传统锁不一样的地方。而引进_idle_count这个原子变量而不是使用_read_count和_alloc_count来算出空闲空间(_capacity-(_alloc_count-_read_count)
)也是基于这个原因,多个生产者依赖于这个表达式的值,并且会对表达式的值造成更改(修改_alloc_count),就会导致P1读取表达式值后,判断空闲空间足够,在P1更改_alloc_count前,P2生产者更改_alloc_count分配了空间,使得空闲空间已经不足。这种读写分步的操作必须通过原子变量来保证访问的一致性。
而为什么我们在Peek中可以通过_write_count - _read_count
来得到当前可读数据,是因为我们只有一个消费者依赖于_write_count - _read_count
的值,并且其它生产者对_write_count做出的更改对消费者来说是”无害的”,即生产者只会使_write_count增加,让消费者读到更多的数据。
2. 通过CAS保证顺序提交
在Push函数中的第三步提交中,生产者自旋等待,直到它前面(按照申请顺序)的所有生产者都已提交完毕,此时_write_count即为本生产者的写入位置alloc_start,代表alloc_start之前的缓冲区都已经提交完成,此时该你提交写入结果了。提交完成之后,更新_write_count,而消费者则根据_write_count来判断哪些内容是可读的。
3. 单消费者无需原子变量
最后,由于只有一个消费者,因此_read_count不是原子变量。它只会在Peek和Pop中读取和修改。
源码地址:https://github.com/wudaijun/Code/tree/master/Demo/disruptor