简介
前文介绍了几种数据划分的方式,包括按照线程数量划分,按照递归方式划分,以及按照任务类型划分等。
本文结合之前的划分方式,基于stl的find, for_each以及partial_sum等算法实现并行版本。
并行版本for_each
实现并行的for_each,最简单的方式就是将数据划分,每个线程分别处理一段连续的数据即可。
在介绍并行版本之前,我们先实现一个管理线程 的类join_threads,用来管控线程防止线程过早退出
class join_threads{std::vector<std::thread>& threads;public:explicit join_threads(std::vector<std::thread>& threads_) :threads(threads_){}~join_threads(){for (unsigned long i = 0; i < threads.size(); ++i){if (threads[i].joinable())threads[i].join();}}};
接下来我们实现第一种方式
template<typename Iterator, typename Func>void parallel_for_each(Iterator first, Iterator last, Func f){unsigned long const length = std::distance(first, last);if (!length)return;unsigned long const min_per_thread = 25;unsigned long const max_threads =(length + min_per_thread - 1) / min_per_thread;unsigned long const hardware_threads =std::thread::hardware_concurrency();unsigned long const num_threads =std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);unsigned long const block_size = length / num_threads;std::vector<std::future<void>> futures(num_threads - 1); //⇽-- - 1std::vector<std::thread> threads(num_threads - 1);join_threads joiner(threads);Iterator block_start = first;for (unsigned long i = 0; i < (num_threads - 1); ++i){Iterator block_end = block_start;std::advance(block_end, block_size);std::packaged_task<void(void)> task( // ⇽-- - 2[=](){std::for_each(block_start, block_end, f);});futures[i] = task.get_future();threads[i] = std::thread(std::move(task)); //⇽-- - 3block_start = block_end;}std::for_each(block_start, last, f);for (unsigned long i = 0; i < (num_threads - 1); ++i){futures[i].get(); // ⇽-- - 4}}
1 我们规定如果处理的数量不超过25个则用单线程。否则根据处理的数量划分任务,计算开辟的线程数,如果要开辟的线程数大于内核线程的数量,则以内核线程数为准。
2 根据实际开辟的线程数num_threads计算每个线程处理的块大小。并且初始化两个vector,分别用来存储处理结果的future和处理任务的线程。
3 我们在(2处)代码生成了一个任务task,然后获取future赋值给vector对应下标为i的future元素,并且把任务绑定给对应下标为i的thread。
4 numthreads-1个线程并行处理for_each,剩下的主线程处理余下的for_each,最后通过futures.get汇总
第二种划分方式是我们采取递归的方式,我们知道采用递归的方式无法提前开辟准确数量的线程,我们采用async帮我们完成这个任务
template<typename Iterator, typename Func>void async_for_each(Iterator first, Iterator last, Func f){unsigned long const length = std::distance(first, last);if (!length)return;unsigned long const min_per_thread = 25;if (length < (2 * min_per_thread)){std::for_each(first, last, f); //⇽-- - 1}else{Iterator const mid_point = first + length / 2;//⇽-- - 2std::future<void> first_half = std::async(&async_for_each<Iterator, Func>,first, mid_point, f);//⇽-- - 3async_for_each(mid_point, last, f);// ⇽-- - 4first_half.get();}}
async可以帮助我们判断是否需要开启线程还是自动串行执行。每次我们将要处理的数据一分为2,前半部分交给一个async开辟线程处理,后半部分在本线程处理。而所谓的本线程不一定是主线程,因为我们通过async递归执行parallel_for_each,也就相当于在一个线程里独立执行了。
find的并行实现
find 的并行查找方式还是分两种,一种是将要查找的区间划分为几个段,每段交给一个线程查找。
另一种是采用递归的方式每次折半,前半部分交给一个线程查找,后半部分留在本线程查找。
我们先说第一种
find比较特殊,我们要防止线程忙等待,也要防止线程在其他线程已经查找到值后做无谓的浪费。可以用一个共享的全局atomic变量表示是否找到目标。
因为主线程要获取某个线程查找到的迭代器位置,所以我们用promise 设置 value为迭代器
template<typename Iterator, typename MatchType>Iterator parallel_find(Iterator first, Iterator last, MatchType match){struct find_element //⇽-- - 1{void operator()(Iterator begin,Iterator end,MatchType match,std::promise<Iterator>*result,std::atomic<bool>*done_flag){try{for (; (begin != end) && !done_flag->load(); ++begin) //⇽-- - 2{if (*begin == match){result->set_value(begin); //⇽-- - 3done_flag->store(true); //⇽-- - 4return;}}}catch (...) //⇽-- - 5{try{result->set_exception(std::current_exception()); //⇽-- - 6done_flag->store(true);}catch (...) //⇽-- - 7{}}}};unsigned long const length = std::distance(first, last);if (!length)return last;unsigned long const min_per_thread = 25;unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;unsigned long const hardware_threads = std::thread::hardware_concurrency();unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);unsigned long const block_size = length / num_threads;std::promise<Iterator> result; //⇽-- - 8std::atomic<bool> done_flag(false); //⇽-- - 9std::vector<std::thread> threads(num_threads - 1); //⇽-- - 10{join_threads joiner(threads);Iterator block_start = first;for (unsigned long i = 0; i < (num_threads - 1); ++i){Iterator block_end = block_start;std::advance(block_end, block_size);// ⇽-- - 11threads[i] = std::thread(find_element(), block_start, block_end, match, &result, &done_flag);block_start = block_end;}// ⇽-- - 12find_element()(block_start, last, match, &result, &done_flag);}// ⇽-- - 13if (!done_flag.load()){return last;}//⇽-- - 14return result.get_future().get();}
1 find_element重载了()运算符,接受四个参数,分别是迭代器的开始,迭代起的结束,要查找的数值,以及用来通知外部的promise,还有线程之间用来检测是否有某个线程完成查找的原子变量。
2 find_element重载()的逻辑就是查找这个区间内满足某个值的位置,并将这个位置的迭代起设置到promise中,然后将完成的原子变量标记为true。
说第二种方式,利用递归折半查找,我们可以用async帮助我们完成并行任务。
template<typename Iterator, typename MatchType>Iterator parallel_find_impl(Iterator first, Iterator last, MatchType match,std::atomic<bool>& done) // ⇽-- - 1{try{unsigned long const length = std::distance(first,last);unsigned long const min_per_thread = 25; // ⇽-- - 2if (length < (2 * min_per_thread)) //⇽-- - 3{for (; (first != last) && !done.load(); ++first) //⇽-- - 4{if (*first == match){done = true; //⇽-- - 5return first;}}return last; //⇽-- - 6}else{//⇽-- - 7Iterator const mid_point = first + (length / 2);//⇽-- - 8std::future<Iterator> async_result = std::async(¶llel_find_impl<Iterator,MatchType>,mid_point,last,match,std::ref(done));//⇽-- - 9Iterator const direct_result = parallel_find_impl(first,mid_point,match,done);//⇽-- - 10return (direct_result == mid_point) ?async_result.get() : direct_result;}}catch (...){// ⇽-- - 11done = true;throw;}}template<typename Iterator, typename MatchType>Iterator parallel_find_async(Iterator first, Iterator last, MatchType match){std::atomic<bool> done(false);//⇽-- - 12return parallel_find_impl(first, last, match, done);}
1 并行查找的方式种我们先根据长度是否小于50决定是否开启并行任务,如果小于50则采取单线程方式。
2 如果采用并行的方式,我们将长度折半,前半部分交给async,后半部分交给本线程。
3 最后我们在主线程中汇合,获取结果。
partial_sum并行版本
C++ 提供了累计计算求和的功能,比如一个vector中存储的数据为{1,2,3},那么经过计算,第一个元素仍然为1,第二个元素为1+2, 第三个元素为1+2+3,结果为{1,3,6}.
关于并行版本我们可以这么思考,假设元数组为{1,2,3,4,5,6,7},那我们可以划分为三个部分,第一部分为{1,2,3}交给第一个线程处理, 第二部分{4,5,6}交给第二个线程处理,7交给本线程处理。
但是我们要考虑的一个问题是线程2要用到线程1最后计算的结果,线程1计算后{1,3,6},线程2需要用到6做累加,我们可以先让线程1计算出第3个元素值6,再将这个6传递给线程2,剩下的就可以并行计算了。同样的道理本线程要处理最后一个元素的累加结果,他需要等到线程2处理完第6个元素的值。
所以基本思路是每个线程优先处理分区的最后一个元素,通过promise设置给其他线程,在这个阶段线程之间是串行的,等到所有线程都开始计算其他位置后就是并行了。
template<typename Iterator>void parallel_partial_sum(Iterator first, Iterator last){typedef typename Iterator::value_type value_type;struct process_chunk //⇽-- - 1{void operator()(Iterator begin, Iterator last,std::future<value_type>* previous_end_value,std::promise<value_type>* end_value){try{Iterator end = last;++end;std::partial_sum(begin, end, begin); //⇽-- - 2if (previous_end_value) //⇽-- - 3{value_type addend = previous_end_value->get(); // ⇽-- - 4*last += addend; // ⇽-- - 5if (end_value){end_value->set_value(*last); //⇽-- - 6}// ⇽-- - 7std::for_each(begin, last, [addend](value_type& item){item += addend;});}else if (end_value){// ⇽-- - 8end_value->set_value(*last);}}catch (...) // ⇽-- - 9{if (end_value){end_value->set_exception(std::current_exception()); // ⇽-- - 10}else{throw; // ⇽-- - 11}}}};unsigned long const length = std::distance(first, last);if (!length) {return;}unsigned long const min_per_thread = 25; //⇽-- - 12unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;unsigned long const hardware_threads = std::thread::hardware_concurrency();unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);unsigned long const block_size = length / num_threads;typedef typename Iterator::value_type value_type;std::vector<std::thread> threads(num_threads - 1); // ⇽-- - 13std::vector<std::promise<value_type> > end_values(num_threads - 1); // ⇽-- - 14std::vector<std::future<value_type> > previous_end_values; // ⇽-- - 15previous_end_values.reserve(num_threads - 1); // ⇽-- - 16join_threads joiner(threads);Iterator block_start = first;for (unsigned long i = 0; i < (num_threads - 1); ++i){Iterator block_last = block_start;std::advance(block_last, block_size - 1); // ⇽-- - 17// ⇽-- - 18threads[i] = std::thread(process_chunk(), block_start, block_last,(i != 0) ? &previous_end_values[i - 1] : 0,&end_values[i]);block_start = block_last;++block_start; // ⇽-- - 19previous_end_values.push_back(end_values[i].get_future()); // ⇽-- - 20}Iterator final_element = block_start;std::advance(final_element, std::distance(block_start, last) - 1); // ⇽-- - 21// ⇽-- - 22process_chunk()(block_start, final_element, (num_threads > 1) ? &previous_end_values.back() : 0,0);}
1 定义了process_chunk类,重载了()运算符,在重载的逻辑里我们先计算区间内的partial_sum累计求和(2处)
2 因为我们处理的区间不一定是首个区间,也就是他还需要加上前面区间处理得出的最后一个元素的值,所以我们通过previouse_end_value判断本区间不是首个区间,并且加上前面处理的结果。优先将最后一个值计算出来设置给promise。然后在利用for_each遍历计算其他位置的值。
总结
本文介绍了如何并行设计stl的相关算法,读者有好的思路可以互相交流一下。
测试代码和项目代码链接:
https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day14-ThreadSafeContainer
视频链接
https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290