Uma fila de tamanho fixo sem espera e sem bloqueio, de produtor único, consumidor único, escrita em C++ 11. Esta implementação é mais rápida que boost::lockfree::spsc e folly::ProducerConsumerQueue .
SPSCQueue< int > q ( 1 );
auto t = std::thread([&] {
while (!q. front ());
std::cout << *q. front () << std::endl;
q. pop ();
});
q.push( 1 );
t.join();
Consulte src/SPSCQueueExample.cpp
para obter o exemplo completo.
SPSCQueue<T>(size_t capacity);
Crie um SPSCqueue
contendo itens do tipo T
com capacity
capacidade. A capacidade precisa ser de pelo menos 1.
void emplace(Args &&... args);
Enfileirar um item usando construção local. Bloqueia se a fila estiver cheia.
bool try_emplace(Args &&... args);
Tente enfileirar um item usando construção local. Retorna true
em caso de sucesso e false
se a fila estiver cheia.
void push(const T &v);
Enfileirar um item usando construção de cópia. Bloqueia se a fila estiver cheia.
template <typename P> void push(P &&v);
Enfileirar um item usando a construção de movimentação. Participa da resolução de sobrecarga somente se std::is_constructible<T, P&&>::value == true
. Bloqueia se a fila estiver cheia.
bool try_push(const T &v);
Tente enfileirar um item usando construção de cópia. Retorna true
em caso de sucesso e false
se a fila estiver cheia.
template <typename P> bool try_push(P &&v);
Tente enfileirar um item usando a construção de movimentos. Retorna true
em caso de sucesso e false
se a fila estiver cheia. Participa da resolução de sobrecarga somente se std::is_constructible<T, P&&>::value == true
.
T *front();
Retorna o ponteiro para o início da fila. Retorna nullptr
se a fila estiver vazia.
void pop();
Retire o primeiro item da fila. Você deve garantir que a fila não esteja vazia antes de chamar o pop. Isso significa que front()
deve ter retornado um valor não nullptr
antes de cada chamada para pop()
. Requer std::is_nothrow_destructible<T>::value == true
.
size_t size();
Retorne o número de itens disponíveis na fila.
bool empty();
Retorne verdadeiro se a fila estiver vazia no momento.
Apenas um único encadeamento gravador pode realizar operações de enfileiramento e apenas um único encadeamento leitor pode realizar operações de desenfileiramento. Qualquer outro uso é inválido.
Além de oferecer suporte à alocação personalizada por meio da interface padrão do alocador personalizado, esta biblioteca também oferece suporte à proposta padrão P0401R3. Fornece feedback de tamanho na interface do Allocator. Isso permite o uso conveniente de páginas enormes sem desperdiçar espaço alocado. O uso de feedback de tamanho só tem suporte quando o C++17 está habilitado.
A biblioteca atualmente não inclui um grande alocador de páginas, uma vez que as APIs para alocar páginas grandes dependem da plataforma e o manuseio de tamanhos de páginas grandes e o reconhecimento de NUMA é específico do aplicativo.
Abaixo está um exemplo de grande alocador de páginas para Linux:
# include < sys/mman.h >
template < typename T> struct Allocator {
using value_type = T;
struct AllocationResult {
T *ptr;
size_t count;
};
size_t roundup ( size_t n) { return (((n - 1 ) >> 21 ) + 1 ) << 21 ; }
AllocationResult allocate_at_least ( size_t n) {
size_t count = roundup ( sizeof (T) * n);
auto p = static_cast <T *>( mmap ( nullptr , count, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB,
- 1 , 0 ));
if (p == MAP_FAILED) {
throw std::bad_alloc ();
}
return {p, count / sizeof (T)};
}
void deallocate (T *p, size_t n) { munmap (p, roundup ( sizeof (T) * n)); }
};
Consulte src/SPSCQueueExampleHugepages.cpp
para obter um exemplo completo de como usar páginas enormes no Linux.
A implementação subjacente é baseada em um buffer circular.
Foi tomado cuidado para evitar quaisquer problemas com compartilhamento falso. Os índices head e tail são alinhados e preenchidos com o falso intervalo de compartilhamento (tamanho da linha de cache). Além disso, o buffer de slots é preenchido com o intervalo de compartilhamento falso no início e no final, o que evita o compartilhamento falso com quaisquer alocações adjacentes.
Esta implementação tem um rendimento mais alto do que um buffer de anel simultâneo típico, armazenando em cache localmente os índices inicial e final no gravador e no leitor, respectivamente. O cache aumenta o rendimento reduzindo a quantidade de tráfego de coerência do cache.
Para entender como isso funciona, primeiro considere uma operação de leitura na ausência de cache: o índice principal (índice de leitura) precisa ser atualizado e, portanto, essa linha de cache é carregada no cache L1 em estado exclusivo. O final (índice de gravação) precisa ser lido para verificar se a fila não está vazia e, portanto, é carregada no cache L1 em estado compartilhado. Como uma operação de gravação de fila precisa ler o índice principal, é provável que uma operação de gravação exija algum tráfego de coerência de cache para trazer a linha de cache do índice principal de volta ao estado exclusivo. Na pior das hipóteses, haverá uma transição de linha de cache de compartilhada para exclusiva para cada operação de leitura e gravação.
Em seguida, considere um leitor de fila que armazena em cache o índice final: se o índice final armazenado em cache indicar que a fila está vazia, carregue o índice final no índice final armazenado em cache. Se a fila não estiver vazia, várias operações de leitura até que o índice final armazenado em cache possam ser concluídos sem roubar o estado exclusivo da linha de cache do índice final do gravador. O tráfego de coerência de cache é, portanto, reduzido. Um argumento análogo pode ser feito para a operação de gravação na fila.
Esta implementação permite a não potência arbitrária de duas capacidades, em vez disso aloca um slot de fila extra para indicar fila cheia. Se você não quiser desperdiçar armazenamento com um slot de fila extra, use uma implementação diferente.
Referências:
Testar algoritmos sem bloqueio é difícil. Estou usando duas abordagens para testar a implementação:
O benchmark de rendimento mede o rendimento entre 2 threads para uma fila de itens int
.
O benchmark de latência mede o tempo de ida e volta entre 2 threads se comunicando usando 2 filas de itens int
.
Resultados de benchmark para um processador AMD Ryzen 9 3900X de 12 núcleos, os 2 threads estão sendo executados em núcleos diferentes no mesmo chiplet:
Fila | Taxa de transferência (operações/ms) | Latência RTT (ns) |
---|---|---|
SPSCQueue | 362723 | 133 |
boost::lockfree::spsc | 209877 | 222 |
loucura::ProducerConsumerQueue | 148818 | 147 |
SPSCQueue foi citado pelos seguintes artigos:
Este projeto foi criado por Erik Rigtorp <[email protected]>.