简介
本文介绍两种并发设计中常用的设计模式,包括Actor和CSP模式。传统的并发设计经常都是通过共享内存加锁保证逻辑安全,这种模式有几个缺点,包括1 频繁加锁影响性能,2 耦合度高。后来大家提出了Actor和CSP设计模式。
Actor设计模式
简单点说,actor通过消息传递的方式与外界通信。消息传递是异步的。每个actor都有一个邮箱,该邮箱接收并缓存其他actor发过来的消息,actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。
每一个类独立在一个线程里称作Actor,Actor之间通过队列通信,比如Actor1 发消息给Actor2, Actor2 发消息给Actor1都是投递到对方的队列中。好像给对方发邮件,对方从邮箱中取出一样。如下图
Actor模型的另一个好处就是可以消除共享状态,因为它每次只能处理一条消息,所以actor内部可以安全的处理状态,而不用考虑锁机制。
我们在网络编程中对于逻辑层的处理就采用了将要处理的逻辑消息封装成包投递给逻辑队列,逻辑类从队列中消费的思想,其实就是一种Actor设计模式。Erlang是天然支持Actor的语言。
CSP模式
CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。相对于Actor模型,CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。go是天然支持csp模式的语言。
CSP和Actor类似,只不过CSP将消息投递给channel,至于谁从channel中取数据,发送的一方是不关注的。简单的说Actor在发送消息前是直到接收方是谁,而接受方收到消息后也知道发送方是谁,更像是邮件的通信模式。而csp是完全解耦合的。
无论Actor还是CSP,他们都有一个共同的特性”Do not communicate by sharing memory; instead, share memory by communicating”
go风格的csp
我们通过生产者和消费者模型给大家演示csp模式的使用方式,用go来做示例
package main
import (
"cspdemo/message"
"fmt"
"os"
"os/signal"
"syscall"
)
var closeChan chan struct{}
var sigs chan os.Signal
func init() {
//类似于auto
sigs = make(chan os.Signal)
//具体类型初始化
closeChan = make(chan struct{})
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
//可以理解为C++ 的匿名函数,或者js的匿名函数,此处通过go原语启动一个协程并行执行
go func() {
sig := <-sigs
fmt.Println("receive signal is ", sig)
close(closeChan)
message.ConsumerInst().Exit()
message.ProducerInst().Exit()
}()
}
func main() {
fmt.Println("Main Process begin!")
<-closeChan
message.ConsumerInst().Join()
message.ProducerInst().Join()
fmt.Println("Main Process exit!")
}
在上面的代码中我们启动了一个协程监听Ctrl+C等退出操作,当收到Ctrl+C的信号后,会关闭closeChan这个channel。这样主函数中<-closeChan
就会从channel中取出数据。然后等待消费者和生产者退出。
接下来我们将生产者和消费者的实现放入message包,先看下message公共数据的定义
package message
const MAX_COUNT = 200
var msgChan = make(chan int, MAX_COUNT)
上面的代码中我们定义了一个channel,大小为200,大家可以理解为仓库的大小为200,生产者向仓库中投递数据如果达到200就会阻塞。直到有消费者从中消费数据,如果消费者发现channel中数据为0,则阻塞。
package message
import (
"context"
"fmt"
"sync"
"time"
)
var producer *Producer = nil
var producer_once sync.Once
func init() {
// Consumer1 = new(Consumer)
//类似于C++ std::call_once
producer_once.Do(func() {
producer = new(Producer)
producer._exited = make(chan struct{})
producer._ctx, producer._cancel = context.WithCancel(context.Background())
producer.StartWork()
})
}
func ProducerInst() *Producer {
return producer
}
type Producer struct {
_exited chan struct{}
_ctx context.Context
_cancel context.CancelFunc
}
func (producer *Producer) Join() {
<-producer._exited
fmt.Println("producer exited")
}
func (producer *Producer) Exit() {
producer._cancel()
}
func (producer *Producer) StartWork() {
go func() {
i := 0
for {
i++
select {
case <-producer._ctx.Done():
{
close(producer._exited)
return
}
case msgChan <- i:
fmt.Println("producer produce number is ", i)
}
time.Sleep(50 * time.Millisecond)
}
}()
}
我们通过init函数中只调用一次的方式初始化了producer,之后生成了一个名为_exited的channel,用来通知Join返回。
同样我们还初始化了一个可以取消的context,主要是在Exit函数内调用cancel取消上下文,会触发StartWork
中producer._ctx.Done()
进而终止生产工作,再发出退出信号,达到优雅退出的效果。
类似的消费者也是相似逻辑
package message
import (
"context"
"fmt"
"sync"
"time"
)
var consumer *Consumer = nil
var consumer_once sync.Once
func init() {
// Consumer1 = new(Consumer)
//类似于C++ std::call_once
consumer_once.Do(func() {
consumer = new(Consumer)
consumer._exited = make(chan struct{})
consumer._ctx, consumer._cancel = context.WithCancel(context.Background())
consumer.StartWork()
})
}
func ConsumerInst() *Consumer {
return consumer
}
type Consumer struct {
_exited chan struct{}
_ctx context.Context
_cancel context.CancelFunc
}
func (consumer *Consumer) Join() {
<-consumer._exited
fmt.Println("consumer exited")
}
func (consumer *Consumer) Exit() {
consumer._cancel()
}
func (consumer *Consumer) StartWork() {
go func() {
i := 0
for {
select {
case <-consumer._ctx.Done():
{
close(consumer._exited)
return
}
case i = <-msgChan:
fmt.Println("consumer consum number is ", i)
}
time.Sleep(100 * time.Millisecond)
}
}()
}
C++ 风格的csp
C++是万能的,我们可以用C++实现一个类似于go的channel,采用csp模式解耦合,实现类似的生产者和消费者问题
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class Channel {
private:
std::queue<T> queue_;
std::mutex mtx_;
std::condition_variable cv_producer_;
std::condition_variable cv_consumer_;
size_t capacity_;
bool closed_ = false;
public:
Channel(size_t capacity = 0) : capacity_(capacity) {}
bool send(T value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_producer_.wait(lock, [this]() {
// 对于无缓冲的channel,我们应该等待直到有消费者准备好
return (capacity_ == 0 && queue_.empty()) || queue_.size() < capacity_ || closed_;
});
if (closed_) {
return false;
}
queue_.push(value);
cv_consumer_.notify_one();
return true;
}
bool receive(T& value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_consumer_.wait(lock, [this]() { return !queue_.empty() || closed_; });
if (closed_ && queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
cv_producer_.notify_one();
return true;
}
void close() {
std::unique_lock<std::mutex> lock(mtx_);
closed_ = true;
cv_producer_.notify_all();
cv_consumer_.notify_all();
}
};
// 示例使用
int main() {
Channel<int> ch(10); // 10缓冲的channel
std::thread producer([&]() {
for (int i = 0; i < 5; ++i) {
ch.send(i);
std::cout << "Sent: " << i << std::endl;
}
ch.close();
});
std::thread consumer([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 故意延迟消费者开始消费
int val;
while (ch.receive(val)) {
std::cout << "Received: " << val << std::endl;
}
});
producer.join();
consumer.join();
return 0;
}
简单来说就是通过条件变量实现通信的阻塞和同步的。
利用csp思想实现取款逻辑
《C++并发编程实战》一书中提及了用csp思想实现atm机取款逻辑,我根据书中思想,整理了通信的示意图,书中部分代码存在问题,也一并修复了。
主函数实现
// Actor.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include <iostream>
#include "message.h"
#include "withdraw_msg.h"
#include "atm.h"
#include "dispatcher.h"
#include "bank_matchine.h"
#include "interface_matchine.h"
int main()
{
bank_machine bank;
interface_machine interface_hardware;
atm machine(bank.get_sender(), interface_hardware.get_sender());
std::thread bank_thread(&bank_machine::run, &bank);
std::thread if_thread(&interface_machine::run, &interface_hardware);
std::thread atm_thread(&atm::run, &machine);
messaging::sender atmqueue(machine.get_sender());
bool quit_pressed = false;
while (!quit_pressed)
{
char c = getchar();
switch (c)
{
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
atmqueue.send(digit_pressed(c));
break;
case 'b':
atmqueue.send(balance_pressed());
break;
case 'w':
atmqueue.send(withdraw_pressed(50));
break;
case 'c':
atmqueue.send(cancel_pressed());
break;
case 'q':
quit_pressed = true;
break;
case 'i':
atmqueue.send(card_inserted("acc1234"));
break;
}
}
bank.done();
machine.done();
interface_hardware.done();
atm_thread.join();
bank_thread.join();
if_thread.join();
}
主函数中启动了三个线程,分别处理bank,machine以及interface的操作。
由于代码复杂解析来只列举atm类的实现
#pragma once
#include "dispatcher.h"
#include <functional>
#include <iostream>
class atm
{
messaging::receiver incoming;
messaging::sender bank;
messaging::sender interface_hardware;
void (atm::* state)();
std::string account;
unsigned withdrawal_amount;
std::string pin;
void process_withdrawal()
{
incoming.wait().handle<withdraw_ok, std::function<void(withdraw_ok const& msg)>,
messaging::dispatcher >(
[&](withdraw_ok const& msg)
{
interface_hardware.send(
issue_money(withdrawal_amount));
bank.send(
withdrawal_processed(account, withdrawal_amount));
state = &atm::done_processing;
}, "withdraw_ok").handle<withdraw_denied, std::function<void(withdraw_denied const& msg)>>(
[&](withdraw_denied const& msg)
{
interface_hardware.send(display_insufficient_funds());
state = &atm::done_processing;
}, "withdraw_denied").handle<cancel_pressed, std::function<void(cancel_pressed const& msg)>>(
[&](cancel_pressed const& msg)
{
bank.send(
cancel_withdrawal(account, withdrawal_amount));
interface_hardware.send(
display_withdrawal_cancelled());
state = &atm::done_processing;
}, "cancel_pressed"
);
}
void process_balance()
{
incoming.wait()
.handle<balance, std::function<void(balance const& msg)>,
messaging::dispatcher>(
[&](balance const& msg)
{
interface_hardware.send(display_balance(msg.amount));
state = &atm::wait_for_action;
},"balance"
).handle < cancel_pressed, std::function<void(cancel_pressed const& msg) >>(
[&](cancel_pressed const& msg)
{
state = &atm::done_processing;
}, "cancel_pressed"
);
}
void wait_for_action()
{
interface_hardware.send(display_withdrawal_options());
incoming.wait()
.handle<withdraw_pressed, std::function<void(withdraw_pressed const& msg)>,
messaging::dispatcher>(
[&](withdraw_pressed const& msg)
{
withdrawal_amount = msg.amount;
bank.send(withdraw(account, msg.amount, incoming));
state = &atm::process_withdrawal;
}, "withdraw_pressed"
).handle < balance_pressed, std::function<void(balance_pressed const& msg) >>(
[&](balance_pressed const& msg)
{
bank.send(get_balance(account, incoming));
state = &atm::process_balance;
}, "balance_pressed"
).handle<cancel_pressed, std::function<void(cancel_pressed const& msg) >>(
[&](cancel_pressed const& msg)
{
state = &atm::done_processing;
}, "cancel_pressed"
);
}
void verifying_pin()
{
incoming.wait()
.handle<pin_verified, std::function<void(pin_verified const& msg)>,
messaging::dispatcher>(
[&](pin_verified const& msg)
{
state = &atm::wait_for_action;
}, "pin_verified"
).handle<pin_incorrect, std::function<void(pin_incorrect const& msg)>>(
[&](pin_incorrect const& msg)
{
interface_hardware.send(
display_pin_incorrect_message());
state = &atm::done_processing;
}, "pin_incorrect"
).handle<cancel_pressed, std::function<void(cancel_pressed const& msg)>>(
[&](cancel_pressed const& msg)
{
state = &atm::done_processing;
}, "cancel_pressed"
);
}
void getting_pin()
{
incoming.wait().handle<digit_pressed, std::function<void(digit_pressed const& msg)>,
messaging::dispatcher>(
[&](digit_pressed const& msg)
{
unsigned const pin_length = 6;
pin += msg.digit;
if (pin.length() == pin_length)
{
bank.send(verify_pin(account, pin, incoming));
state = &atm::verifying_pin;
}
}, "digit_pressed"
).handle<clear_last_pressed, std::function<void(clear_last_pressed const& msg)>>(
[&](clear_last_pressed const& msg)
{
if (!pin.empty())
{
pin.pop_back();
}
}, "clear_last_pressed"
).handle<cancel_pressed, std::function<void(cancel_pressed const& msg)>>(
[&](cancel_pressed const& msg)
{
state = &atm::done_processing;
}, "cancel_pressed"
);
}
void waiting_for_card()
{
interface_hardware.send(display_enter_card());
incoming.wait().handle<card_inserted, std::function<void(card_inserted const& msg)>,
messaging::dispatcher>(
[&](card_inserted const& msg)
{
account = msg.account;
pin = "";
interface_hardware.send(display_enter_pin());
state = &atm::getting_pin;
}, "card_inserted"
);
}
void done_processing()
{
interface_hardware.send(eject_card());
state = &atm::waiting_for_card;
}
atm(atm const&) = delete;
atm& operator=(atm const&) = delete;
public:
atm(messaging::sender bank_,
messaging::sender interface_hardware_) :
bank(bank_), interface_hardware(interface_hardware_)
{}
void done()
{
get_sender().send(messaging::close_queue());
}
void run()
{
state = &atm::waiting_for_card;
try
{
for (;;)
{
(this->*state)();
}
}
catch (messaging::close_queue const&)
{
}
}
messaging::sender get_sender()
{
return incoming;
}
};
atm 主要功能就是通过状态机不断地切换状态监听想要处理的函数。
详细源码可参考https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day08-actor/Actor
总结
本文讲述了Actor设计模式和CSP设计模式,并通过go和C++等语言给大家展示了csp并发设计的demo,最后通过讲解《C++并发编程实战》中取款的案例,展示了csp的用法。
详细源码
https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day08-actor/Actor
视频链接
https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290