几种简单并行算法的实现(for_each,find以及partial_sum)

简介

前文介绍了几种数据划分的方式,包括按照线程数量划分,按照递归方式划分,以及按照任务类型划分等。

本文结合之前的划分方式,基于stl的find, for_each以及partial_sum等算法实现并行版本。

并行版本for_each

实现并行的for_each,最简单的方式就是将数据划分,每个线程分别处理一段连续的数据即可。

在介绍并行版本之前,我们先实现一个管理线程 的类join_threads,用来管控线程防止线程过早退出

  1. class join_threads
  2. {
  3. std::vector<std::thread>& threads;
  4. public:
  5. explicit join_threads(std::vector<std::thread>& threads_) :
  6. threads(threads_)
  7. {}
  8. ~join_threads()
  9. {
  10. for (unsigned long i = 0; i < threads.size(); ++i)
  11. {
  12. if (threads[i].joinable())
  13. threads[i].join();
  14. }
  15. }
  16. };

接下来我们实现第一种方式

  1. template<typename Iterator, typename Func>
  2. void parallel_for_each(Iterator first, Iterator last, Func f)
  3. {
  4. unsigned long const length = std::distance(first, last);
  5. if (!length)
  6. return;
  7. unsigned long const min_per_thread = 25;
  8. unsigned long const max_threads =
  9. (length + min_per_thread - 1) / min_per_thread;
  10. unsigned long const hardware_threads =
  11. std::thread::hardware_concurrency();
  12. unsigned long const num_threads =
  13. std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  14. unsigned long const block_size = length / num_threads;
  15. std::vector<std::future<void>> futures(num_threads - 1); //⇽-- - 1
  16. std::vector<std::thread> threads(num_threads - 1);
  17. join_threads joiner(threads);
  18. Iterator block_start = first;
  19. for (unsigned long i = 0; i < (num_threads - 1); ++i)
  20. {
  21. Iterator block_end = block_start;
  22. std::advance(block_end, block_size);
  23. std::packaged_task<void(void)> task( // ⇽-- - 2
  24. [=]()
  25. {
  26. std::for_each(block_start, block_end, f);
  27. });
  28. futures[i] = task.get_future();
  29. threads[i] = std::thread(std::move(task)); //⇽-- - 3
  30. block_start = block_end;
  31. }
  32. std::for_each(block_start, last, f);
  33. for (unsigned long i = 0; i < (num_threads - 1); ++i)
  34. {
  35. futures[i].get(); // ⇽-- - 4
  36. }
  37. }

1 我们规定如果处理的数量不超过25个则用单线程。否则根据处理的数量划分任务,计算开辟的线程数,如果要开辟的线程数大于内核线程的数量,则以内核线程数为准。

2 根据实际开辟的线程数num_threads计算每个线程处理的块大小。并且初始化两个vector,分别用来存储处理结果的future和处理任务的线程。

3 我们在(2处)代码生成了一个任务task,然后获取future赋值给vector对应下标为i的future元素,并且把任务绑定给对应下标为i的thread。

4 numthreads-1个线程并行处理for_each,剩下的主线程处理余下的for_each,最后通过futures.get汇总

第二种划分方式是我们采取递归的方式,我们知道采用递归的方式无法提前开辟准确数量的线程,我们采用async帮我们完成这个任务

  1. template<typename Iterator, typename Func>
  2. void async_for_each(Iterator first, Iterator last, Func f)
  3. {
  4. unsigned long const length = std::distance(first, last);
  5. if (!length)
  6. return;
  7. unsigned long const min_per_thread = 25;
  8. if (length < (2 * min_per_thread))
  9. {
  10. std::for_each(first, last, f); //⇽-- - 1
  11. }
  12. else
  13. {
  14. Iterator const mid_point = first + length / 2;
  15. //⇽-- - 2
  16. std::future<void> first_half = std::async(&async_for_each<Iterator, Func>,
  17. first, mid_point, f);
  18. //⇽-- - 3
  19. async_for_each(mid_point, last, f);
  20. // ⇽-- - 4
  21. first_half.get();
  22. }
  23. }

async可以帮助我们判断是否需要开启线程还是自动串行执行。每次我们将要处理的数据一分为2,前半部分交给一个async开辟线程处理,后半部分在本线程处理。而所谓的本线程不一定是主线程,因为我们通过async递归执行parallel_for_each,也就相当于在一个线程里独立执行了。

find的并行实现

find 的并行查找方式还是分两种,一种是将要查找的区间划分为几个段,每段交给一个线程查找。

另一种是采用递归的方式每次折半,前半部分交给一个线程查找,后半部分留在本线程查找。

我们先说第一种

find比较特殊,我们要防止线程忙等待,也要防止线程在其他线程已经查找到值后做无谓的浪费。可以用一个共享的全局atomic变量表示是否找到目标。

因为主线程要获取某个线程查找到的迭代器位置,所以我们用promise 设置 value为迭代器

  1. template<typename Iterator, typename MatchType>
  2. Iterator parallel_find(Iterator first, Iterator last, MatchType match)
  3. {
  4. struct find_element //⇽-- - 1
  5. {
  6. void operator()(Iterator begin,Iterator end,
  7. MatchType match,
  8. std::promise<Iterator>*result,
  9. std::atomic<bool>*done_flag)
  10. {
  11. try
  12. {
  13. for (; (begin != end) && !done_flag->load(); ++begin) //⇽-- - 2
  14. {
  15. if (*begin == match)
  16. {
  17. result->set_value(begin); //⇽-- - 3
  18. done_flag->store(true); //⇽-- - 4
  19. return;
  20. }
  21. }
  22. }
  23. catch (...) //⇽-- - 5
  24. {
  25. try
  26. {
  27. result->set_exception(std::current_exception()); //⇽-- - 6
  28. done_flag->store(true);
  29. }
  30. catch (...) //⇽-- - 7
  31. {}
  32. }
  33. }
  34. };
  35. unsigned long const length = std::distance(first, last);
  36. if (!length)
  37. return last;
  38. unsigned long const min_per_thread = 25;
  39. unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
  40. unsigned long const hardware_threads = std::thread::hardware_concurrency();
  41. unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  42. unsigned long const block_size = length / num_threads;
  43. std::promise<Iterator> result; //⇽-- - 8
  44. std::atomic<bool> done_flag(false); //⇽-- - 9
  45. std::vector<std::thread> threads(num_threads - 1); //⇽-- - 10
  46. {
  47. join_threads joiner(threads);
  48. Iterator block_start = first;
  49. for (unsigned long i = 0; i < (num_threads - 1); ++i)
  50. {
  51. Iterator block_end = block_start;
  52. std::advance(block_end, block_size);
  53. // ⇽-- - 11
  54. threads[i] = std::thread(find_element(), block_start, block_end, match, &result, &done_flag);
  55. block_start = block_end;
  56. }
  57. // ⇽-- - 12
  58. find_element()(block_start, last, match, &result, &done_flag);
  59. }
  60. // ⇽-- - 13
  61. if (!done_flag.load())
  62. {
  63. return last;
  64. }
  65. //⇽-- - 14
  66. return result.get_future().get();
  67. }

1 find_element重载了()运算符,接受四个参数,分别是迭代器的开始,迭代起的结束,要查找的数值,以及用来通知外部的promise,还有线程之间用来检测是否有某个线程完成查找的原子变量。

2 find_element重载()的逻辑就是查找这个区间内满足某个值的位置,并将这个位置的迭代起设置到promise中,然后将完成的原子变量标记为true。

说第二种方式,利用递归折半查找,我们可以用async帮助我们完成并行任务。

  1. template<typename Iterator, typename MatchType>
  2. Iterator parallel_find_impl(Iterator first, Iterator last, MatchType match,
  3. std::atomic<bool>& done) // ⇽-- - 1
  4. {
  5. try
  6. {
  7. unsigned long const length = std::distance(first,last);
  8. unsigned long const min_per_thread = 25; // ⇽-- - 2
  9. if (length < (2 * min_per_thread)) //⇽-- - 3
  10. {
  11. for (; (first != last) && !done.load(); ++first) //⇽-- - 4
  12. {
  13. if (*first == match)
  14. {
  15. done = true; //⇽-- - 5
  16. return first;
  17. }
  18. }
  19. return last; //⇽-- - 6
  20. }
  21. else
  22. {
  23. //⇽-- - 7
  24. Iterator const mid_point = first + (length / 2);
  25. //⇽-- - 8
  26. std::future<Iterator> async_result = std::async(&parallel_find_impl<Iterator,MatchType>,
  27. mid_point,last,match,std::ref(done));
  28. //⇽-- - 9
  29. Iterator const direct_result = parallel_find_impl(first,mid_point,match,done);
  30. //⇽-- - 10
  31. return (direct_result == mid_point) ?async_result.get() : direct_result;
  32. }
  33. }
  34. catch (...)
  35. {
  36. // ⇽-- - 11
  37. done = true;
  38. throw;
  39. }
  40. }
  41. template<typename Iterator, typename MatchType>
  42. Iterator parallel_find_async(Iterator first, Iterator last, MatchType match)
  43. {
  44. std::atomic<bool> done(false);
  45. //⇽-- - 12
  46. return parallel_find_impl(first, last, match, done);
  47. }

1 并行查找的方式种我们先根据长度是否小于50决定是否开启并行任务,如果小于50则采取单线程方式。

2 如果采用并行的方式,我们将长度折半,前半部分交给async,后半部分交给本线程。

3 最后我们在主线程中汇合,获取结果。

partial_sum并行版本

C++ 提供了累计计算求和的功能,比如一个vector中存储的数据为{1,2,3},那么经过计算,第一个元素仍然为1,第二个元素为1+2, 第三个元素为1+2+3,结果为{1,3,6}.

关于并行版本我们可以这么思考,假设元数组为{1,2,3,4,5,6,7},那我们可以划分为三个部分,第一部分为{1,2,3}交给第一个线程处理, 第二部分{4,5,6}交给第二个线程处理,7交给本线程处理。

但是我们要考虑的一个问题是线程2要用到线程1最后计算的结果,线程1计算后{1,3,6},线程2需要用到6做累加,我们可以先让线程1计算出第3个元素值6,再将这个6传递给线程2,剩下的就可以并行计算了。同样的道理本线程要处理最后一个元素的累加结果,他需要等到线程2处理完第6个元素的值。

所以基本思路是每个线程优先处理分区的最后一个元素,通过promise设置给其他线程,在这个阶段线程之间是串行的,等到所有线程都开始计算其他位置后就是并行了。

  1. template<typename Iterator>
  2. void parallel_partial_sum(Iterator first, Iterator last)
  3. {
  4. typedef typename Iterator::value_type value_type;
  5. struct process_chunk //⇽-- - 1
  6. {
  7. void operator()(Iterator begin, Iterator last,
  8. std::future<value_type>* previous_end_value,
  9. std::promise<value_type>* end_value)
  10. {
  11. try
  12. {
  13. Iterator end = last;
  14. ++end;
  15. std::partial_sum(begin, end, begin); //⇽-- - 2
  16. if (previous_end_value) //⇽-- - 3
  17. {
  18. value_type addend = previous_end_value->get(); // ⇽-- - 4
  19. *last += addend; // ⇽-- - 5
  20. if (end_value)
  21. {
  22. end_value->set_value(*last); //⇽-- - 6
  23. }
  24. // ⇽-- - 7
  25. std::for_each(begin, last, [addend](value_type& item)
  26. {
  27. item += addend;
  28. });
  29. }
  30. else if (end_value)
  31. {
  32. // ⇽-- - 8
  33. end_value->set_value(*last);
  34. }
  35. }
  36. catch (...) // ⇽-- - 9
  37. {
  38. if (end_value)
  39. {
  40. end_value->set_exception(std::current_exception()); // ⇽-- - 10
  41. }
  42. else
  43. {
  44. throw; // ⇽-- - 11
  45. }
  46. }
  47. }
  48. };
  49. unsigned long const length = std::distance(first, last);
  50. if (!length) {
  51. return;
  52. }
  53. unsigned long const min_per_thread = 25; //⇽-- - 12
  54. unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
  55. unsigned long const hardware_threads = std::thread::hardware_concurrency();
  56. unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  57. unsigned long const block_size = length / num_threads;
  58. typedef typename Iterator::value_type value_type;
  59. std::vector<std::thread> threads(num_threads - 1); // ⇽-- - 13
  60. std::vector<std::promise<value_type> > end_values(num_threads - 1); // ⇽-- - 14
  61. std::vector<std::future<value_type> > previous_end_values; // ⇽-- - 15
  62. previous_end_values.reserve(num_threads - 1); // ⇽-- - 16
  63. join_threads joiner(threads);
  64. Iterator block_start = first;
  65. for (unsigned long i = 0; i < (num_threads - 1); ++i)
  66. {
  67. Iterator block_last = block_start;
  68. std::advance(block_last, block_size - 1); // ⇽-- - 17
  69. // ⇽-- - 18
  70. threads[i] = std::thread(process_chunk(), block_start, block_last,
  71. (i != 0) ? &previous_end_values[i - 1] : 0,
  72. &end_values[i]);
  73. block_start = block_last;
  74. ++block_start; // ⇽-- - 19
  75. previous_end_values.push_back(end_values[i].get_future()); // ⇽-- - 20
  76. }
  77. Iterator final_element = block_start;
  78. std::advance(final_element, std::distance(block_start, last) - 1); // ⇽-- - 21
  79. // ⇽-- - 22
  80. process_chunk()(block_start, final_element, (num_threads > 1) ? &previous_end_values.back() : 0,
  81. 0);
  82. }

1 定义了process_chunk类,重载了()运算符,在重载的逻辑里我们先计算区间内的partial_sum累计求和(2处)

2 因为我们处理的区间不一定是首个区间,也就是他还需要加上前面区间处理得出的最后一个元素的值,所以我们通过previouse_end_value判断本区间不是首个区间,并且加上前面处理的结果。优先将最后一个值计算出来设置给promise。然后在利用for_each遍历计算其他位置的值。

总结

本文介绍了如何并行设计stl的相关算法,读者有好的思路可以互相交流一下。

测试代码和项目代码链接:

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day14-ThreadSafeContainer

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

热门评论

热门文章

  1. C++ 类的继承封装和多态

    喜欢(588) 浏览(2063)
  2. 解密定时器的实现细节

    喜欢(566) 浏览(1492)
  3. slice介绍和使用

    喜欢(521) 浏览(1510)
  4. Linux环境搭建和编码

    喜欢(594) 浏览(4349)
  5. windows环境搭建和vscode配置

    喜欢(587) 浏览(1579)

最新评论

  1. 构造函数 secondtonone1:构造函数是类的基础知识,要着重掌握
  2. 类和对象 陈宇航:支持!!!!
  3. visual studio配置boost库 secondtonone1:环境变量的方式我没搞过,回头我查一查补充一下。
  4. slice介绍和使用 恋恋风辰:切片作为引用类型极大的提高了数据传递的效率和性能,但也要注意切片的浅拷贝隐患,算是一把双刃剑,这世间的常态就是在两极之间寻求一种稳定。
  5. Linux环境搭建和编码 恋恋风辰:Linux环境下go的安装比较简单,可以不用设置GOPATH环境变量,后期我们学习go mod 之后就拜托了go文件目录的限制了。

个人公众号

个人微信