Exceptions asynchron

Das letzte, was dem ExecutionManager noch gefehlt hat: Tasks sollten ganz normal mittels Exceptions Fehlerbedingungen signalisieren können. Die Exceptions einfach rausfliegen lassen geht nicht: die müssen in einen anderen Thread übergeben werden, nämlich den des Aufrufers.

Video

Quellcode

#include <iostream>
#include <deque>
#include <memory>
#include <functional>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <list>

class ExecutionManager
{
  struct ExecutorBase
  {
    virtual void operator()() = 0;
  };
  
  template <typename Task, typename Ret> struct Executor : public ExecutorBase
  {
    Task fn;
    std::promise<Ret> result;
    
    Executor(Task const &f)
      : fn(f)
    {
    }
    
    void operator()() override
    {
      try {
	auto res = fn();
	result.set_value(res);
      }
      catch (...) {
	result.set_exception(std::current_exception());
      }
    }
  };
  
  std::deque<std::shared_ptr<ExecutorBase>> mTasks;
  std::list<std::shared_ptr<std::thread>> mThreads;
  std::mutex mMutex;
  std::condition_variable mCond;
  bool mFinish;

  void executor()
  {
    std::cout << "- Thread gestartet\n";
    bool running = true;
    while (running) {
      std::unique_lock<std::mutex> l(mMutex);
      mCond.wait(l, [this]() { 
	    return mFinish || mTasks.size() != 0;
      });
      std::cerr << "Prüfe Warteschlange\n";
      if (!mTasks.empty()) {
	auto task = mTasks.front();
	mTasks.pop_front();
	l.unlock();
	(*task)();  
      }
      else {
	running = !mFinish;
      }
    }
    std::cout << "- Thread beendet\n";
  }
  
public:
  
  explicit ExecutionManager(unsigned int threads)
  {
    mFinish = false;
    for (unsigned int i = 0; i < threads; ++i)
    {
      mThreads.push_back(std::shared_ptr<std::thread>(
	new std::thread(&ExecutionManager::executor, this)));
    }
  }
  
  template <typename F, typename... Arguments> auto addTask(F task, Arguments... args) 
      -> std::future<decltype(task(args...))> {
    typedef decltype(task(args...)) Ret;
    std::unique_lock<std::mutex> l(mMutex);
    auto t = std::make_shared<Executor<decltype(std::bind(task, args...)), Ret>>
	    (std::bind(task, args...));
    mTasks.push_back(t);
    mCond.notify_one();
    return t->result.get_future();
  }
  
  void joinAll()
  {
    {
      std::unique_lock<std::mutex> l(mMutex);
      mFinish = true;
    }
    mCond.notify_all();
    for (auto t : mThreads) {
      t->join();
    }
    std::cout << "-- Alle Threads beendet.\n";
  }
};

int add(int a, int b)
{
  std::cout << "add(" << a << ", " << b << ") aufgerufen\n";
  std::this_thread::sleep_for(std::chrono::seconds(3));
  if (a+b == 0) {
    std::cout << "add() abgebrochen\n";
    throw std::runtime_error("Ich mag keine Nullen!");
  }
  std::cout << "add() beendet\n";
  return a+b;
}

int main()
{
  ExecutionManager e(2);

  std::this_thread::sleep_for(std::chrono::seconds(2));
  
  auto  t1 = e.addTask([]() {     
    std::cout << "lambda aufgerufen\n";
    std::this_thread::sleep_for(std::chrono::seconds(7)); 
    std::cout << "lambda beendet\n"; 
    return 10;
  });
  auto t2 = e.addTask(add, 20, 55);
  auto t3 = e.addTask(add, 20, -20);
  
  auto r1 = t1.get();
  std::cout << "r1 (" << typeid(r1).name() << ") = " << r1 << std::endl;
  auto r2 = t2.get();
  std::cout << "r2 (" << typeid(r2).name() << ") = " << r2 << std::endl;
  auto r3 = t3.get();
  std::cout << "r3 (" << typeid(r3).name() << ") = " << r3 << std::endl;
  
  
  e.joinAll();
}

Erklärung

Löst ein Task eine Exception aus, dann muss die erstmal gefangen werden. Sonst fliegt sie nämlich in dem ausführenden Thread bis nach draußen, wird dort auch nirgends behandelt und bricht das Programm ab. Ein try-catch-Block in irgendeinem anderen Thread nützt da gar nichts. Also umgeben wir den Taskaufruf in Zeile 32 mit einem try-catch-Block, in dem wir samt und sonders alle Exceptions abfangen. Dafür ist der catch-all-Block in Zeile 35 gedacht: die Ellipse bedeutet dem Compiler, dass jedes beliebige Exceptionobjekt zu fangen ist. An dieser Stelle kann die Exception allerdings nicht sinnvoll behandelt werden. Letztlich kann nur der ursprüngliche Aufrufer des Tasks, der ja noch das std::future für das Ergebnis hält, darüber entscheiden, wie mit einem Fehler umzugehen ist. Ergo: die Exception muss zu ihm transportiert werden.

Die Lösung, die C++11 dafür anbietet, ist recht elegant: std::promise bietet eine Funktion namens set_exception, mittels derer man statt eines Rückgabewertes eine Exception signalisieren kann, die dann ausgelöst wird, wenn auf das entsprechende std::future die Methode get aufgerufen wird. Dazu übergibt man einen std::exception_ptr, ein Objekt, dessen genauere Struktur vom Standard nicht näher festgelegt ist. Es ist lediglich festgelegt, dass dieser Pointer nicht selbst allokiert oder aus einem anderen Pointer erzeugt werden kann und dass er in der Lage ist, ein Exceptionobjekt zu referenzieren. Um an solch einen Pointer zu kommen, rufen wir in dem betreffenden catch-Block std::current_exception() auf. Diese Methode liefert uns einen Pointer auf die gerade aktive Exception; eben genau jenen Pointer, den wir im std::promise speichern wollen. Durch den Aufruf von set_exception wird das promise (und damit das zugehörige future) ready, d.h. ein evtl. bereits blockierendes get() kehrt zurück. Statt eines Rückgabewertes wird allerdings die gespeicherte Exception ausgelöst und kann genauso behandelt werden, als hätte der ursprüngliche Aufrufer den Task direkt aufgerufen (statt asynchron über den ExecutionManager).