Actor模式实现高并发的异步文件日志系统(Actor Log)

论坛 期权论坛 脚本     
匿名技术用户   2020-12-27 00:30   107   0

原文转自:http://www.tanjp.com/archives/176 (即时修正和更新)

文件日志异步读写

我们都知道普通的读写文件打开文件都是同步的,比如C的fopen, fclose, fread等。并且磁盘的访问速度远远的低于内存,所以操作系统要阻塞等待磁盘设备准备好才进行读写。如果采用同步,那么上次业务逻辑将会阻塞挂起,等待磁盘把数据准备好,再通知操作系统上报到应用层。高性能的服务器,提高并发,读写文件都会采用异步的模式。

C或C++提高文件读写的接口(FILE, fopen, fclose, fwrite),都是非线程安全的。而我们之前已经知道一个Actor内的行为能确保不可能被多个线程执行,也就是说把日志文件的写入,封装在一个Actor里面(无需使用锁),并且通过调度机制就能实现高并发的文件日志异步写入。文件日志系统,主要用于记录各种日志信息到磁盘文件,所以只需要写入操作。

Actor模式实现高并发的异步文件日志系统

利用Actor的特性,同一个Actor里面的行为,不需要加锁,也就是说Actor可以很方便地封装同步操作。

1、实现一个同步的文件日志写入类型 SyncFileLog。

2、用Actor包装SyncFileLog,得到类型ActorFileLog。

3、实现ActorLog类型,该类型包含了单独的调度器(Scheduler)和舞台(Stage),和管理所有ActorFileLog。

4、ActorLog类型接收信件,创建N个FileLogActor对象,每个FileLogActor对象负责一个单独的文件句柄写入。

这样就实现了基于Actor模式高并发写入多个文件的日志系统。ActorLog对象的设计如下图:

首先,实现一个同步的文件日志写入类型 SyncFileLog,也就是说,这些接口如果直接调用阻塞业务层逻辑。

有以下接口(实现细节有几百行,暂时不粘贴,之后再把源码上传):图文代码请参照以上笔记

class SyncFileLog
{
public:
     // 配置文件日志
     // @ps_dir, 所在目录。
     // @ps_name, 文件名称,不包括路径和后缀。
     // @ps_suffix, 后缀名。
     // @pn_divide_size, 划分文件的大小。
     // @pb_create_dir_everyday, 是否每天划分目录。
 // @return 配置成功返回true,否则返回false。
 bool config(const std::string & ps_dir, const std::string & ps_name,
  const std::string & ps_suffix, uint32 pn_divide_size, bool pb_create_dir_everyday = false);
 // @ps_data, 待写入的字符串日志数据。
 // @return 写入成功返回true, 否则返回false。
 bool write(const std::string & ps_data);
 // 关闭文件。
 void close(); 
};

然后,用Actor包装同步的文件日志写入类型SyncFileLog,得到类型ActorFileLog。代码如下:

class ActorFileLog : public Actor
{
public:
     explicit ActorFileLog(Stage * pp_stage, uint32 pn_actorid);
     ~ActorFileLog();
     void init() override;
private:
     void op_config(Mail * pp_mail);
     void op_write(Mail * pp_mail);
 void op_close(Mail * pp_mail);
private:
 SyncFileLog * mp_filelog;
};
ActorFileLog::ActorFileLog(Stage * pp_stage, uint32 pn_actorid)
 : Actor(pp_stage, pn_actorid), mp_filelog(new SyncFileLog()){}
ActorFileLog::~ActorFileLog() { SAFE_DELETE(mp_filelog);}
void ActorFileLog::init()
{
 set_operation(ActorLog::kMSIdConfig, std::bind(&ActorFileLog::op_config, this, std::placeholders::_1));
 set_operation(ActorLog::kMSIdWrite, std::bind(&ActorFileLog::op_write, this, std::placeholders::_1));
 set_operation(ActorLog::kMSIdClose, std::bind(&ActorFileLog::op_close, this, std::placeholders::_1));
}
void ActorFileLog::op_config(Mail * pp_mail)
{
 ActorLog::ConfigAttachment * zp_attachment = static_cast< ActorLog::ConfigAttachment* >(pp_mail->attachment);
 if (!zp_attachment)
 {
  return; //static_cast attachment error
 }
 ActorLog::ConfigAttachment & zo_attachment = *zp_attachment;
 const std::string & zs_dir = std::get<0>(zo_attachment);
 const std::string & zs_name = std::get<1>(zo_attachment);
 const std::string & zs_suffix = std::get<2>(zo_attachment);
 uint32 zn_divide_size = std::get<3>(zo_attachment);
 bool zb_create_dir_everyday = std::get<4>(zo_attachment);
 bool zb_ok = mp_filelog->config(zs_dir, zs_name, zs_suffix, zn_divide_size, zb_create_dir_everyday);
 if (!zb_ok) {   //失败处理 }
 SAFE_DELETE(zp_attachment); //销毁附件
}
void ActorFileLog::op_write(Mail * pp_mail)
{
 ActorLog::WriteAttachment * zp_attachment = static_cast<ActorLog::WriteAttachment*>(pp_mail->attachment);
 if (!zp_attachment)
 {
  return; //static_cast attachment error
 }
 std::string & zs_data = *zp_attachment;
 bool zb_ok = mp_filelog->write(zs_data);
 if (!zb_ok) { //失败处理 }
 SAFE_DELETE(zp_attachment); //销毁附件
}
void ActorFileLog::op_close(Mail * pp_mail)
{
 this->close();
 mp_filelog->close(); 
}

最后,实现ActorLog类型,该类型包含了单独的调度器(Scheduler)和舞台(Stage),和管理所有ActorFileLog。

class ActorLog : public Actor
{
 friend class ActorFileLog;
 typedef std::tuple<std::string, std::string, std::string, uint32, bool> ConfigAttachment;
 typedef std::string WriteAttachment;
 static const uint32 kMSIdConfig = LOG_MAIL_SUBJECT(1);
 static const uint32 kMSIdWrite = LOG_MAIL_SUBJECT(2);
 static const uint32 kMSIdClose = LOG_MAIL_SUBJECT(3);
public:
 static const uint32 kActorId = LOG_ACTOR_ID;
 static bool post_config(ActorPtr & pp_sender, uint32 pn_logid,
  const std::string & ps_dir, const std::string & ps_name, 
  const std::string & ps_suffix, uint32 pn_divide_size, bool pb_create_dir_everyday);
 static bool post_write(ActorPtr & pp_sender, uint32 pn_logid, const std::string & ps_data);
 static bool post_close(ActorPtr & pp_sender);
public:
 explicit ActorLog(Stage * pp_stage);
 ~ActorLog();
 void init() override;
 // pp_data, 传入uint32类型, 表示日志调度分配的CPU核数。
 bool set_special(void * pp_data) override;
private:
 void op_config(Mail * pp_mail);
 void op_write(Mail * pp_mail);
 void op_close(Mail * pp_mail);
private:
 Scheduler * mp_schd;
 Stage * mp_stage;
};
bool ActorLog::post_config(ActorPtr & pp_sender, uint32 pn_logid,
 const std::string & ps_dir, const std::string & ps_name,
 const std::string & ps_suffix, uint32 pn_divide_size, bool pb_create_dir_everyday)
{
 ConfigAttachment * zp_attachment = new ConfigAttachment(ps_dir, ps_name, ps_suffix, pn_divide_size, pb_create_dir_everyday);
 bool zb_ok = pp_sender->post(kActorId, kMSIdConfig, pn_logid, zp_attachment);
 if (!zb_ok) {SAFE_DELETE(zp_attachment);}
 return zb_ok;
}
bool ActorLog::post_write(ActorPtr & pp_sender, uint32 pn_logid, const std::string & ps_data)
{
 WriteAttachment * zp_attachment = new WriteAttachment(ps_data);
 bool zb_ok = pp_sender->post(kActorId, kMSIdWrite, pn_logid, zp_attachment);
 if (!zb_ok){ SAFE_DELETE(zp_attachment); }
 return zb_ok;
}
bool ActorLog::post_close(ActorPtr & pp_sender)
{
 return pp_sender->post(kActorId, kMSIdClose, 0, 0);
}
ActorLog::ActorLog(Stage * pp_stage) : Actor(pp_stage, kActorId), mp_schd(0), mp_stage(0){}
ActorLog::~ActorLog()
{
 SAFE_DELETE(mp_stage);
 SAFE_DELETE(mp_schd);
}
void ActorLog::init()
{
 set_operation(kMSIdConfig, std::bind(&ActorLog::op_config, this, std::placeholders::_1));
 set_operation(kMSIdWrite, std::bind(&ActorLog::op_write, this, std::placeholders::_1));
 set_operation(kMSIdClose, std::bind(&ActorLog::op_close, this, std::placeholders::_1));
}
bool ActorLog::set_special(void * pp_data)
{
 if (mp_schd || mp_stage){ return false;/*can not repeat set_special*/ }
 uint32 * zp_data = static_cast<uint32*>(pp_data);
 if (!zp_data){ return false;/*static_cast cpu count error*/ }
 uint32 zn_cpu_count = *zp_data;
 mp_schd = new Scheduler(zn_cpu_count, enST_DEFAULT, false);
 bool zb_ok = mp_schd->start();
 if (!zb_ok)
 {
  SAFE_DELETE(mp_schd);
  return false; //scheduler start failed
 }
 mp_stage = new Stage(mp_schd);
 return zb_ok;
}
void ActorLog::op_config(Mail * pp_mail)
{ 
 ConfigAttachment * zp_attachment = static_cast< ConfigAttachment* >(pp_mail->attachment);
 if (!zp_attachment){return;/*static_cast attachment*/}
 
 if (!mp_schd || !mp_stage)
 {
  SAFE_DELETE(zp_attachment); //销毁附件
  return;//please call function set_specia
 }
 uint32 zn_logid = pp_mail->content;
 ActorPtr zp_actor = ActorPtr(new ActorFileLog(mp_stage, zn_logid));
 zp_actor->init();
 bool zb_ok = mp_stage->add_actor(zp_actor);
 if (!zb_ok)
 {
  SAFE_DELETE(zp_attachment); //销毁附件
  return;//add actor failed
 } 
 zb_ok = zp_actor->post(zn_logid, pp_mail->subject, 0, zp_attachment);
 if (!zb_ok)
 {
  SAFE_DELETE(zp_attachment); //销毁附件
  ExTrack::error(TI_LIBLOG, ss.str());//actor post self failed
 }
}

void ActorLog::op_write(Mail * pp_mail)
{
 WriteAttachment * zp_attachment = static_cast<WriteAttachment*>(pp_mail->attachment);
 if (!zp_attachment)
 {
  return;//static_cast attachment error
 }
 if (!mp_schd || !mp_stage)
 {
  SAFE_DELETE(zp_attachment); //销毁附件
  return;//please call function set_special
 }
 uint32 zn_logid = pp_mail->content;
 ActorPtr zp_actor = mp_stage->get(zn_logid);
 if (!zp_actor)
 {
  SAFE_DELETE(zp_attachment); //销毁附件
  return;//get actor failed
 }

 bool zb_ok = zp_actor->post(zn_logid, pp_mail->subject, 0, zp_attachment);
 if (!zb_ok)
 {
  SAFE_DELETE(zp_attachment); //销毁附件
  ExTrack::error(TI_LIBLOG, ss.str());//actor post self failed
 }
}
void ActorLog::op_close(Mail * pp_mail)
{
 if (!mp_schd || !mp_stage)
 {
  return;//please call function set_special
 }
 this->close();
 std::vector<uint32> zc_ids = mp_stage->get_ids();
 uint32 zn_actorid = 0;
 for (auto it = zc_ids.begin(); it != zc_ids.end(); ++it)
 {
  zn_actorid = *it;
  ActorPtr & zp_actor = mp_stage->get(zn_actorid);
  zp_actor->post(zn_actorid, kMSIdClose, 0, 0);
 } 
 mp_stage->wait_for_destroyed();
 mp_schd->stop();
}

测试示例

void main(int argc, char* argv[])
{
     std::cout << std::endl << "begin" << std::endl;
     Scheduler schd(0, enST_STEALING, true);
     schd.start();
     {
      Stage stage(&schd);
      { //创建Actor
       ActorPtr zp_actor = ActorPtr(new ActorLog(&stage));
   zp_actor->init();
   uint32 zn_core_num = 3;
   zp_actor->set_special(&zn_core_num);
   stage.add_actor(zp_actor);
  }{
   // 开始投递信件
   ActorPtr zp_actor = stage.get(ActorLog::kActorId);
   ActorLog::post_config(zp_actor, 11, "./logs/", "11_test", ".txt", 1024 * 1024, true);
   ActorLog::post_config(zp_actor, 12, "./logs/", "12_test", ".txt", 1024 * 1024, true);
   ActorLog::post_config(zp_actor, 21, "./logs_all/", "21_test", ".txt", 1024 * 1024, false);
   ActorLog::post_config(zp_actor, 22, "./logs_all/", "22_test", ".txt", 1024 * 1024, false);
  }{ // 写入数据到多个日志文件
   ActorPtr zp_actor = stage.get(ActorLog::kActorId);
   auto f = [&](uint32 n) {
    std::string s1("abc ");
    if (n > 0){ s1.assign((n % 100) + 1, 'A' + (n % 26));}
    s1.append("\n");
    ActorLog::post_write(zp_actor, 11, s1);
    ActorLog::post_write(zp_actor, 12, s1);
    std::string s2("123 ");
    if (n > 0){s2.assign((n % 100) + 1, 'a' + (n % 26));}
    s2.append("\n");
    ActorLog::post_write(zp_actor, 21, s2);
    ActorLog::post_write(zp_actor, 22, s2);
   };
   for (uint32 i = 0; i < 100000; ++i)
   {
    f(i);
   }
  }{ // 投递关闭信件
   ActorPtr zp_actor = stage.get(ActorLog::kActorId);
   ActorLog::post_close(zp_actor);
  }
  // 等待所有操作完成并销毁
  stage.wait_for_destroyed();
 }
 schd.stop();
 std::cout << std::endl << "end" << std::endl;
}

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP