Actor和CSP设计模式

简介

本文介绍两种并发设计中常用的设计模式,包括Actor和CSP模式。传统的并发设计经常都是通过共享内存加锁保证逻辑安全,这种模式有几个缺点,包括1 频繁加锁影响性能,2 耦合度高。后来大家提出了Actor和CSP设计模式。

Actor设计模式

简单点说,actor通过消息传递的方式与外界通信。消息传递是异步的。每个actor都有一个邮箱,该邮箱接收并缓存其他actor发过来的消息,actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。
每一个类独立在一个线程里称作Actor,Actor之间通过队列通信,比如Actor1 发消息给Actor2, Actor2 发消息给Actor1都是投递到对方的队列中。好像给对方发邮件,对方从邮箱中取出一样。如下图

https://cdn.llfc.club/1696556496577.jpg

Actor模型的另一个好处就是可以消除共享状态,因为它每次只能处理一条消息,所以actor内部可以安全的处理状态,而不用考虑锁机制。

https://cdn.llfc.club/img_2f50c511653189037c3ac1a36c7962a2.png

我们在网络编程中对于逻辑层的处理就采用了将要处理的逻辑消息封装成包投递给逻辑队列,逻辑类从队列中消费的思想,其实就是一种Actor设计模式。Erlang是天然支持Actor的语言。

CSP模式

CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。相对于Actor模型,CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。go是天然支持csp模式的语言。

CSP和Actor类似,只不过CSP将消息投递给channel,至于谁从channel中取数据,发送的一方是不关注的。简单的说Actor在发送消息前是直到接收方是谁,而接受方收到消息后也知道发送方是谁,更像是邮件的通信模式。而csp是完全解耦合的。

https://cdn.llfc.club/csp.png

无论Actor还是CSP,他们都有一个共同的特性”Do not communicate by sharing memory; instead, share memory by communicating”

go风格的csp

我们通过生产者和消费者模型给大家演示csp模式的使用方式,用go来做示例

  1. package main
  2. import (
  3. "cspdemo/message"
  4. "fmt"
  5. "os"
  6. "os/signal"
  7. "syscall"
  8. )
  9. var closeChan chan struct{}
  10. var sigs chan os.Signal
  11. func init() {
  12. //类似于auto
  13. sigs = make(chan os.Signal)
  14. //具体类型初始化
  15. closeChan = make(chan struct{})
  16. signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  17. //可以理解为C++ 的匿名函数,或者js的匿名函数,此处通过go原语启动一个协程并行执行
  18. go func() {
  19. sig := <-sigs
  20. fmt.Println("receive signal is ", sig)
  21. close(closeChan)
  22. message.ConsumerInst().Exit()
  23. message.ProducerInst().Exit()
  24. }()
  25. }
  26. func main() {
  27. fmt.Println("Main Process begin!")
  28. <-closeChan
  29. message.ConsumerInst().Join()
  30. message.ProducerInst().Join()
  31. fmt.Println("Main Process exit!")
  32. }

在上面的代码中我们启动了一个协程监听Ctrl+C等退出操作,当收到Ctrl+C的信号后,会关闭closeChan这个channel。这样主函数中<-closeChan就会从channel中取出数据。然后等待消费者和生产者退出。

接下来我们将生产者和消费者的实现放入message包,先看下message公共数据的定义

  1. package message
  2. const MAX_COUNT = 200
  3. var msgChan = make(chan int, MAX_COUNT)

上面的代码中我们定义了一个channel,大小为200,大家可以理解为仓库的大小为200,生产者向仓库中投递数据如果达到200就会阻塞。直到有消费者从中消费数据,如果消费者发现channel中数据为0,则阻塞。

  1. package message
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. var producer *Producer = nil
  9. var producer_once sync.Once
  10. func init() {
  11. // Consumer1 = new(Consumer)
  12. //类似于C++ std::call_once
  13. producer_once.Do(func() {
  14. producer = new(Producer)
  15. producer._exited = make(chan struct{})
  16. producer._ctx, producer._cancel = context.WithCancel(context.Background())
  17. producer.StartWork()
  18. })
  19. }
  20. func ProducerInst() *Producer {
  21. return producer
  22. }
  23. type Producer struct {
  24. _exited chan struct{}
  25. _ctx context.Context
  26. _cancel context.CancelFunc
  27. }
  28. func (producer *Producer) Join() {
  29. <-producer._exited
  30. fmt.Println("producer exited")
  31. }
  32. func (producer *Producer) Exit() {
  33. producer._cancel()
  34. }
  35. func (producer *Producer) StartWork() {
  36. go func() {
  37. i := 0
  38. for {
  39. i++
  40. select {
  41. case <-producer._ctx.Done():
  42. {
  43. close(producer._exited)
  44. return
  45. }
  46. case msgChan <- i:
  47. fmt.Println("producer produce number is ", i)
  48. }
  49. time.Sleep(50 * time.Millisecond)
  50. }
  51. }()
  52. }

我们通过init函数中只调用一次的方式初始化了producer,之后生成了一个名为_exited的channel,用来通知Join返回。
同样我们还初始化了一个可以取消的context,主要是在Exit函数内调用cancel取消上下文,会触发StartWorkproducer._ctx.Done()进而终止生产工作,再发出退出信号,达到优雅退出的效果。

类似的消费者也是相似逻辑

  1. package message
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. )
  8. var consumer *Consumer = nil
  9. var consumer_once sync.Once
  10. func init() {
  11. // Consumer1 = new(Consumer)
  12. //类似于C++ std::call_once
  13. consumer_once.Do(func() {
  14. consumer = new(Consumer)
  15. consumer._exited = make(chan struct{})
  16. consumer._ctx, consumer._cancel = context.WithCancel(context.Background())
  17. consumer.StartWork()
  18. })
  19. }
  20. func ConsumerInst() *Consumer {
  21. return consumer
  22. }
  23. type Consumer struct {
  24. _exited chan struct{}
  25. _ctx context.Context
  26. _cancel context.CancelFunc
  27. }
  28. func (consumer *Consumer) Join() {
  29. <-consumer._exited
  30. fmt.Println("consumer exited")
  31. }
  32. func (consumer *Consumer) Exit() {
  33. consumer._cancel()
  34. }
  35. func (consumer *Consumer) StartWork() {
  36. go func() {
  37. i := 0
  38. for {
  39. select {
  40. case <-consumer._ctx.Done():
  41. {
  42. close(consumer._exited)
  43. return
  44. }
  45. case i = <-msgChan:
  46. fmt.Println("consumer consum number is ", i)
  47. }
  48. time.Sleep(100 * time.Millisecond)
  49. }
  50. }()
  51. }

C++ 风格的csp

C++是万能的,我们可以用C++实现一个类似于go的channel,采用csp模式解耦合,实现类似的生产者和消费者问题

  1. #include <iostream>
  2. #include <queue>
  3. #include <mutex>
  4. #include <condition_variable>
  5. template <typename T>
  6. class Channel {
  7. private:
  8. std::queue<T> queue_;
  9. std::mutex mtx_;
  10. std::condition_variable cv_producer_;
  11. std::condition_variable cv_consumer_;
  12. size_t capacity_;
  13. bool closed_ = false;
  14. public:
  15. Channel(size_t capacity = 0) : capacity_(capacity) {}
  16. bool send(T value) {
  17. std::unique_lock<std::mutex> lock(mtx_);
  18. cv_producer_.wait(lock, [this]() {
  19. // 对于无缓冲的channel,我们应该等待直到有消费者准备好
  20. return (capacity_ == 0 && queue_.empty()) || queue_.size() < capacity_ || closed_;
  21. });
  22. if (closed_) {
  23. return false;
  24. }
  25. queue_.push(value);
  26. cv_consumer_.notify_one();
  27. return true;
  28. }
  29. bool receive(T& value) {
  30. std::unique_lock<std::mutex> lock(mtx_);
  31. cv_consumer_.wait(lock, [this]() { return !queue_.empty() || closed_; });
  32. if (closed_ && queue_.empty()) {
  33. return false;
  34. }
  35. value = queue_.front();
  36. queue_.pop();
  37. cv_producer_.notify_one();
  38. return true;
  39. }
  40. void close() {
  41. std::unique_lock<std::mutex> lock(mtx_);
  42. closed_ = true;
  43. cv_producer_.notify_all();
  44. cv_consumer_.notify_all();
  45. }
  46. };
  47. // 示例使用
  48. int main() {
  49. Channel<int> ch(10); // 10缓冲的channel
  50. std::thread producer([&]() {
  51. for (int i = 0; i < 5; ++i) {
  52. ch.send(i);
  53. std::cout << "Sent: " << i << std::endl;
  54. }
  55. ch.close();
  56. });
  57. std::thread consumer([&]() {
  58. std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 故意延迟消费者开始消费
  59. int val;
  60. while (ch.receive(val)) {
  61. std::cout << "Received: " << val << std::endl;
  62. }
  63. });
  64. producer.join();
  65. consumer.join();
  66. return 0;
  67. }

简单来说就是通过条件变量实现通信的阻塞和同步的。

利用csp思想实现取款逻辑

《C++并发编程实战》一书中提及了用csp思想实现atm机取款逻辑,我根据书中思想,整理了通信的示意图,书中部分代码存在问题,也一并修复了。

https://cdn.llfc.club/1696562243686.jpg

主函数实现

  1. // Actor.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
  2. //
  3. #include <iostream>
  4. #include "message.h"
  5. #include "withdraw_msg.h"
  6. #include "atm.h"
  7. #include "dispatcher.h"
  8. #include "bank_matchine.h"
  9. #include "interface_matchine.h"
  10. int main()
  11. {
  12. bank_machine bank;
  13. interface_machine interface_hardware;
  14. atm machine(bank.get_sender(), interface_hardware.get_sender());
  15. std::thread bank_thread(&bank_machine::run, &bank);
  16. std::thread if_thread(&interface_machine::run, &interface_hardware);
  17. std::thread atm_thread(&atm::run, &machine);
  18. messaging::sender atmqueue(machine.get_sender());
  19. bool quit_pressed = false;
  20. while (!quit_pressed)
  21. {
  22. char c = getchar();
  23. switch (c)
  24. {
  25. case '0':
  26. case '1':
  27. case '2':
  28. case '3':
  29. case '4':
  30. case '5':
  31. case '6':
  32. case '7':
  33. case '8':
  34. case '9':
  35. atmqueue.send(digit_pressed(c));
  36. break;
  37. case 'b':
  38. atmqueue.send(balance_pressed());
  39. break;
  40. case 'w':
  41. atmqueue.send(withdraw_pressed(50));
  42. break;
  43. case 'c':
  44. atmqueue.send(cancel_pressed());
  45. break;
  46. case 'q':
  47. quit_pressed = true;
  48. break;
  49. case 'i':
  50. atmqueue.send(card_inserted("acc1234"));
  51. break;
  52. }
  53. }
  54. bank.done();
  55. machine.done();
  56. interface_hardware.done();
  57. atm_thread.join();
  58. bank_thread.join();
  59. if_thread.join();
  60. }

主函数中启动了三个线程,分别处理bank,machine以及interface的操作。
由于代码复杂解析来只列举atm类的实现

  1. #pragma once
  2. #include "dispatcher.h"
  3. #include <functional>
  4. #include <iostream>
  5. class atm
  6. {
  7. messaging::receiver incoming;
  8. messaging::sender bank;
  9. messaging::sender interface_hardware;
  10. void (atm::* state)();
  11. std::string account;
  12. unsigned withdrawal_amount;
  13. std::string pin;
  14. void process_withdrawal()
  15. {
  16. incoming.wait().handle<withdraw_ok, std::function<void(withdraw_ok const& msg)>,
  17. messaging::dispatcher >(
  18. [&](withdraw_ok const& msg)
  19. {
  20. interface_hardware.send(
  21. issue_money(withdrawal_amount));
  22. bank.send(
  23. withdrawal_processed(account, withdrawal_amount));
  24. state = &atm::done_processing;
  25. }, "withdraw_ok").handle<withdraw_denied, std::function<void(withdraw_denied const& msg)>>(
  26. [&](withdraw_denied const& msg)
  27. {
  28. interface_hardware.send(display_insufficient_funds());
  29. state = &atm::done_processing;
  30. }, "withdraw_denied").handle<cancel_pressed, std::function<void(cancel_pressed const& msg)>>(
  31. [&](cancel_pressed const& msg)
  32. {
  33. bank.send(
  34. cancel_withdrawal(account, withdrawal_amount));
  35. interface_hardware.send(
  36. display_withdrawal_cancelled());
  37. state = &atm::done_processing;
  38. }, "cancel_pressed"
  39. );
  40. }
  41. void process_balance()
  42. {
  43. incoming.wait()
  44. .handle<balance, std::function<void(balance const& msg)>,
  45. messaging::dispatcher>(
  46. [&](balance const& msg)
  47. {
  48. interface_hardware.send(display_balance(msg.amount));
  49. state = &atm::wait_for_action;
  50. },"balance"
  51. ).handle < cancel_pressed, std::function<void(cancel_pressed const& msg) >>(
  52. [&](cancel_pressed const& msg)
  53. {
  54. state = &atm::done_processing;
  55. }, "cancel_pressed"
  56. );
  57. }
  58. void wait_for_action()
  59. {
  60. interface_hardware.send(display_withdrawal_options());
  61. incoming.wait()
  62. .handle<withdraw_pressed, std::function<void(withdraw_pressed const& msg)>,
  63. messaging::dispatcher>(
  64. [&](withdraw_pressed const& msg)
  65. {
  66. withdrawal_amount = msg.amount;
  67. bank.send(withdraw(account, msg.amount, incoming));
  68. state = &atm::process_withdrawal;
  69. }, "withdraw_pressed"
  70. ).handle < balance_pressed, std::function<void(balance_pressed const& msg) >>(
  71. [&](balance_pressed const& msg)
  72. {
  73. bank.send(get_balance(account, incoming));
  74. state = &atm::process_balance;
  75. }, "balance_pressed"
  76. ).handle<cancel_pressed, std::function<void(cancel_pressed const& msg) >>(
  77. [&](cancel_pressed const& msg)
  78. {
  79. state = &atm::done_processing;
  80. }, "cancel_pressed"
  81. );
  82. }
  83. void verifying_pin()
  84. {
  85. incoming.wait()
  86. .handle<pin_verified, std::function<void(pin_verified const& msg)>,
  87. messaging::dispatcher>(
  88. [&](pin_verified const& msg)
  89. {
  90. state = &atm::wait_for_action;
  91. }, "pin_verified"
  92. ).handle<pin_incorrect, std::function<void(pin_incorrect const& msg)>>(
  93. [&](pin_incorrect const& msg)
  94. {
  95. interface_hardware.send(
  96. display_pin_incorrect_message());
  97. state = &atm::done_processing;
  98. }, "pin_incorrect"
  99. ).handle<cancel_pressed, std::function<void(cancel_pressed const& msg)>>(
  100. [&](cancel_pressed const& msg)
  101. {
  102. state = &atm::done_processing;
  103. }, "cancel_pressed"
  104. );
  105. }
  106. void getting_pin()
  107. {
  108. incoming.wait().handle<digit_pressed, std::function<void(digit_pressed const& msg)>,
  109. messaging::dispatcher>(
  110. [&](digit_pressed const& msg)
  111. {
  112. unsigned const pin_length = 6;
  113. pin += msg.digit;
  114. if (pin.length() == pin_length)
  115. {
  116. bank.send(verify_pin(account, pin, incoming));
  117. state = &atm::verifying_pin;
  118. }
  119. }, "digit_pressed"
  120. ).handle<clear_last_pressed, std::function<void(clear_last_pressed const& msg)>>(
  121. [&](clear_last_pressed const& msg)
  122. {
  123. if (!pin.empty())
  124. {
  125. pin.pop_back();
  126. }
  127. }, "clear_last_pressed"
  128. ).handle<cancel_pressed, std::function<void(cancel_pressed const& msg)>>(
  129. [&](cancel_pressed const& msg)
  130. {
  131. state = &atm::done_processing;
  132. }, "cancel_pressed"
  133. );
  134. }
  135. void waiting_for_card()
  136. {
  137. interface_hardware.send(display_enter_card());
  138. incoming.wait().handle<card_inserted, std::function<void(card_inserted const& msg)>,
  139. messaging::dispatcher>(
  140. [&](card_inserted const& msg)
  141. {
  142. account = msg.account;
  143. pin = "";
  144. interface_hardware.send(display_enter_pin());
  145. state = &atm::getting_pin;
  146. }, "card_inserted"
  147. );
  148. }
  149. void done_processing()
  150. {
  151. interface_hardware.send(eject_card());
  152. state = &atm::waiting_for_card;
  153. }
  154. atm(atm const&) = delete;
  155. atm& operator=(atm const&) = delete;
  156. public:
  157. atm(messaging::sender bank_,
  158. messaging::sender interface_hardware_) :
  159. bank(bank_), interface_hardware(interface_hardware_)
  160. {}
  161. void done()
  162. {
  163. get_sender().send(messaging::close_queue());
  164. }
  165. void run()
  166. {
  167. state = &atm::waiting_for_card;
  168. try
  169. {
  170. for (;;)
  171. {
  172. (this->*state)();
  173. }
  174. }
  175. catch (messaging::close_queue const&)
  176. {
  177. }
  178. }
  179. messaging::sender get_sender()
  180. {
  181. return incoming;
  182. }
  183. };

atm 主要功能就是通过状态机不断地切换状态监听想要处理的函数。

详细源码可参考https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day08-actor/Actor

总结

本文讲述了Actor设计模式和CSP设计模式,并通过go和C++等语言给大家展示了csp并发设计的demo,最后通过讲解《C++并发编程实战》中取款的案例,展示了csp的用法。

详细源码

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day08-actor/Actor

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

热门评论

热门文章

  1. 解密定时器的实现细节

    喜欢(566) 浏览(3106)
  2. C++ 类的继承封装和多态

    喜欢(588) 浏览(4389)
  3. Linux环境搭建和编码

    喜欢(594) 浏览(10281)
  4. windows环境搭建和vscode配置

    喜欢(587) 浏览(2408)
  5. slice介绍和使用

    喜欢(521) 浏览(2248)

最新评论

  1. C++ 虚函数表原理和类成员内存分布 WangQi888888:class Test{ int m; int b; }中b成员是int,为什么在内存中只占了1个字节。不应该是4个字节吗?是不是int应该改为char。这样的话就会符合图上说明的情况
  2. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  3. asio多线程模型IOServicePool Lion:线程池一定要继承单例模式吗
  4. 泛型算法的定制操作 secondtonone1:lambda和bind是C11新增的利器,善于利用这两个机制可以极大地提升编程安全性和效率。
  5. 类和对象 陈宇航:支持!!!!

个人公众号

个人微信