几种简单并行算法的实现(for_each,find以及partial_sum)

简介

前文介绍了几种数据划分的方式,包括按照线程数量划分,按照递归方式划分,以及按照任务类型划分等。

本文结合之前的划分方式,基于stl的find, for_each以及partial_sum等算法实现并行版本。

并行版本for_each

实现并行的for_each,最简单的方式就是将数据划分,每个线程分别处理一段连续的数据即可。

在介绍并行版本之前,我们先实现一个管理线程 的类join_threads,用来管控线程防止线程过早退出

  1. class join_threads
  2. {
  3. std::vector<std::thread>& threads;
  4. public:
  5. explicit join_threads(std::vector<std::thread>& threads_) :
  6. threads(threads_)
  7. {}
  8. ~join_threads()
  9. {
  10. for (unsigned long i = 0; i < threads.size(); ++i)
  11. {
  12. if (threads[i].joinable())
  13. threads[i].join();
  14. }
  15. }
  16. };

接下来我们实现第一种方式

  1. template<typename Iterator, typename Func>
  2. void parallel_for_each(Iterator first, Iterator last, Func f)
  3. {
  4. unsigned long const length = std::distance(first, last);
  5. if (!length)
  6. return;
  7. unsigned long const min_per_thread = 25;
  8. unsigned long const max_threads =
  9. (length + min_per_thread - 1) / min_per_thread;
  10. unsigned long const hardware_threads =
  11. std::thread::hardware_concurrency();
  12. unsigned long const num_threads =
  13. std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  14. unsigned long const block_size = length / num_threads;
  15. std::vector<std::future<void>> futures(num_threads - 1); //⇽-- - 1
  16. std::vector<std::thread> threads(num_threads - 1);
  17. join_threads joiner(threads);
  18. Iterator block_start = first;
  19. for (unsigned long i = 0; i < (num_threads - 1); ++i)
  20. {
  21. Iterator block_end = block_start;
  22. std::advance(block_end, block_size);
  23. std::packaged_task<void(void)> task( // ⇽-- - 2
  24. [=]()
  25. {
  26. std::for_each(block_start, block_end, f);
  27. });
  28. futures[i] = task.get_future();
  29. threads[i] = std::thread(std::move(task)); //⇽-- - 3
  30. block_start = block_end;
  31. }
  32. std::for_each(block_start, last, f);
  33. for (unsigned long i = 0; i < (num_threads - 1); ++i)
  34. {
  35. futures[i].get(); // ⇽-- - 4
  36. }
  37. }

1 我们规定如果处理的数量不超过25个则用单线程。否则根据处理的数量划分任务,计算开辟的线程数,如果要开辟的线程数大于内核线程的数量,则以内核线程数为准。

2 根据实际开辟的线程数num_threads计算每个线程处理的块大小。并且初始化两个vector,分别用来存储处理结果的future和处理任务的线程。

3 我们在(2处)代码生成了一个任务task,然后获取future赋值给vector对应下标为i的future元素,并且把任务绑定给对应下标为i的thread。

4 numthreads-1个线程并行处理for_each,剩下的主线程处理余下的for_each,最后通过futures.get汇总

第二种划分方式是我们采取递归的方式,我们知道采用递归的方式无法提前开辟准确数量的线程,我们采用async帮我们完成这个任务

  1. template<typename Iterator, typename Func>
  2. void async_for_each(Iterator first, Iterator last, Func f)
  3. {
  4. unsigned long const length = std::distance(first, last);
  5. if (!length)
  6. return;
  7. unsigned long const min_per_thread = 25;
  8. if (length < (2 * min_per_thread))
  9. {
  10. std::for_each(first, last, f); //⇽-- - 1
  11. }
  12. else
  13. {
  14. Iterator const mid_point = first + length / 2;
  15. //⇽-- - 2
  16. std::future<void> first_half = std::async(&async_for_each<Iterator, Func>,
  17. first, mid_point, f);
  18. //⇽-- - 3
  19. async_for_each(mid_point, last, f);
  20. // ⇽-- - 4
  21. first_half.get();
  22. }
  23. }

async可以帮助我们判断是否需要开启线程还是自动串行执行。每次我们将要处理的数据一分为2,前半部分交给一个async开辟线程处理,后半部分在本线程处理。而所谓的本线程不一定是主线程,因为我们通过async递归执行parallel_for_each,也就相当于在一个线程里独立执行了。

find的并行实现

find 的并行查找方式还是分两种,一种是将要查找的区间划分为几个段,每段交给一个线程查找。

另一种是采用递归的方式每次折半,前半部分交给一个线程查找,后半部分留在本线程查找。

我们先说第一种

find比较特殊,我们要防止线程忙等待,也要防止线程在其他线程已经查找到值后做无谓的浪费。可以用一个共享的全局atomic变量表示是否找到目标。

因为主线程要获取某个线程查找到的迭代器位置,所以我们用promise 设置 value为迭代器

  1. template<typename Iterator, typename MatchType>
  2. Iterator parallel_find(Iterator first, Iterator last, MatchType match)
  3. {
  4. struct find_element //⇽-- - 1
  5. {
  6. void operator()(Iterator begin,Iterator end,
  7. MatchType match,
  8. std::promise<Iterator>*result,
  9. std::atomic<bool>*done_flag)
  10. {
  11. try
  12. {
  13. for (; (begin != end) && !done_flag->load(); ++begin) //⇽-- - 2
  14. {
  15. if (*begin == match)
  16. {
  17. result->set_value(begin); //⇽-- - 3
  18. done_flag->store(true); //⇽-- - 4
  19. return;
  20. }
  21. }
  22. }
  23. catch (...) //⇽-- - 5
  24. {
  25. try
  26. {
  27. result->set_exception(std::current_exception()); //⇽-- - 6
  28. done_flag->store(true);
  29. }
  30. catch (...) //⇽-- - 7
  31. {}
  32. }
  33. }
  34. };
  35. unsigned long const length = std::distance(first, last);
  36. if (!length)
  37. return last;
  38. unsigned long const min_per_thread = 25;
  39. unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
  40. unsigned long const hardware_threads = std::thread::hardware_concurrency();
  41. unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  42. unsigned long const block_size = length / num_threads;
  43. std::promise<Iterator> result; //⇽-- - 8
  44. std::atomic<bool> done_flag(false); //⇽-- - 9
  45. std::vector<std::thread> threads(num_threads - 1); //⇽-- - 10
  46. {
  47. join_threads joiner(threads);
  48. Iterator block_start = first;
  49. for (unsigned long i = 0; i < (num_threads - 1); ++i)
  50. {
  51. Iterator block_end = block_start;
  52. std::advance(block_end, block_size);
  53. // ⇽-- - 11
  54. threads[i] = std::thread(find_element(), block_start, block_end, match, &result, &done_flag);
  55. block_start = block_end;
  56. }
  57. // ⇽-- - 12
  58. find_element()(block_start, last, match, &result, &done_flag);
  59. }
  60. // ⇽-- - 13
  61. if (!done_flag.load())
  62. {
  63. return last;
  64. }
  65. //⇽-- - 14
  66. return result.get_future().get();
  67. }

1 find_element重载了()运算符,接受四个参数,分别是迭代器的开始,迭代起的结束,要查找的数值,以及用来通知外部的promise,还有线程之间用来检测是否有某个线程完成查找的原子变量。

2 find_element重载()的逻辑就是查找这个区间内满足某个值的位置,并将这个位置的迭代起设置到promise中,然后将完成的原子变量标记为true。

说第二种方式,利用递归折半查找,我们可以用async帮助我们完成并行任务。

  1. template<typename Iterator, typename MatchType>
  2. Iterator parallel_find_impl(Iterator first, Iterator last, MatchType match,
  3. std::atomic<bool>& done) // ⇽-- - 1
  4. {
  5. try
  6. {
  7. unsigned long const length = std::distance(first,last);
  8. unsigned long const min_per_thread = 25; // ⇽-- - 2
  9. if (length < (2 * min_per_thread)) //⇽-- - 3
  10. {
  11. for (; (first != last) && !done.load(); ++first) //⇽-- - 4
  12. {
  13. if (*first == match)
  14. {
  15. done = true; //⇽-- - 5
  16. return first;
  17. }
  18. }
  19. return last; //⇽-- - 6
  20. }
  21. else
  22. {
  23. //⇽-- - 7
  24. Iterator const mid_point = first + (length / 2);
  25. //⇽-- - 8
  26. std::future<Iterator> async_result = std::async(&parallel_find_impl<Iterator,MatchType>,
  27. mid_point,last,match,std::ref(done));
  28. //⇽-- - 9
  29. Iterator const direct_result = parallel_find_impl(first,mid_point,match,done);
  30. //⇽-- - 10
  31. return (direct_result == mid_point) ?async_result.get() : direct_result;
  32. }
  33. }
  34. catch (...)
  35. {
  36. // ⇽-- - 11
  37. done = true;
  38. throw;
  39. }
  40. }
  41. template<typename Iterator, typename MatchType>
  42. Iterator parallel_find_async(Iterator first, Iterator last, MatchType match)
  43. {
  44. std::atomic<bool> done(false);
  45. //⇽-- - 12
  46. return parallel_find_impl(first, last, match, done);
  47. }

1 并行查找的方式种我们先根据长度是否小于50决定是否开启并行任务,如果小于50则采取单线程方式。

2 如果采用并行的方式,我们将长度折半,前半部分交给async,后半部分交给本线程。

3 最后我们在主线程中汇合,获取结果。

partial_sum并行版本

C++ 提供了累计计算求和的功能,比如一个vector中存储的数据为{1,2,3},那么经过计算,第一个元素仍然为1,第二个元素为1+2, 第三个元素为1+2+3,结果为{1,3,6}.

关于并行版本我们可以这么思考,假设元数组为{1,2,3,4,5,6,7},那我们可以划分为三个部分,第一部分为{1,2,3}交给第一个线程处理, 第二部分{4,5,6}交给第二个线程处理,7交给本线程处理。

但是我们要考虑的一个问题是线程2要用到线程1最后计算的结果,线程1计算后{1,3,6},线程2需要用到6做累加,我们可以先让线程1计算出第3个元素值6,再将这个6传递给线程2,剩下的就可以并行计算了。同样的道理本线程要处理最后一个元素的累加结果,他需要等到线程2处理完第6个元素的值。

所以基本思路是每个线程优先处理分区的最后一个元素,通过promise设置给其他线程,在这个阶段线程之间是串行的,等到所有线程都开始计算其他位置后就是并行了。

  1. template<typename Iterator>
  2. void parallel_partial_sum(Iterator first, Iterator last)
  3. {
  4. typedef typename Iterator::value_type value_type;
  5. struct process_chunk //⇽-- - 1
  6. {
  7. void operator()(Iterator begin, Iterator last,
  8. std::future<value_type>* previous_end_value,
  9. std::promise<value_type>* end_value)
  10. {
  11. try
  12. {
  13. Iterator end = last;
  14. ++end;
  15. std::partial_sum(begin, end, begin); //⇽-- - 2
  16. if (previous_end_value) //⇽-- - 3
  17. {
  18. value_type addend = previous_end_value->get(); // ⇽-- - 4
  19. *last += addend; // ⇽-- - 5
  20. if (end_value)
  21. {
  22. end_value->set_value(*last); //⇽-- - 6
  23. }
  24. // ⇽-- - 7
  25. std::for_each(begin, last, [addend](value_type& item)
  26. {
  27. item += addend;
  28. });
  29. }
  30. else if (end_value)
  31. {
  32. // ⇽-- - 8
  33. end_value->set_value(*last);
  34. }
  35. }
  36. catch (...) // ⇽-- - 9
  37. {
  38. if (end_value)
  39. {
  40. end_value->set_exception(std::current_exception()); // ⇽-- - 10
  41. }
  42. else
  43. {
  44. throw; // ⇽-- - 11
  45. }
  46. }
  47. }
  48. };
  49. unsigned long const length = std::distance(first, last);
  50. if (!length) {
  51. return;
  52. }
  53. unsigned long const min_per_thread = 25; //⇽-- - 12
  54. unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
  55. unsigned long const hardware_threads = std::thread::hardware_concurrency();
  56. unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
  57. unsigned long const block_size = length / num_threads;
  58. typedef typename Iterator::value_type value_type;
  59. std::vector<std::thread> threads(num_threads - 1); // ⇽-- - 13
  60. std::vector<std::promise<value_type> > end_values(num_threads - 1); // ⇽-- - 14
  61. std::vector<std::future<value_type> > previous_end_values; // ⇽-- - 15
  62. previous_end_values.reserve(num_threads - 1); // ⇽-- - 16
  63. join_threads joiner(threads);
  64. Iterator block_start = first;
  65. for (unsigned long i = 0; i < (num_threads - 1); ++i)
  66. {
  67. Iterator block_last = block_start;
  68. std::advance(block_last, block_size - 1); // ⇽-- - 17
  69. // ⇽-- - 18
  70. threads[i] = std::thread(process_chunk(), block_start, block_last,
  71. (i != 0) ? &previous_end_values[i - 1] : 0,
  72. &end_values[i]);
  73. block_start = block_last;
  74. ++block_start; // ⇽-- - 19
  75. previous_end_values.push_back(end_values[i].get_future()); // ⇽-- - 20
  76. }
  77. Iterator final_element = block_start;
  78. std::advance(final_element, std::distance(block_start, last) - 1); // ⇽-- - 21
  79. // ⇽-- - 22
  80. process_chunk()(block_start, final_element, (num_threads > 1) ? &previous_end_values.back() : 0,
  81. 0);
  82. }

1 定义了process_chunk类,重载了()运算符,在重载的逻辑里我们先计算区间内的partial_sum累计求和(2处)

2 因为我们处理的区间不一定是首个区间,也就是他还需要加上前面区间处理得出的最后一个元素的值,所以我们通过previouse_end_value判断本区间不是首个区间,并且加上前面处理的结果。优先将最后一个值计算出来设置给promise。然后在利用for_each遍历计算其他位置的值。

总结

本文介绍了如何并行设计stl的相关算法,读者有好的思路可以互相交流一下。

测试代码和项目代码链接:

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day14-ThreadSafeContainer

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

热门评论

热门文章

  1. Linux环境搭建和编码

    喜欢(594) 浏览(13624)
  2. Qt环境搭建

    喜欢(517) 浏览(24871)
  3. vscode搭建windows C++开发环境

    喜欢(596) 浏览(83977)
  4. 聊天项目(28) 分布式服务通知好友申请

    喜欢(507) 浏览(6115)
  5. 使用hexo搭建个人博客

    喜欢(533) 浏览(11961)

最新评论

  1. 构造函数 secondtonone1:构造函数是类的基础知识,要着重掌握
  2. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google
  3. boost::asio之socket的创建和连接 项空月:发现一些错别字 :每隔vector存储  是不是是每个. asio::mutable_buffers_1 o或者    是不是多打了个o
  4. 聊天项目(15) 客户端实现TCP管理者 lkx:已经在&QTcpSocket::readyRead 回调函数中做了处理了的。
  5. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  6. Qt 对话框 Spade2077:QDialog w(); //这里是不是不需要带括号
  7. visual studio配置boost库 一giao里我离giaogiao:请问是修改成这样吗:.\b2.exe toolset=MinGW
  8. 处理网络粘包问题 zyouth: //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < data_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } 把_b_head_parse = true;放在_socket.async_read_some前面是不是更好
  9. 类和对象 陈宇航:支持!!!!
  10. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  11. string类 WangQi888888:确实错了,应该是!isspace(sind[index]). 否则不进入循环,还是原来的字符串“some string”
  12. 再谈单例模式 secondtonone1:是的,C++11以后返回局部static变量对象能保证线程安全了。
  13. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊
  14. 创建项目和编译 secondtonone1:谢谢支持
  15. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  16. 面试题汇总(一) secondtonone1:看到网络上经常提问的go的问题,做了一下汇总,结合自己的经验给出的答案,如有纰漏,望指正批评。
  17. protobuf配置和使用 熊二:你可以把dll放到系统目录,也可以配置环境变量,还能把dll丢到lib里
  18. interface应用 secondtonone1:interface是万能类型,但是使用时要转换为实际类型来使用。interface丰富了go的多态特性,也降低了传统面向对象语言的耦合性。
  19. 堆排序 secondtonone1:堆排序非常实用,定时器就是这个原理制作的。
  20. 网络编程学习方法和图书推荐 Corleone:啥程度可以找工作
  21. 利用栅栏实现同步 Dzher:作者你好!我觉得 std::thread a(write_x); std::thread b(write_y); std::thread c(read_x_then_y); std::thread d(read_y_then_x); 这个例子中的assert fail并不会发生,原子变量设定了非relaxed内存序后一个线程的原子变量被写入,那么之后的读取一定会被同步的,c和d线程中只可能同时发生一个z++未执行的情况,最终z不是1就是2了,我测试了很多次都没有assert,请问我这个观点有什么错误,谢谢!
  22. 无锁并发队列 TenThousandOne:_head  和 _tail  替换为原子变量。那里pop的逻辑,val = _data[h] 可以移到循环外面吗
  23. 聊天项目(7) visualstudio配置grpc diablorrr:cmake文件得改一下 find_package(Boost REQUIRED COMPONENTS system filesystem),要加上filesystem。在target_link_libraries中也同样加上

个人公众号

个人微信