前段时间关注到disruptor,一个高并发框架。能够在无锁(lock-free)的情况下处理多生产者消费者的并发问题。它可以看作一个消息队列,通过CAS而不是锁来处理并发。
因此实现了一个C++版本的disruptor,基于ring buffer,实现一个发送缓冲(多生产者,单消费者)。
写入缓冲
某个生产者要写入数据时,先申请所需空间(需要共享当前分配位置),然后直接执行写入,最后提交写入结果(需要共享当前写入位置)。整个写入过程由两个关键共享变量: atomic_ullong _alloc_count
和atomic_ullong _write_count
。前者负责管理和同步当前分配的空间,后者负责同步当前已经写入的空间。也就是说,整个过程分为三步:申请,写入,提交。
比如,有两个生产者P1和P2。P1申请到大小为50的空间,假设此时_alloc_count=10,那么P1将得到可写入位置10,此时_alloc_count更新为60。P1此时可以执行写入(无需上锁)。这个时候P2开始申请大小为10的空间,它将得到写入位置60,_alloc_count更新为70。因此实际上P1和P2是可以并发写的。如果P2比P1先写完,它会尝试提交,此时由于P1还没有提交它的写入结果,因此P2会自旋等待(不断尝试CAS操作)。直到P1提交写入结果后,P2才能提交。通过CAS可以保证这种提交顺序。提交操作会更新_write_count变量,提交之后的数据便可以被消费者读取使用。
上面的描述并没有提到缓冲区不够的问题,为了判断缓冲区当前可写空间,还需要一个变量 atomic_ullong _idle_count
用于记录当前缓冲区空闲大小。该变量在生产者申请空间后减小,在消费者使用数据后变大。初始等于整个ring buffer的大小。