Ein Threadpool

Der naive Ansatz "ein Task = ein Thread" führt im Zweifelsfall nicht zu einer besseren Auslastung der Hardware, sondern verschlechtert Dinge eher. Sowohl Speicher, als auch CPU werden verschwendet. Besser ist es, die CPU mit Threads optimal auszulasten und die Tasks darauf nacheinander zu verteilen.

Video

Quellcode

#include <iostream>
#include <deque>
#include <memory>
#include <functional>
#include <chrono>
#include <thread>
#include <mutex>
#include <list>

class ExecutionManager
{
  std::deque<std::function<void ()>> mTasks;
  std::list<std::shared_ptr<std::thread>> mThreads;
  std::mutex mMutex;
  bool mFinish;

  void executor()
  {
    std::cout << "- Thread gestartet\n";
    bool running = true;
    while (running) {
      mMutex.lock();
      if (!mTasks.empty()) {
	auto task = mTasks.front();
	mTasks.pop_front();
	mMutex.unlock();
	task();  
      }
      else {
	running = !mFinish;
        mMutex.unlock();
      }
    }
    std::cout << "- Thread beendet\n";
  }
  
public:
  
  explicit ExecutionManager(unsigned int threads)
  {
    mFinish = false;
    for (unsigned int i = 0; i < threads; ++i)
    {
      mThreads.push_back(std::shared_ptr<std::thread>(
	new std::thread(&ExecutionManager::executor, this)));
    }
  }
  
  void addTask(std::function<void ()> const &task) {
    mMutex.lock();
    mTasks.push_back(task);
    mMutex.unlock();
  }
  
  void joinAll()
  {
    mMutex.lock();
    mFinish = true;
    mMutex.unlock();
    for (auto t : mThreads) {
      t->join();
    }
    std::cout << "-- Alle Threads beendet.\n";
  }
};

void taskFn()
{
  std::cout << "taskFn() aufgerufen\n";
  std::this_thread::sleep_for(std::chrono::seconds(5));
  std::cout << "taskFn() beendet\n";
}

struct TaskFunctor
{
  void operator()()
  {
    std::cout << "TaskFunctor aufgerufen\n";
    std::this_thread::sleep_for(std::chrono::seconds(10));
    std::cout << "TaskFunctor beendet\n";
  }
};

int main()
{
  ExecutionManager e(4);
  
  e.addTask([]() {     
    std::cout << "lambda aufgerufen\n";
    std::this_thread::sleep_for(std::chrono::seconds(7)); 
    std::cout << "lambda beendet\n"; });
  e.addTask(taskFn);
  e.addTask(TaskFunctor());
  e.addTask(taskFn);
  
  std::this_thread::sleep_for(std::chrono::seconds(2));
  
  e.addTask([]() { std::cout << "Nachzügler!\n"; });
    
  e.joinAll();
}

Erklärung

Statt der execute()-Funktion, die alle Tasks auf einmal startet, legt die neue Version des ExecutionManager eine vorgebene Menge an Threads an. Diese rufen nun nicht mehr direkt die Tasks auf, sondern haben eine Hauptfunktion, die sich um den Zugriff auf die Taskliste und das Ausführen eines Tasks kümmert. Die Funktion executor() läuft n mal (in unserem Beispiel 4 mal) parallel und versucht, auf die Taskliste zuzugreifen. Das funktioniert nicht so einfach. Ein zentrales Thema beim parallelen Zugriff auf gemeinsame Datenstrukturen ist die Frage der Konsistenz. Im ungünstigsten Fall könnte ein Thread eine Struktur in mehreren Teilschritten ändern und ein anderer direkt dazwischen grätschen und so einen ungültigen Übergangszustand zu sehen bekommen. Das kann zu Abstürzen führen oder gar zu schwer zu entdeckenden Korruptionen von Datenstrukturen. 

Um solche parallelen Zugriffe sauber zu trennen gibt es Mutexe. Ein Mutex ist eine Datenstruktur, die exklusiv nur von einem Thread gehalten werden kann. C++11 bietet mit std::mutex eine simple Implementierung an. Für uns interessaant sind die Funktionen lock() und unlock(), die genau das tun, wonach sie klingen: lock() sperrt den Mutex für den Zugriff durch andere und unlock() gibt ihn wieder frei. Wird dabei lock() auf einen bereits gesperrten Mutex aufgerufen, blockiert die Funktion bis zur Freigabe des Mutex und der Thread muss warten. Auf diese Art lassen sich parallele Zugriffe serialisieren und sicher gestalten. Ein wenig aufpassen muss mal allerdings: lock() und unlock() müssen sich innerhalb eines Threads immer genau abwechseln. Wird lock() zweimal nacheinander aus demselben Thread aufgerufen, dann kann der zweite Aufruf blockieren und damit den Thread anhalten, der eigentlich das Lock wieder freigeben müsste. In dieser Situation kommt es irgendwann zum Stillstand des Programms, wenn bspw. irgendwann alle Threads an dem selben Lock hängen und der einzige, der das freigeben könnte, sich ein Bein gestellt hat. Das Verhalten eines doppelten unlock() ist lauf Standard undefiniert. Es könnte also funktionieren, könnte aber auch einen Absturz verursachen. Grundregel also: ein lock() gefolgt von einem unlock() (innerhalb eines Threads natürlich).

Eine kleine Besonderheit ist noch in Zeile 45 zu sehen: der Aufruf des std::thread-Konstruktors mit den zwei Parametern dient dem Aufrufe einer Memberfunktion eines Objektes. Mit &ExecutionManager::executor wird der Zeiger auf die Memberfunktion executor der Klasse ExecutionManager erzeugt. Der allein nützt aber noch nichts, weil unbekannt ist, für welches Objekt diese Funktion aufgerufen werden soll. Deswegen der zweite Parameter: der this pointer liefert das betreffende Objekt nach.