Signalisierung

Lässt man den ExecutionManager einfach in allen Threads ständig in einer Schleife auf neue Tasks warten, dann verbrät das unnötig viel CPU-Zeit. Vor allem, wenn das Objekt nur als Verwalter für Hintergrundtasks zur Verfügung stehen soll und meist nicht direkt gebraucht wird, ist das ineffizient (und killt effektiv jegliche Stromsparmechanismen moderner Hardware). Viel besser ist es, wenn die Threads nur dann aufwachen, wenn es wirklich Arbeit gibt.

Video

Quelltext

#include <iostream>
#include <deque>
#include <memory>
#include <functional>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>

class ExecutionManager
{
  std::deque<std::function<void ()>> mTasks;
  std::list<std::shared_ptr<std::thread>> mThreads;
  std::mutex mMutex;
  std::condition_variable mCond;
  bool mFinish;

  void executor()
  {
    std::cout << "- Thread gestartet\n";
    bool running = true;
    while (running) {
      std::unique_lock<std::mutex> l(mMutex);
      mCond.wait(l, [this]() { 
	    return mFinish || mTasks.size() != 0;
      });
      std::cerr << "Prüfe Warteschlange\n";
      if (!mTasks.empty()) {
	auto task = mTasks.front();
	mTasks.pop_front();
	l.unlock();
	task();  
      }
      else {
	running = !mFinish;
      }
    }
    std::cout << "- Thread beendet\n";
  }
  
public:
  
  explicit ExecutionManager(unsigned int threads)
  {
    mFinish = false;
    for (unsigned int i = 0; i < threads; ++i)
    {
      mThreads.push_back(std::shared_ptr<std::thread>(
	new std::thread(&ExecutionManager::executor, this)));
    }
  }
  
  void addTask(std::function<void ()> const &task) {
    std::unique_lock<std::mutex> l(mMutex);
    mTasks.push_back(task);
    mCond.notify_one();
  }
  
  void joinAll()
  {
    {
      std::unique_lock<std::mutex> l(mMutex);
      mFinish = true;
    }
    mCond.notify_all();
    for (auto t : mThreads) {
      t->join();
    }
    std::cout << "-- Alle Threads beendet.\n";
  }
};

int main()
{
  ExecutionManager e(2);

  std::this_thread::sleep_for(std::chrono::seconds(2));
  
  e.addTask([]() {     
    std::cout << "lambda aufgerufen\n";
    std::this_thread::sleep_for(std::chrono::seconds(7)); 
    std::cout << "lambda beendet\n"; });
  e.addTask([]() { std::cout << "Zweiter Task!\n"; });
  
  e.joinAll();
}

Erklärung

Der Schlüssel zu unserem Problem ist die Klasse std::condition_variable. Diese repräsentiert ein Objekt, mit dem zwischen zwei Threads signalisiert werden kann. Die Signalisierung erfolgt in der Form, dass ein Thread (oder mehrere) auf das Signal (sprich: die Variable) wartet, während ein anderer das Signal auslöst. Im Beispiel ist das Warten in Zeile 25/26 zu sehen. Die Klasse std::condition_variable stellt eine Methode wait() zur Verfügung. Diese Methode blockiert und schickt den aufrufenden Thread schlafen, bis die condition_variable signalisiert wird (mittels notify_one() oder notify_all(), siehe Zeilen 57 und 66). Eine condition_variable ist immer mit einem std::mutex verbunden. Der wartende Thread reserviert den Mutex (automatisch mittels std::unique_lock in Zeile 24) und ruft dann wait() auf. Die Methode gibt den Mutex wieder frei und blockiert. Damit sind andere Threads frei, auf den durch den Mutex geschützten Daten zu arbeiten. Wird die condition_variable signalisiert, dann versucht sie, den Mutex wieder zu reservieren und wait() kehrt zurück. Der wartende Thread hat also nach dem Aufruf von wait() den Mutex auf jeden Fall reserviert und kann auf die entsprechend geschützten Daten problemlos arbeiten.

wait() existiert in zwei Varianten: mit einem Mutex als Parameter und einem Mutex und einem Predicate. Die zweite Variante (wie oben verwendet) ist praktisch, um ein versehentliches Aufwachen zu vermeiden. Das kann je nach zugrundeliegendem System nämlich durchaus passieren. Um da ein echtes Aufwachen zu vermeiden kann mittels des Predicate eine komplexere Bedingung geprüft werden. Der Code hinter dieser wait()-Variante sieht in etwa so aus (zur besseren Übersichtlichkeit ohne Typ-Annotationen):

void wait(mutex, predicate)
{
  while (!predicate()) {
    wait(mutex);
  }
}

Intern wird also die Variante mit einem Parameter aufgerufen. Das Predicate wird bei jedem Aufwachen geprüft und wenn es false liefert, dann wird erneut gewartet. Im Beispiel oben verwenden wir das, um zu prüfen, ob entweder mFinish gesetzt wurde oder mindestens ein Task vorhanden ist (geprüft durch die lambda-Funktion).

Zur Signalisierung einer condition_variable gibt es zwei Varianten: notify_one() und notify_all(). Ersteres weckt genau einen der wartenden Thread auf, während die zweite Variante alle aufweckt.