Eine in C++11 geschriebene wartefreie und sperrenfreie Warteschlange mit fester Größe für einen einzelnen Produzenten und einen einzelnen Verbraucher. Diese Implementierung ist schneller als sowohl boost::lockfree::spsc als auch folly::ProducerConsumerQueue .
SPSCQueue< int > q ( 1 );
auto t = std::thread([&] {
while (!q. front ());
std::cout << *q. front () << std::endl;
q. pop ();
});
q.push( 1 );
t.join();
Das vollständige Beispiel finden Sie src/SPSCQueueExample.cpp
.
SPSCQueue<T>(size_t capacity);
Erstellen Sie eine SPSCqueue
Warteschlange mit Elementen vom Typ T
und der capacity
. Die Kapazität muss mindestens 1 betragen.
void emplace(Args &&... args);
Stellen Sie ein Element mithilfe der Inplace-Konstruktion in die Warteschlange. Blockiert, wenn die Warteschlange voll ist.
bool try_emplace(Args &&... args);
Versuchen Sie, ein Element mithilfe der Inplace-Konstruktion in die Warteschlange zu stellen. Gibt bei Erfolg true
und false
zurück, wenn die Warteschlange voll ist.
void push(const T &v);
Stellen Sie ein Element mithilfe der Kopierkonstruktion in die Warteschlange. Blockiert, wenn die Warteschlange voll ist.
template <typename P> void push(P &&v);
Stellen Sie ein Element mithilfe der Bewegungskonstruktion in die Warteschlange. Nimmt nur an der Überladungsauflösung teil, wenn std::is_constructible<T, P&&>::value == true
. Blockiert, wenn die Warteschlange voll ist.
bool try_push(const T &v);
Versuchen Sie, ein Element mithilfe der Kopierkonstruktion in die Warteschlange einzureihen. Gibt bei Erfolg true
und false
zurück, wenn die Warteschlange voll ist.
template <typename P> bool try_push(P &&v);
Versuchen Sie, einen Gegenstand mithilfe der Bewegungskonstruktion in die Warteschlange zu stellen. Gibt bei Erfolg true
und false
zurück, wenn die Warteschlange voll ist. Nimmt nur an der Überladungsauflösung teil, wenn std::is_constructible<T, P&&>::value == true
.
T *front();
Gibt den Zeiger an den Anfang der Warteschlange zurück. Gibt nullptr
zurück, wenn die Warteschlange leer ist.
void pop();
Erstes Element der Warteschlange aus der Warteschlange entfernen. Sie müssen sicherstellen, dass die Warteschlange nicht leer ist, bevor Sie pop aufrufen. Das bedeutet, dass front()
vor jedem Aufruf von pop()
einen PTR ungleich nullptr
zurückgegeben haben muss. Erfordert std::is_nothrow_destructible<T>::value == true
.
size_t size();
Gibt die Anzahl der in der Warteschlange verfügbaren Elemente zurück.
bool empty();
Gibt „true“ zurück, wenn die Warteschlange derzeit leer ist.
Nur ein einzelner Writer-Thread kann Enqueue-Vorgänge ausführen und nur ein einzelner Reader-Thread kann Dequeue-Vorgänge ausführen. Jede andere Verwendung ist ungültig.
Zusätzlich zur Unterstützung der benutzerdefinierten Zuordnung über die standardmäßige benutzerdefinierte Allocator-Schnittstelle unterstützt diese Bibliothek auch den Standardvorschlag P0401R3 und stellt Größenrückmeldungen in der Allocator-Schnittstelle bereit. Dies ermöglicht die bequeme Nutzung großer Seiten, ohne dass zugewiesener Speicherplatz verschwendet wird. Die Verwendung von Größenrückmeldungen wird nur unterstützt, wenn C++17 aktiviert ist.
Die Bibliothek enthält derzeit keinen Zuweiser für große Seiten, da die APIs für die Zuweisung großer Seiten plattformabhängig sind und der Umgang mit großen Seitengrößen und die NUMA-Erkennung anwendungsspezifisch sind.
Unten finden Sie ein Beispiel für eine große Seitenzuweisung für Linux:
# include < sys/mman.h >
template < typename T> struct Allocator {
using value_type = T;
struct AllocationResult {
T *ptr;
size_t count;
};
size_t roundup ( size_t n) { return (((n - 1 ) >> 21 ) + 1 ) << 21 ; }
AllocationResult allocate_at_least ( size_t n) {
size_t count = roundup ( sizeof (T) * n);
auto p = static_cast <T *>( mmap ( nullptr , count, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB,
- 1 , 0 ));
if (p == MAP_FAILED) {
throw std::bad_alloc ();
}
return {p, count / sizeof (T)};
}
void deallocate (T *p, size_t n) { munmap (p, roundup ( sizeof (T) * n)); }
};
Das vollständige Beispiel zur Verwendung großer Seiten unter Linux finden Sie unter src/SPSCQueueExampleHugepages.cpp
.
Die zugrunde liegende Implementierung basiert auf einem Ringpuffer.
Es wurde darauf geachtet, Probleme durch falsche Weitergabe zu vermeiden. Die Kopf- und Schwanzindizes werden ausgerichtet und auf den False-Sharing-Bereich (Cache-Zeilengröße) aufgefüllt. Zusätzlich wird der Slot-Puffer am Anfang und am Ende mit dem Bereich für falsche gemeinsame Nutzung aufgefüllt, wodurch eine falsche gemeinsame Nutzung mit benachbarten Zuweisungen verhindert wird.
Diese Implementierung hat einen höheren Durchsatz als ein typischer gleichzeitiger Ringpuffer, indem die Head- und Tail-Indizes jeweils im Writer und Reader lokal zwischengespeichert werden. Das Caching erhöht den Durchsatz, indem es die Menge des Cache-Kohärenz-Verkehrs reduziert.
Um zu verstehen, wie das funktioniert, betrachten Sie zunächst einen Lesevorgang ohne Caching: Der Kopfindex (Leseindex) muss aktualisiert werden und somit wird diese Cache-Zeile im exklusiven Zustand in den L1-Cache geladen. Der Tail (Schreibindex) muss gelesen werden, um zu überprüfen, ob die Warteschlange nicht leer ist und daher im gemeinsam genutzten Zustand in den L1-Cache geladen wird. Da ein Warteschlangenschreibvorgang den Kopfindex lesen muss, ist es wahrscheinlich, dass ein Schreibvorgang etwas Cache-Kohärenzverkehr erfordert, um die Kopfindex-Cachezeile wieder in den exklusiven Zustand zu versetzen. Im schlimmsten Fall gibt es für jeden Lese- und Schreibvorgang einen Übergang der Cache-Zeile von „gemeinsam“ zu „exklusiv“.
Betrachten Sie als Nächstes einen Warteschlangenleser, der den Endindex zwischenspeichert: Wenn der zwischengespeicherte Endindex angibt, dass die Warteschlange leer ist, laden Sie den Endindex in den zwischengespeicherten Endindex. Wenn die Warteschlange nicht leer war, können mehrere Lesevorgänge ausgeführt werden, bis der zwischengespeicherte Endindex abgeschlossen werden kann, ohne den exklusiven Status der Endindex-Cachezeile des Schreibers zu stehlen. Dadurch wird der Cache-Kohärenzverkehr reduziert. Ein analoges Argument kann für den Warteschlangenschreibvorgang angeführt werden.
Diese Implementierung ermöglicht beliebige Nicht-Potenz-Zweier-Kapazitäten und weist stattdessen einen zusätzlichen Warteschlangenslot zu, um eine volle Warteschlange anzuzeigen. Wenn Sie keinen Speicherplatz für einen zusätzlichen Warteschlangenplatz verschwenden möchten, sollten Sie eine andere Implementierung verwenden.
Referenzen:
Das Testen von sperrfreien Algorithmen ist schwierig. Ich verwende zwei Ansätze, um die Implementierung zu testen:
Der Durchsatz-Benchmark misst den Durchsatz zwischen zwei Threads für eine Warteschlange mit int
-Elementen.
Der Latenz-Benchmark misst die Roundtrip-Zeit zwischen zwei Threads, die über zwei Warteschlangen mit int
-Elementen kommunizieren.
Benchmark-Ergebnisse für einen AMD Ryzen 9 3900X 12-Core-Prozessor, die 2 Threads laufen auf unterschiedlichen Kernen auf demselben Chiplet:
Warteschlange | Durchsatz (ops/ms) | Latenz RTT (ns) |
---|---|---|
SPSCQueue | 362723 | 133 |
boost::lockfree::spsc | 209877 | 222 |
Torheit::ProducerConsumerQueue | 148818 | 147 |
SPSCQueue wurde in den folgenden Artikeln zitiert:
Dieses Projekt wurde von Erik Rigtorp <[email protected]> erstellt.