Очередь фиксированного размера с одним производителем и одним потребителем, не требующая ожидания и блокировки, написанная на C++11. Эта реализация быстрее, чем boost::lockfree::spsc и folly::ProducerConsumerQueue .
SPSCQueue< int > q ( 1 );
auto t = std::thread([&] {
while (!q. front ());
std::cout << *q. front () << std::endl;
q. pop ();
});
q.push( 1 );
t.join();
Полный пример см. в src/SPSCQueueExample.cpp
.
SPSCQueue<T>(size_t capacity);
Создайте очередь SPSCqueue
содержащую элементы типа T
с capacity
. Емкость должна быть не менее 1.
void emplace(Args &&... args);
Поставьте элемент в очередь, используя конструкцию на месте. Блокируется, если очередь заполнена.
bool try_emplace(Args &&... args);
Попробуйте поставить элемент в очередь, используя конструкцию на месте. Возвращает true
в случае успеха и false
если очередь заполнена.
void push(const T &v);
Поставьте элемент в очередь, используя конструкцию копирования. Блокируется, если очередь заполнена.
template <typename P> void push(P &&v);
Поставьте элемент в очередь, используя конструкцию перемещения. Участвует в разрешении перегрузки, только если std::is_constructible<T, P&&>::value == true
. Блокируется, если очередь заполнена.
bool try_push(const T &v);
Попробуйте поставить элемент в очередь, используя конструкцию копирования. Возвращает true
в случае успеха и false
если очередь заполнена.
template <typename P> bool try_push(P &&v);
Попробуйте поставить элемент в очередь, используя конструкцию перемещения. Возвращает true
в случае успеха и false
если очередь заполнена. Участвует в разрешении перегрузки, только если std::is_constructible<T, P&&>::value == true
.
T *front();
Вернуть указатель на начало очереди. Возвращает nullptr
если очередь пуста.
void pop();
Удалить из очереди первый элемент очереди. Перед вызовом pop необходимо убедиться, что очередь не пуста. Это означает, что front()
должен возвращать значение, отличное от nullptr
перед каждым вызовом pop()
. Требуется std::is_nothrow_destructible<T>::value == true
.
size_t size();
Возвращает количество элементов, доступных в очереди.
bool empty();
Верните true, если очередь в данный момент пуста.
Только один поток записи может выполнять операции постановки в очередь, и только один поток чтения может выполнять операции удаления из очереди. Любое другое использование недопустимо.
Помимо поддержки пользовательского распределения через стандартный интерфейс пользовательского распределителя, эта библиотека также поддерживает стандартное предложение P0401R3. Обеспечение обратной связи по размеру в интерфейсе распределителя. Это позволяет удобно использовать огромные страницы, не тратя выделенное пространство. Использование обратной связи по размеру поддерживается только при включении C++17.
В настоящее время библиотека не включает в себя распределитель огромных страниц, поскольку API для выделения огромных страниц зависят от платформы, а обработка огромных размеров страниц и поддержка NUMA зависят от приложения.
Ниже приведен пример огромного распределителя страниц для Linux:
# include < sys/mman.h >
template < typename T> struct Allocator {
using value_type = T;
struct AllocationResult {
T *ptr;
size_t count;
};
size_t roundup ( size_t n) { return (((n - 1 ) >> 21 ) + 1 ) << 21 ; }
AllocationResult allocate_at_least ( size_t n) {
size_t count = roundup ( sizeof (T) * n);
auto p = static_cast <T *>( mmap ( nullptr , count, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB,
- 1 , 0 ));
if (p == MAP_FAILED) {
throw std::bad_alloc ();
}
return {p, count / sizeof (T)};
}
void deallocate (T *p, size_t n) { munmap (p, roundup ( sizeof (T) * n)); }
};
См. src/SPSCQueueExampleHugepages.cpp
для получения полного примера использования огромных страниц в Linux.
Базовая реализация основана на кольцевом буфере.
Были приняты меры, чтобы избежать проблем с ложным обменом информацией. Индексы заголовка и хвоста выравниваются и дополняются до ложного диапазона совместного использования (размера строки кэша). Кроме того, буфер слотов дополняется диапазоном ложного совместного использования в начале и конце, что предотвращает ложное совместное использование с любыми соседними выделениями.
Эта реализация имеет более высокую пропускную способность, чем типичный параллельный кольцевой буфер, за счет локального кэширования индексов начала и конца в записывающем и считывающем устройствах соответственно. Кэширование увеличивает пропускную способность за счет уменьшения объема трафика когерентности кэша.
Чтобы понять, как это работает, сначала рассмотрим операцию чтения при отсутствии кэширования: необходимо обновить индекс головки (индекс чтения), и, таким образом, эта строка кэша загружается в кэш L1 в эксклюзивном состоянии. Хвост (индекс записи) необходимо прочитать, чтобы убедиться, что очередь не пуста и, таким образом, загружена в кэш L1 в общем состоянии. Поскольку операция записи в очередь требует чтения головного индекса, вполне вероятно, что операция записи потребует некоторого трафика когерентности кэша, чтобы вернуть строку кэша головного индекса обратно в эксклюзивное состояние. В худшем случае для каждой операции чтения и записи будет одна строка кэша, переходящая из общей в монопольную.
Далее рассмотрим устройство чтения очереди, которое кэширует хвостовой индекс: если кэшированный хвостовой индекс указывает, что очередь пуста, загрузите хвостовой индекс в кэшированный хвостовой индекс. Если очередь была непустой, несколько операций чтения до тех пор, пока кэшированный хвостовой индекс не сможет завершиться без кражи эксклюзивного состояния строки кэша хвостового индекса записывающего устройства. Таким образом, трафик согласованности кэша снижается. Аналогичный аргумент можно привести и для операции записи в очередь.
Эта реализация допускает произвольную не мощность двух мощностей, вместо этого выделяя дополнительный слот очереди для обозначения полной очереди. Если вы не хотите тратить память на дополнительный слот очереди, вам следует использовать другую реализацию.
Ссылки:
Тестировать алгоритмы без блокировок сложно. Я использую два подхода для тестирования реализации:
Тест пропускной способности измеряет пропускную способность между двумя потоками для очереди элементов int
.
Тест задержки измеряет время прохождения туда и обратно между двумя потоками, взаимодействующими с использованием двух очередей элементов int
.
Результаты тестирования 12-ядерного процессора AMD Ryzen 9 3900X: два потока выполняются на разных ядрах одного и того же чиплета:
Очередь | Пропускная способность (операций/мс) | Задержка RTT (нс) |
---|---|---|
SPSCQueue | 362723 | 133 |
boost::lockfree::spsc | 209877 | 222 |
глупость::ProducerConsumerQueue | 148818 | 147 |
SPSCQueue цитируется в следующих статьях:
Этот проект был создан Эриком Ригторпом <[email protected]>.