Atomare Datentypen

Korrektes Beabeiten von Daten in mehreren Threads muss nicht unbedingt immer mit std::mutex abgesichert sein. Manchmal reicht auch std::atomic, ein (potentiell) lock-freier Datentyp.

Video

Quelltext

#include <iostream>
#include <thread>
#include <atomic>
#include <mutex>
#include <chrono>
#include <vector>

int const count = 10000000;

int value;
std::mutex m;

std::atomic<int> atomic_value;

void atomic_runner()
{
    std::cout << " Starte runner\n";
    for (int i=0; i < count; ++i) {
        ++atomic_value;
    }
    std::cout << " Beende runner\n";
}

void mutex_runner()
{
    std::cout << " Starte runner\n";
    for (int i=0; i < count; ++i) {
        m.lock();
        ++value;
        m.unlock();
    }
    std::cout << " Beende runner\n";
}

void runner()
{
    std::cout << " Starte runner\n";
    for (int i=0; i < count; ++i) {
        ++value;
    }
    std::cout << " Beende runner\n";
}

template <typename T> void execute(std::function<void()> const &runner, 
                                   T &variable, 
                                   int thread_num = 2)
{
    unsigned long start =
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch()).count();
    variable = 0;
    std::vector<std::thread> threads;
    for (int i = 0; i < thread_num; ++i) {
        threads.push_back(std::thread(runner));
    }

    for (auto&& t : threads) {
        t.join();
    }

    if (variable != count*thread_num) {
        std::cout << " Fehlerhafter Zähler. Erwarte " << count*thread_num 
                  << ", tatsächlich: " 
                  << static_cast<int>(variable) << std::endl;
    }
    unsigned long end =
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch()).count();
    std::cout << "Zeitdauer: " << (end - start) << " ms\n";
}

int main()
{
    std::cout << "Starte Single-Thread-Test\n";
    execute(&runner, value, 1);

    std::cout << "Starte unsynchronisierten Test\n";
    execute(&runner, value);

    std::cout << "Starte std::mutex Test\n";
    execute(&mutex_runner, value);

    std::cout << "Starte std::atomic Test\n";
    execute(&atomic_runner, atomic_value);
}

Erklärung

Die Funktion runner() im Beispiel oben sieht eigentlich erstmal unverdächtig aus: eine (in dem Fall globale) Variable wird in einer Schleife hochgezählt. So ungefährlich ist das allerdings gar nicht mal: alle, die schon öfter mit Threads gearbeitet haben, kriegen bei der Verwendung einer globalen Variable spätestens Gänsehaut (und alle anderen hoffentlich auch, aber das ist ja auch nur ein plakatives Beispiel. Manchmal sind die Interaktionen etwas subtiler). Immer dann, wenn mehrere Threads gleichzeitig auf den gleichen Daten schreiben wollen, ist das eine Einladung zur Katastrophe. Nun mag das hier im Beispiel mit dem Integer so sein, dass auf den meisten Plattformen das eigentliche Schreiben eines Integers atomar ist (man bekommt also nicht irgendein halbfertiges Bitmuster zu sehen, wenn jemand beim Schreiben unterbrochen wird), aber der operator++() ist es deswegen noch lang nicht.

Auch wenn er so harmlos aussieht, besteht er aus 3 Schritten:

  1. Lade die Variable in ein Register
  2. Inkrementiere das Register
  3. Schreibe das Register zurück in den Speicher

Zwischen jedem dieser Punkte kann natürlich der Vorgang durch einen anderen Thread unterbrochen werden. So könnte bspw. folgender Ablauf mit zwei Threads entstehen:

  1. Thread 1 lädt die Variable mit dem Wert 1
  2.              Thread 2 unterbricht und lädt die Variable mit dem Wert 1
  3.              Thread 2 inkrementiert
  4.              Thread 2 schreibt die Variable mit dem Wert 2 in den Speicher
  5. Thread 1 inkrementiert (den vorher geladenen Wert 1)
  6. Thread 1 schreibt den Wert 2 in den Speicher zurück.

Statt also wie erwartet nach 2 Inkrements von 1 auf 3 zu kommen, steht am Ende nur eine 2 im Speicher und eins der Inkrements hat sich einfach in Luft aufgelöst.

Um diese Probleme zu vermeiden nutzt man normalerweise Mutexe (wie im mutex_runner im Beispiel oben). Die kritischen Bereiche werden dabei mit einem gemeinsamen Mutex abgesichert, der immer nur von einem Thread gleichzeitig gehalten werden kann. Mutexe haben leider einige unpraktische Nebenwirkungen: sie sind relativ teuer (Locking und Unlocking ist vergleichsweise langsam), es kann zu einer Priority Inversion kommen (ein hochpriorer Thread wartet auf einen mit niedriger Priorität, weil der eben grad den Mutex hat) und bei Verwendung mehrerer Mutexe kann es zu Deadlocks kommen (Beispiel 2 Mutexe: Thead 1 hält Mutex A und wartet auf Mutex B, Thread 2 umgekehrt. In dieser Situation wird keiner der beiden Threads jemals weiterlaufen können, weil der jeweils andere Mutex nicht mehr freigegeben wird).

Für bestimmte Anwendungsfälle kann man Algorithmen Lockfree implementieren. Ein Beispiel für so eine (möglicherweise) lock-free Datenstruktur ist das Template std::atomic. Verwendet man das im Beispiel oben (im atomic_runner) auch ohne Locking, dann werden die Inkrements trotzdem erfolgreich ausgeführt. Intern sorgt das Template dafür, dass der operator++() nicht unterbrochen wird (entweder durch eigenes Locking oder durch eine lock-freie Implementierung des Zählvorganges. Näheres verrät Google, aber Vorsicht: lock-freie Algorithmen machen Kopfschmerzen!). Im Allgmeinen wird die Verwendung von std::atomic schneller sein, als eine klassische, mutex-basierte Implementierung, allerdings auch langsamer, als die Implementierung ohne Synchronisation (die dafür allerdings nicht korrekt funktioniert und damit natürlich auch nur minder brauchbar ist).