Container, die keine sind

Wie kann unser ExecutionManager möglichst Task-Prioritäten unterstützen? C++ hält mit der std::priority_queue die passende Datenstruktur bereit.

Video

Quellcode

#include <iostream>
#include <memory>
#include <functional>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <list>
#include <queue>
 
class ExecutionManager
{
  struct ExecutorBase
  {
    uint8_t prio;

    explicit ExecutorBase(uint8_t p = 0xFF)
      : prio { p }
    {
    }

    bool operator<(ExecutorBase const &other)
    {
      return prio < other.prio;
    }
    
    virtual void operator()() = 0;
  };
  
  template <typename Task, typename Ret> struct Executor : public ExecutorBase
  {
    Task fn;
    std::promise<Ret> result;
     
    Executor(Task const &f, uint8_t prio)
      : ExecutorBase(prio), fn(f)
    {
    }
     
    void operator()() override
    {
      try {
	auto res = fn();
	result.set_value(res);
      }
      catch (...) {
	result.set_exception(std::current_exception());
      }
    }
  };

  template <typename Task> struct Executor<Task, void> : public ExecutorBase
  {
    Task fn;
    std::promise<void> result;
     
    Executor(Task const &f, uint8_t prio)
      : ExecutorBase(prio), fn(f)
    {
    }
     
    void operator()() override
    {
      try {
	fn();
	result.set_value();
      }
      catch (...) {
	result.set_exception(std::current_exception());
      }
    }
  };
  
  struct DerefComparator
  {
    template <typename PT> bool operator()(PT const &p1, PT const &p2)
    {
      std::less<decltype(*p1)> l;
      return l(*p1, *p2);
    }
  };
   
  std::priority_queue<
	      std::shared_ptr<ExecutorBase>, 
	      std::vector<std::shared_ptr<ExecutorBase>>,
	      DerefComparator
	  > mTasks;
  std::list<std::shared_ptr<std::thread>> mThreads;
  std::mutex mMutex;
  std::condition_variable mCond;
  bool mFinish;
 
  void executor()
  {
    std::cout << "- Thread gestartet\n";
    bool running = true;
    while (running) {
      std::unique_lock<std::mutex> l(mMutex);
      mCond.wait(l, [this]() { 
        return mFinish || mTasks.size() != 0;
      });
      std::cerr << "Prüfe Warteschlange\n";
      if (!mTasks.empty()) {
    auto task = mTasks.top();
    mTasks.pop();
    l.unlock();
    (*task)();  
      }
      else {
    running = !mFinish;
      }
    }
    std::cout << "- Thread beendet\n";
  }
   
public:
   
  explicit ExecutionManager(unsigned int threads)
  {
    mFinish = false;
    for (unsigned int i = 0; i < threads; ++i)
    {
      mThreads.push_back(std::shared_ptr<std::thread>(
    new std::thread(&ExecutionManager::executor, this)));
    }
  }

  template <typename F, typename... Arguments> auto addTask(F task, Arguments... args) 
      -> std::future<decltype(task(args...))> {
    return addTask(0, task, args...);
  }
  
  template <typename F, typename... Arguments> auto addTask(uint8_t prio, F task, Arguments... args) 
      -> std::future<decltype(task(args...))> {
    typedef decltype(task(args...)) Ret;
    std::unique_lock<std::mutex> l(mMutex);
    auto t = std::make_shared<Executor<decltype(std::bind(task, args...)), Ret>>
        (std::bind(task, args...), prio);
    mTasks.push(t);
    mCond.notify_one();
    return t->result.get_future();
  }
   
  void joinAll()
  {
    {
      std::unique_lock<std::mutex> l(mMutex);
      mFinish = true;
    }
    mCond.notify_all();
    for (auto t : mThreads) {
      t->join();
    }
    std::cout << "-- Alle Threads beendet.\n";
  }
};
 
int main()
{
  ExecutionManager e(2);
   
  e.addTask(1, []() {std::cout << "low aufgerufen\n";});
  e.addTask(1, []() {std::cout << "low2 aufgerufen\n";});
  e.addTask(5, []() {std::cout << "mid aufgerufen\n";});  
  e.addTask([]() {std::cout << "min aufgerufen\n";});  
  e.addTask(10, []() {std::cout << "high aufgerufen\n";});

  e.joinAll();
}

Erklärung

Tasks priorisieren können, ohne dabei selbst zuviel Aufwand zu treiben... Wie üblich bietet die C++-Standardbibliothek hier eine Lösung an: std::priority_queue. Elemente werden dort beim Einfügen anhand ihrer Priorität sortiert gespeichert.

Das interessante an dieser Datenstruktur ist allerdings, dass sie gar keine ist. Ähnlich wie std::queue und std::stack ist std::priority_queue ein sogenannter Containeradapter: eine Klasse die einen der anderen Container verpackt und mit neuer Schnittstelle/neuen Fähigkeiten ausstattet. Im Fall von std::priority_queue ist das eben die Fähigkeit zum automatischen sortieren nach Priorität. Die wird dabei durch den angegebenen Vergleichsoperator festgestellt (hier im Beispiel der DerefComparator, standardmäßig std::less<T> für den entsprechenden Objekttyp).Elemente, die damit als kleiner vergleichen, landen weiter hinten in der Queue. Entnommen werden kann immer nur das oberste, also wichtigste Element. Die Klasse speichert dabei die Elemente nicht selbst, sondern überlässt das der zugrundeliegenden Datenstruktur. Im Beispiel hier (und auch im Standardfall) ist das ein std::vector<T> für die entsprechenden Elemente. Andere Container sind möglich, solange sie die Anforderungen der Klasse erfüllen: einen Random Access Iterator zur Verfügung stellen und front(), push_back() und pop_back() unterstützen. Man kann dort also bspw. auch einen spezialisiserten eigenen Container verwenden, der diese Anforderungen unterstützt.

Im Beispiel oben noch interessant ist der DerefComparator als Beispiel für einen generisch nützlichen Vergleichsoperator. Das Problem: in der priority_queue werden Pointer gespeichert. Wenn man nun keinen eigenen Comparator angibt, dann vergleicht die Queue standardmäßig die Pointerwerte miteinander, was natürlich keinerlei Sinn hinsichtlich der Sortierung nach Priorität ergibt. Der DerefComparator hingegen ermöglicht es, generisch Elemente zu vergleichen, auf die nur Pointer vorhanden sind, indem er den Aufruf der operator() mit zwei Pointern umleitet auf einen Aufruf von std::less<T> für die dereferenzierten Pointer.