12#ifndef ROC_CORE_MPSC_QUEUE_H_
13#define ROC_CORE_MPSC_QUEUE_H_
15#include "roc_core/atomic_ops.h"
39template <
class T,
template <
class TT>
class OwnershipPolicy = RefCountedOwnership>
45 typedef typename OwnershipPolicy<T>::Pointer
Pointer;
74 OwnershipPolicy<T>::acquire(obj);
78 change_owner_(node, NULL,
this);
100 change_owner_(node,
this, NULL);
103 OwnershipPolicy<T>::release(*obj);
125 change_owner_(node,
this, NULL);
128 OwnershipPolicy<T>::release(*obj);
136 void change_owner_(MpscQueueData* node,
void* from,
void* to) {
139 roc_panic(
"mpsc queue: unexpected node owner: from=%p to=%p cur=%p", from, to,
144 void push_node_(MpscQueueData* node) {
152 template <
bool CanSpin> MpscQueueData* pop_node_() {
156 if (head == &stub_) {
164 if (!(next = (CanSpin ? wait_next_(head) : try_wait_next_(head)))) {
187 if (!(next = (CanSpin ? wait_next_(head) : try_wait_next_(head)))) {
205 MpscQueueData* wait_next_(MpscQueueData* node) {
206 if (MpscQueueData* next = try_wait_next_(node)) {
221 MpscQueueData* try_wait_next_(MpscQueueData* node) {
235 MpscQueueData* tail_;
236 MpscQueueData* head_;
static void store_relaxed(T1 &var, T2 val)
Atomic store (no barrier).
static T1 exchange_seq_cst(T1 &var, T2 val)
Atomic exchange (full barrier).
static T load_relaxed(const T &var)
Atomic load (no barrier).
static T load_acquire(const T &var)
Atomic load (acquire barrier).
static void store_release(T1 &var, T2 val)
Atomic store (release barrier).
static bool compare_exchange_relaxed(T1 &var, T1 &exp, T2 des)
Atomic compare-and-swap (no barrier).
static T load_seq_cst(const T &var)
Atomic load (full barrier).
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
void push_back(T &obj)
Add object to the end of the queue. Can be called concurrently. Acquires ownership of obj....
Pointer pop_front_exclusive()
Remove object from the beginning of the queue (blocking version). Should NOT be called concurrently....
Pointer try_pop_front_exclusive()
Try to remove object from the beginning of the queue (non-blocking version). Should NOT be called con...
OwnershipPolicy< T >::Pointer Pointer
Pointer type.
Base class for non-copyable objects.
CPU-specific instructions.
void cpu_relax()
CPU pause instruction.
#define roc_panic(...)
Print error message and terminate program gracefully.
MpscQueueNode * container_of()
Get MpscQueueNode object that contains this ListData object.