Une file d'attente de taille fixe sans attente et sans verrouillage, à producteur unique et à consommateur unique, écrite en C++11. Cette implémentation est plus rapide que boost::lockfree::spsc et folly::ProducerConsumerQueue .
SPSCQueue< int > q ( 1 );
auto t = std::thread([&] {
while (!q. front ());
std::cout << *q. front () << std::endl;
q. pop ();
});
q.push( 1 );
t.join();
Voir src/SPSCQueueExample.cpp
pour l'exemple complet.
SPSCQueue<T>(size_t capacity);
Créez une file d'attente SPSCqueue
contenant des éléments de type T
avec une capacity
capacité . La capacité doit être d'au moins 1.
void emplace(Args &&... args);
Mettez un élément en file d’attente à l’aide de la construction sur place. Bloque si la file d'attente est pleine.
bool try_emplace(Args &&... args);
Essayez de mettre un élément en file d'attente en utilisant la construction sur place. Renvoie true
en cas de succès et false
si la file d'attente est pleine.
void push(const T &v);
Mettez un élément en file d’attente à l’aide de la construction par copie. Bloque si la file d'attente est pleine.
template <typename P> void push(P &&v);
Mettez un élément en file d’attente en utilisant la construction de déplacement. Participe à la résolution de surcharge uniquement si std::is_constructible<T, P&&>::value == true
. Bloque si la file d'attente est pleine.
bool try_push(const T &v);
Essayez de mettre un élément en file d'attente en utilisant la construction par copie. Renvoie true
en cas de succès et false
si la file d'attente est pleine.
template <typename P> bool try_push(P &&v);
Essayez de mettre un élément en file d'attente en utilisant la construction de déplacement. Renvoie true
en cas de succès et false
si la file d'attente est pleine. Participe à la résolution de surcharge uniquement si std::is_constructible<T, P&&>::value == true
.
T *front();
Renvoie le pointeur au début de la file d’attente. Renvoie nullptr
si la file d'attente est vide.
void pop();
Supprimer le premier élément de la file d'attente. Vous devez vous assurer que la file d'attente n'est pas vide avant d'appeler pop. Cela signifie que front()
doit avoir renvoyé un non- nullptr
avant chaque appel à pop()
. Nécessite std::is_nothrow_destructible<T>::value == true
.
size_t size();
Renvoie le nombre d'éléments disponibles dans la file d'attente.
bool empty();
Renvoie vrai si la file d’attente est actuellement vide.
Un seul thread d'écriture peut effectuer des opérations de mise en file d'attente et un seul thread de lecteur peut effectuer des opérations de retrait de la file d'attente. Toute autre utilisation est invalide.
En plus de prendre en charge l'allocation personnalisée via l'interface d'allocation personnalisée standard, cette bibliothèque prend également en charge la proposition standard P0401R3 fournissant des informations sur la taille dans l'interface d'allocation. Cela permet une utilisation pratique de pages volumineuses sans gaspiller l’espace alloué. L’utilisation du retour de taille n’est prise en charge que lorsque C++17 est activé.
La bibliothèque n'inclut actuellement pas d'allocateur de pages énormes, car les API permettant d'allouer des pages énormes dépendent de la plate-forme et la gestion des pages de grande taille et la prise en compte de NUMA sont spécifiques à l'application.
Vous trouverez ci-dessous un exemple d'allocateur de pages géantes pour Linux :
# include < sys/mman.h >
template < typename T> struct Allocator {
using value_type = T;
struct AllocationResult {
T *ptr;
size_t count;
};
size_t roundup ( size_t n) { return (((n - 1 ) >> 21 ) + 1 ) << 21 ; }
AllocationResult allocate_at_least ( size_t n) {
size_t count = roundup ( sizeof (T) * n);
auto p = static_cast <T *>( mmap ( nullptr , count, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB,
- 1 , 0 ));
if (p == MAP_FAILED) {
throw std::bad_alloc ();
}
return {p, count / sizeof (T)};
}
void deallocate (T *p, size_t n) { munmap (p, roundup ( sizeof (T) * n)); }
};
Voir src/SPSCQueueExampleHugepages.cpp
pour l'exemple complet sur la façon d'utiliser des pages volumineuses sous Linux.
L'implémentation sous-jacente est basée sur un tampon en anneau.
Des précautions ont été prises pour éviter tout problème de faux partage. Les indices de tête et de queue sont alignés et complétés selon la plage de faux partage (taille de la ligne de cache). De plus, le tampon des emplacements est complété par la plage de faux partage au début et à la fin, ce qui empêche un faux partage avec des allocations adjacentes.
Cette implémentation a un débit plus élevé qu'un tampon en anneau concurrent typique en mettant localement en cache les index de tête et de queue dans l'enregistreur et le lecteur respectivement. La mise en cache augmente le débit en réduisant la quantité de trafic de cohérence du cache.
Pour comprendre comment cela fonctionne, considérons d'abord une opération de lecture en absence de mise en cache : l'index principal (index de lecture) doit être mis à jour et donc cette ligne de cache est chargée dans le cache L1 dans un état exclusif. La queue (index d'écriture) doit être lue afin de vérifier que la file d'attente n'est pas vide et est donc chargée dans le cache L1 en état partagé. Étant donné qu'une opération d'écriture en file d'attente doit lire l'index principal, il est probable qu'une opération d'écriture nécessite un certain trafic de cohérence du cache pour ramener la ligne de cache de l'index principal dans un état exclusif. Dans le pire des cas, il y aura une transition de ligne de cache de partagée à exclusive pour chaque opération de lecture et d'écriture.
Considérons ensuite un lecteur de file d'attente qui met en cache l'index de queue : si l'index de queue mis en cache indique que la file d'attente est vide, chargez l'index de queue dans l'index de queue mis en cache. Si la file d'attente n'était pas vide, plusieurs opérations de lecture jusqu'à ce que l'index de queue mis en cache puisse se terminer sans voler l'état exclusif de la ligne de cache d'index de queue de l'écrivain. Le trafic de cohérence du cache est donc réduit. Un argument analogue peut être avancé pour l’opération d’écriture en file d’attente.
Cette implémentation permet une non-alimentation arbitraire de deux capacités, en allouant à la place un emplacement de file d'attente supplémentaire pour indiquer une file d'attente pleine. Si vous ne souhaitez pas gaspiller de stockage pour un emplacement de file d'attente supplémentaire, vous devez utiliser une implémentation différente.
Références :
Tester des algorithmes sans verrouillage est difficile. J'utilise deux approches pour tester l'implémentation :
Le benchmark de débit mesure le débit entre 2 threads pour une file d'attente d'éléments int
.
Le test de latence mesure le temps d'aller-retour entre 2 threads communiquant à l'aide de 2 files d'attente d'éléments int
.
Résultats de référence pour un processeur AMD Ryzen 9 3900X 12 cœurs, les 2 threads s'exécutent sur des cœurs différents sur le même chipset :
File d'attente | Débit (opérations/ms) | Latence RTT (ns) |
---|---|---|
File d'attente SPSC | 362723 | 133 |
boost :: lockfree :: spsc | 209877 | 222 |
folie :: ProducerConsumerQueue | 148818 | 147 |
SPSCQueue a été cité par les articles suivants :
Ce projet a été créé par Erik Rigtorp <[email protected]>.