Actor模式之高性能并发框架的实现

论坛 期权论坛 脚本     
匿名技术用户   2020-12-27 00:31   11   0

原文转自:http://www.tanjp.com/archives/149 (即时修正和更新)

概念

Actor模式是一种并发模型,其目标是充分利用计算机多核的优势,把一个大问题分拆成多个小问题并发处理,或者把多个平行的问题并发处理。一个Actor指的是一个最基本的计算单元。它能接收一个消息并且基于其执行计算。Actor一大重要特征在于Actor之间相互隔离,它们并不互相共享内存。也就是说,一个Actor能维持一个私有的状态,并且这个状态不可能被另一个Actor所改变,改变私有状态只能由Actor自身的行为(函数)来操作。光有一个Actor是不够的,多个Actors才能组成系统。在Actor模型里每个Actor都有所有其他Actor地址,它们能够相互发送消息,并组成系统。

Actor模式中的Actor是并行运算的最小单元,也就是说同一个Actor的行为不可能被并行处理。

Actor对象由三个元素构成:数据(data),行为(behavior),信箱(mailbox)。而信件(mail)是Actor与Actor之间沟通的唯一途径。

一组系统由一个或多个舞台(stage)组成,每个舞台上面有多个Actor。Actor的数据流,从投递信件(mail)到信箱(mailbox)开始,Actor对象严格按先后次序读取信箱的信件,触发并执行相应的行为(behavior),这些行为会改变数据(data),也会投递新信件(post mail)到其他Actor或者自己(行为所属的Actor)。

多个Actor的触发的行为会被调度器(scheduler)分配到各个线程去执行。

(原文有画图说明)

关键点梳理

1、同一个Actor的行为不可能被并行处理,同一时刻一个Actor只会有一个行为被执行。

2、Actor与Actor之间的数据状态相互隔离,不可直接改变。

3、Actor的数据只会被该Actor的行为来改变,消息不会改变数据。

4、Actor与Actor唯一的沟通方式就是投递消息。

5、Actor的信件严格按先后次序被触发。

6、一个Actor是没有意义的,多个Actor才能组成高并发的系统。

C++接口

调度器(Scheduler)

采用了行为(OperationType)缓存池来减少频繁的内存分配和销毁的消耗,同时也能减少内存碎片的产生。

class Scheduler
{
public:
     // @pn_cpu_count, 使用的CPU核数。
     // @pn_type, 调度方案的类型。
     // @pb_lockfree, 是否采用无锁实现。
     explicit Scheduler(uint32 pn_cpu_count = 0U, enSchedulerType pn_type, bool pb_lockfree); 
     // 启动服务,多线程安全。
     // @return 启动成功返回true, 失败返回false。
 bool start();
 // 判断服务是否可用。启动服务后到停止服务前,都处于可用状态,多线程安全。
 // @return 服务可用返回true, 不可用返回false。
 bool available();
 // 将任务加入执行队列,等待空闲线程执行,多线程安全。
 // 启动服务后到停止服务前处于可用状态时,加入的任务会被正确执行,其他时机未定义。
 // @pp_optype, 待加入的行为。
 // @return 加入成功返回true, 失败返回false。
 bool post(OperationType && pp_optype);
 // 停止服务,多线程安全。
 // @return 停止成功返回true, 失败返回false。
 bool stop();
 // @return 返回可投递接口,投递到此接口的行为严格按先后次序执行。
 ISequence * sequence();
};

信件(Mail)

信件的创建有舞台类型负责,也能由框架层自动回收已经用完的信件,同时采用了信件缓冲池,减少频繁的内存分配和销毁的消耗,同时也能减少内存碎片的产生。

class Mail
{
public:
     uint32 subject; //主题
     uint32 content; //内容
     void * attachment; //附件
     inline uint64 uid() const { return mn_uid; }
     inline uint32 sender() const { return mn_sender; }
     inline uint32 receiver() const { return mn_receiver; }
private:
 // 私有构造函数,只能由 MailCache 创建
 explicit Mail(MailCache * pp_cache);
 void done(); //邮件处理完后调用,回收邮件
private:
 MailCache * mp_cache;
 uint64 mn_uid; //唯一ID
 uint32 mn_sender; //发件人
 uint32 mn_receiver; //收件人
};

舞台(Stage)

提供一个统一的管理接口,简单易用的接口,方便Actor的管理和销毁。

typedef std::shared_ptr<Actor> ActorPtr; //Actor的共享智能指针
class Stage
{
public:
     // @pp_scheduler 调度器指针。
     explicit Stage(Scheduler * pp_scheduler);
     // @return 获得调度器指针。
     Scheduler * svc() { return mp_schd; }
     // @pn_id 待获取的Actor唯一标识。
 // @return 返回指定标识的Actor对象共享只能指针。
 ActorPtr get(uint32 pn_id); 
 // 添加Actor到舞台。
 // @pp_actor 待添加的Actor。
 // @return Actor标识重复导致失败返回false,否则返回成功true。
 bool add_actor(ActorPtr & pp_actor);
 // 删除舞台中指定标识的Actor。
 // @pn_id 待删除的Actor标识。
 // @return Actor标识不存在导致失败返回false,否则返回成功true。
 bool del_actor(uint32 pn_id);
 // @return 一个新的信件。
 Mail * new_mail();
 // 等待所有Actor处理完所有任务,并成功释放占有的新建。
 void wait_for_destroyed();
};

实体对象(Actor)

Actor基类,所有其他Actor都需要继承此类来实现功能。

1、初始化时,绑定信件主题对应的Actor行为。

2、查收信件,并做出相应的行为。

3、查收信件按从先到后的次序进行。

4、查收信件并完成相应的行为后才会查收下一个信件。

5、Actor未关闭(调用close)或有未读取的信件或有未完成的行为时都不能销毁。

class Actor
{
public:
     // @pp_stage, 舞台对象指针。
     // @pn_actorid, 该实体对象在舞台中的唯一标识。
     explicit Actor(Stage * pp_stage, uint32 pn_actorid);
    // 必须调用close()函数并等待消息被处理完才能正常析构Actor对象。
     virtual ~Actor();
     // @return 返回实体唯一标识。
     uint32 id() const;
 // @retrun 返回舞台对象指针。
 Stage * stage();
 // 初始化,对象创建后要被首先调用。
 virtual void init();
 // 设置特殊数据。用于子类的特殊实现。
 virtual bool set_special(void * pp_data) { return true; }
 // 关闭实体,执行后该实体不再接收邮件。必须调用此函数并等待消息被处理完才能正常析构Actor对象。
 void close();
 // @return 如果实体完全关闭(待处理队列空)成功返回true, 否则返回false。
 bool closed();
 // 投递邮件。
 // @pn_receiver, 信件接收者标识。
 // @pn_subject, 信件主题。
 // @pn_content, 信件内容。
 // @pp_attachment, 信件的附件。
 // @return 投递成功返回true,否则失败返回false。失败时要注意回收附件pp_attachment,以免内存泄漏。
 bool post(uint32 pn_receiver, uint32 pn_subject, uint32 pn_content, void * pp_attachment);
protected:
 // 设置信件的行为,只能在子类初始化(init)时调用。
 // @pn_subject, 信件主题。
 // @pp_optype, 仿函数,信件主题对应的行为。
 // @return 成功返回true, 否则返回false。
 bool set_operation(uint32 pn_subject, MailOperation && pp_optype);
 };

应用示例

5个Actor轮流发送消息,直到主线程发送关闭信件才结束。

static const uint32 kActorCount = 5;
static const uint32 kActorIds[kActorCount] = {1, 2, 3, 4, 5};
class TestActor : public Actor
{
public:
     TestActor(Stage * pp_stage, uint32 pn_actorid) : Actor(pp_stage, pn_actorid){ }
     ~TestActor(){}
     void init() override
     {  
  set_operation(1, std::bind(&TestActor::op_test1, this, std::placeholders::_1));
  set_operation(2, std::bind(&TestActor::op_test2, this, std::placeholders::_1));
  set_operation(3, std::bind(&TestActor::op_test3, this, std::placeholders::_1));
  set_operation(0, std::bind(&TestActor::op_close, this, std::placeholders::_1));
 }
 bool set_special(void * pp_data) override
 {
  std::string * zp_s = static_cast<std::string*>(pp_data);
  if (!zp_s){ return false; }
  ms_data = *zp_s;
  std::cout << id() << ", set_special " << ms_data << std::endl;
  return true;
 }
private:
 void op_test1(Mail * pp_mail)
 {
  ms_data = pp_mail->tostring();
  std::cout << id() << ", op_test1 " << pp_mail->tostring() << std::endl;
  uint32 zn_receiver = (id() % kActorCount) + 1;
  post(zn_receiver, 2, 0, 0);
 }
 void op_test2(Mail * pp_mail)
 {
  ms_data = pp_mail->tostring();
  std::cout << id() << ", op_test2 " << pp_mail->tostring() << std::endl;
  uint32 zn_receiver = (id() % kActorCount) + 1;
  post(zn_receiver, 3, 0, 0);
 }
 void op_test3(Mail * pp_mail)
 {
  ms_data = pp_mail->tostring();
  std::cout << id() << ", op_test3 " << pp_mail->tostring() << std::endl;
  uint32 zn_receiver = (id() % kActorCount) + 1;
  post(zn_receiver, 1, 0, 0);
 }
 void op_close(Mail * pp_mail)
 {
  ms_data = pp_mail->tostring();
  std::cout << id() << ", op_close " << pp_mail->tostring() << std::endl;
  this->close();
 }
private:
 std::string ms_data;
};
void main(int argc, char* argv[])
{
 std::cout << std::endl << "begin" << std::endl;
 Scheduler schd(0, enST_STEALING, true);
 schd.start();
 {
  Stage stage(&schd);
  // 创建多个Actor
  for (uint32 i = 0; i < kActorCount; ++i)
  {
   ActorPtr zp_actor = ActorPtr(new TestActor(&stage, kActorIds[i]));
   zp_actor->init();
   std::string zs_temp(1, 'A' + i);
   zp_actor->set_special(&zs_temp);
   stage.add_actor(zp_actor);
  }
  {   // 开始投递信件
   ActorPtr zp_actor = stage.get(kActorIds[0]);
   zp_actor->post(kActorIds[0], 1, 0, 0);
  }  
  // 主线程阻塞等待
  THIS_SLEEP_SECONDS(10);
  // 投递关闭信件
  for (uint32 i = 0; i < kActorCount; ++i)
  {
   ActorPtr zp_actor = stage.get(kActorIds[i]);
   zp_actor->post(kActorIds[i], 0, 0, 0);
  }
  // 等待所有操作完成并销毁
  stage.wait_for_destroyed();
 }
 std::cout << "stoping" << std::endl;
 schd.stop();
 std::cout << "stoped" << std::endl;
 std::cout << std::endl << "end" << std::endl;
}

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP