醍醐灌頂全方位擊破C++執行緒池及非同步處理

1。 C++執行緒池

1。1 基礎概念

執行緒池: 當進行並行的任務作業操作時,執行緒的建立與銷燬的開銷是,阻礙效能進步的關鍵,因此執行緒池,由此產生。使用多個執行緒,無限制迴圈等待佇列,進行計算和操作。幫助快速降低和減少效能損耗。

1。2 執行緒池的組成

執行緒池管理器:初始化和建立執行緒,啟動和停止執行緒,調配任務;管理執行緒池

工作執行緒:執行緒池中等待並執行分配的任務

任務介面:新增任務的介面,以提供工作執行緒排程任務的執行。

任務佇列:用於存放沒有處理的任務,提供一種緩衝機制,同時具有排程功能,高優先順序的任務放在佇列前面

1。3 執行緒池工作的四種情況

1。3。1 沒有任務要執行,緩衝佇列為空

醍醐灌頂全方位擊破C++執行緒池及非同步處理

1。3。2 佇列中任務數量,小於等於執行緒池中執行緒任務數量

醍醐灌頂全方位擊破C++執行緒池及非同步處理

1。3。3 任務數量大於執行緒池數量,緩衝佇列未滿

醍醐灌頂全方位擊破C++執行緒池及非同步處理

1。3。4 任務數量大於執行緒池數量,緩衝佇列已滿

醍醐灌頂全方位擊破C++執行緒池及非同步處理

1。4 醍醐灌頂學習執行緒池

免費訂閱,醍醐灌頂讓你全方位擊破多執行緒各個細節知識

醍醐灌頂全方位擊破C++執行緒池及非同步處理

立即預約直播地址:

https://ke。qq。com/course/417774?flowToken=1036031

2。 執行緒池的C++實現

執行緒池的主要組成有上面三個部分:

任務佇列(Task Quene)

執行緒池(Thread Pool)

完成佇列(Completed Tasks)

2。1 佇列

我們使用佇列來儲存工作,因為它是更合理的資料結構。 我們希望以與傳送它相同的順序啟動工作。 但是,這個佇列有點特殊。正如我在上一節中所說的,執行緒是連續的(好吧,不是真的,但我們假設它們是)查詢佇列要求工作。當有可用的工作時,執行緒從佇列中獲取工作並執行它。如果兩個執行緒試圖同時執行相同的工作會發生什麼? 好吧,程式會崩潰。

為了避免這種問題,我在標準C ++ Queue上實現了一個包裝器,它使用mutex來限制併發訪問。 讓我們看一下SafeQueue類的一小部分

示例:

void enqueue(T& t) {    std::unique_lock lock(m_mutex);    m_queue。push(t);}

要排隊我們做的第一件事就是鎖定互斥鎖以確保沒有其他人正在訪問該資源。然後,我們將元素推送到佇列中。 當鎖超出範圍時,它會自動釋放。好嗎,對吧?這樣,我們使Queue執行緒安全,因此我們不必擔心許多執行緒在相同的“時間”訪問和/或修改它。

2。2 提交函式

執行緒池最重要的方法是負責向佇列新增工作的方法。我打電話給這個方法提交。不難理解它是如何工作的,但它的實現起初可能看起來很嚇人。讓我們考慮應該做什麼,之後我們會擔心如何做到這一點。 什麼:

接受任何引數的任何函式。

立即返回“東西”以避免阻塞主執行緒。 此返回的物件最終應包含操作的結果。

完整的提交函式如下所示:

// Submit a function to be executed asynchronously by the pool template auto submit(F&& f, Args&&。。。 args) -> std::future {    // Create a function with bounded parameters ready to execute    std::function func = std::bind(std::forward(f), std::forward(args)。。。);    // Encapsulate it into a shared ptr in order to be able to copy construct / assign     auto task_ptr = std::make_shared>(func);    // Wrap packaged task into void function    std::function wrapper_func = [task_ptr]() {      (*task_ptr)();     };    // Enqueue generic wrapper function    m_queue。enqueue(wrapperfunc);    // Wake up one thread if its waiting    m_conditional_lock。notify_one();     // Return future from promise    return task_ptr->get_future();}

2。3 佇列完整原始碼

// SafeQueue。h #pragma once#include #include // Thread safe implementation of a Queue using a std::queuetemplate class SafeQueue {private:  std::queue m_queue; //利用模板函式構造佇列  std::mutex m_mutex;//訪問互斥訊號量public:  SafeQueue() { //空建構函式  }  SafeQueue(SafeQueue& other) {//複製建構函式    //TODO:  }  ~SafeQueue() { //解構函式  }  bool empty() {  //佇列是否為空    std::unique_lock lock(m_mutex); //互斥訊號變數加鎖,防止m_queue被改變    return m_queue。empty();  }    int size() {    std::unique_lock lock(m_mutex); //互斥訊號變數加鎖,防止m_queue被改變    return m_queue。size();  }//佇列新增元素  void enqueue(T& t) {    std::unique_lock lock(m_mutex);    m_queue。push(t);  }//佇列取出元素  bool dequeue(T& t) {    std::unique_lock lock(m_mutex); //佇列加鎖    if (m_queue。empty()) {      return false;    }    t = std::move(m_queue。front()); //取出隊首元素,返回隊首元素值,並進行右值引用        m_queue。pop(); //彈出入隊的第一個元素    return true;  }};

2。4 執行緒池完整程式碼

//ThreadPool。h #pragma once#include #include #include #include #include #include #include #include “SafeQueue。h”class ThreadPool {private:  class ThreadWorker {//內建執行緒工作類  private:    int m_id; //工作id    ThreadPool * m_pool;//所屬執行緒池  public:    //建構函式    ThreadWorker(ThreadPool * pool, const int id)       : m_pool(pool), m_id(id) {    }    //過載`()`操作    void operator()() {      std::function func; //定義基礎函式類func            bool dequeued; //是否正在取出佇列中元素            //判斷執行緒池是否關閉,沒有關閉,迴圈提取      while (!m_pool->m_shutdown) {        {          //為執行緒環境鎖加鎖,互訪問工作執行緒的休眠和喚醒          std::unique_lock lock(m_pool->m_conditional_mutex);          //如果任務佇列為空,阻塞當前執行緒          if (m_pool->m_queue。empty()) {            m_pool->m_conditional_lock。wait(lock); //等待條件變數通知,開啟執行緒           }          //取出任務佇列中的元素          dequeued = m_pool->m_queue。dequeue(func);        }        //如果成功取出,執行工作函式        if (dequeued) {          func();        }      }    }  };  bool m_shutdown; //執行緒池是否關閉  SafeQueue> m_queue;//執行函式安全佇列,即任務佇列  std::vector m_threads; //工作執行緒佇列  std::mutex m_conditional_mutex;//執行緒休眠鎖互斥變數  std::condition_variable m_conditional_lock; //執行緒環境鎖,讓執行緒可以處於休眠或者喚醒狀態public:    //執行緒池建構函式  ThreadPool(const int n_threads)    : m_threads(std::vector(n_threads)), m_shutdown(false) {  }  ThreadPool(const ThreadPool &) = delete; //複製建構函式,並且取消預設父類建構函式  ThreadPool(ThreadPool &&) = delete; // 複製建構函式,允許右值引用  ThreadPool & operator=(const ThreadPool &) = delete; // 賦值操作  ThreadPool & operator=(ThreadPool &&) = delete; //賦值操作  // Inits thread pool  void init() {    for (int i = 0; i < m_threads。size(); ++i) {      m_threads[i] = std::thread(ThreadWorker(this, i));//分配工作執行緒    }  }  // Waits until threads finish their current task and shutdowns the pool  void shutdown() {    m_shutdown = true;    m_conditional_lock。notify_all(); //通知 喚醒所有工作執行緒        for (int i = 0; i < m_threads。size(); ++i) {      if(m_threads[i]。joinable()) { //判斷執行緒是否正在等待        m_threads[i]。join();  //將執行緒加入等待佇列      }    }  }

使用樣例程式碼

#include #include  #include “。。/include/ThreadPool。h”std::random_device rd; //真實隨機數產生器std::mt19937 mt(rd()); //生成計算隨機數mt;std::uniform_int_distribution dist(-1000, 1000);//生成-1000到1000之間的離散均勻分部數auto rnd = std::bind(dist, mt);//設定執行緒睡眠時間void simulate_hard_computation() {  std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));}// 新增兩個數字的簡單函式並列印結果void multiply(const int a, const int b) {  simulate_hard_computation();  const int res = a * b;  std::cout << a << “ * ” << b << “ = ” << res << std::endl;}//新增並輸出結果void multiply_output(int & out, const int a, const int b) {  simulate_hard_computation();  out = a * b;  std::cout << a << “ * ” << b << “ = ” << out << std::endl;}// 結果返回int multiply_return(const int a, const int b) {  simulate_hard_computation();  const int res = a * b;  std::cout << a << “ * ” << b << “ = ” << res << std::endl;  return res;}void example() {  // 建立3個執行緒的執行緒池  ThreadPool pool(3);  // 初始化執行緒池  pool。init();  // 提交乘法操作,總共30個  for (int i = 1; i < 3; ++i) {    for (int j = 1; j < 10; ++j) {      pool。submit(multiply, i, j);    }  }  // 使用ref傳遞的輸出引數提交函式  int output_ref;  auto future1 = pool。submit(multiply_output, std::ref(output_ref), 5, 6);  // 等待乘法輸出完成  future1。get();  std::cout << “Last operation result is equals to ” << output_ref << std::endl;  // 使用return引數提交函式  auto future2 = pool。submit(multiply_return, 5, 3);  // 等待乘法輸出完成  int res = future2。get();  std::cout << “Last operation result is equals to ” << res << std::endl;    //關閉執行緒池  pool。shutdown();}  // Submit a function to be executed asynchronously by the pool  //執行緒的主要工作函式,使用了後置返回型別,自動判斷函式返回值  template  auto submit(F&& f, Args&&。。。 args) -> std::future {    // Create a function with bounded parameters ready to execute    //     std::function func = std::bind(std::forward(f), std::forward(args)。。。);//連線函式和引數定義,特殊函式型別,避免左、右值錯誤    // Encapsulate it into a shared ptr in order to be able to copy construct // assign     //封裝獲取任務物件,方便另外一個執行緒檢視結果    auto task_ptr = std::make_shared>(func);    // Wrap packaged task into void function    //利用正則表示式,返回一個函式物件    std::function wrapper_func = [task_ptr]() {      (*task_ptr)();     };    // 佇列通用安全封包函式,並壓入安全佇列    m_queue。enqueue(wrapper_func);    // 喚醒一個等待中的執行緒    m_conditional_lock。notify_one();    // 返回先前註冊的任務指標    return task_ptr->get_future();  }};

3。 C++11執行緒池實現

c++11 加入了執行緒庫,從此告別了標準庫不支援併發的歷史。然而 c++ 對於多執行緒的支援還是比較低階,稍微高階一點的用法都需要自己去實現,譬如執行緒池、訊號量等。

#ifndef THREAD_POOL_H#define THREAD_POOL_H #include #include #include #include #include #include #include #include  // 名稱空間namespace ilovers {    class TaskExecutor;} class ilovers::TaskExecutor{    using Task = std::function;private:    // 執行緒池    std::vector pool;    // 任務佇列    std::queue tasks;    // 同步    std::mutex m_task;    std::condition_variable cv_task;    // 是否關閉提交    std::atomic stop;    public:    // 構造    TaskExecutor(size_t size = 4): stop {false}{        size = size < 1 ? 1 : size;        for(size_t i = 0; i< size; ++i){            pool。emplace_back(&TaskExecutor::schedual, this);    // push_back(std::thread{。。。})        }    }        // 析構    ~TaskExecutor(){        for(std::thread& thread : pool){            thread。detach();    // 讓執行緒“自生自滅”            //thread。join();        // 等待任務結束, 前提:執行緒一定會執行完        }    }        // 停止任務提交    void shutdown(){        this->stop。store(true);    }        // 重啟任務提交    void restart(){        this->stop。store(false);    }        // 提交一個任務    template    auto commit(F&& f, Args&&。。。 args) ->std::future {        if(stop。load()){    // stop == true ??            throw std::runtime_error(“task executor have closed commit。”);        }                using ResType =  decltype(f(args。。。));    // typename std::result_of::type, 函式 f 的返回值型別        auto task = std::make_shared>(                        std::bind(std::forward(f), std::forward(args)。。。)                );    // wtf !        {    // 新增任務到佇列            std::lock_guard lock {m_task};            tasks。emplace([task](){   // push(Task{。。。})                (*task)();            });        }        cv_task。notify_all();    // 喚醒執行緒執行                std::future future = task->get_future();        return future;    }    private:    // 獲取一個待執行的 task    Task get_one_task(){        std::unique_lock lock {m_task};        cv_task。wait(lock, [this](){ return !tasks。empty(); });    // wait 直到有 task        Task task {std::move(tasks。front())};    // 取一個 task        tasks。pop();        return task;    }        // 任務排程    void schedual(){        while(true){            if(Task task = get_one_task()){                task();    //            }else{                // return;    // done            }        }    }}; #endif

void f(){    std::cout << “hello, f !” << std::endl;} struct G{    int operator()(){        std::cout << “hello, g !” << std::endl;        return 42;    }};  int main()try{    ilovers::TaskExecutor executor {10};        std::future ff = executor。commit(f);    std::future fg = executor。commit(G{});    std::future fh = executor。commit([]()->std::string { std::cout << “hello, h !” << std::endl; return “hello,fh !”;});        executor。shutdown();        ff。get();    std::cout << fg。get() << “ ” << fh。get() << std::endl;    std::this_thread::sleep_for(std::chrono::seconds(5));    executor。restart();    // 重啟任務    executor。commit(f)。get();    //        std::cout << “end。。。” << std::endl;    return 0;}catch(std::exception& e){    std::cout << “some unhappy happened。。。 ” << e。what() << std::endl;}