Actor模式封装MongoDB C驱动-多连接并发与自动重试

论坛 期权论坛 脚本     
匿名技术用户   2020-12-27 00:31   11   0

原文转自:http://www.tanjp.com/archives/192 (即时修正和更新)

Actor模式封装异步的MongoDB C驱动

MongoDB C驱动(MongoDB C Driver http://mongoc.org/),提供了同步的DB访问存储接口,在高并发的业务系统,同步操作会阻塞业务逻辑,是高并发的一大障碍。所以需要设计异步的接口来满足业务系统的需求。

利用Actor的特性,同一个Actor里面的行为,不需要加锁,也就是说Actor可以很方便地封装同步操作。

1、实现一个同步的MongoDB数据存取类型 SyncModb。

2、用Actor包装SyncModb,得到类型ActorMongodb。

3、实现ActorModb类型,该类型包含了单独的调度器(Scheduler)和舞台(Stage),和管理所有ActorMongodb。

4、ActoModb类型接收信件,创建N个ActorMongodb对象,每个ActorMongodb对象负责一个单独mongoc驱动连接,进行数据收发。

这样就实现了基于Actor模式实现可多连接并发异步接口。ActoModb对象的设计如下图:

图文代码请参照以上笔记

首先,实现一个同步的MongoDB数据存取类型 SyncModb,也就是说,这些接口如果直接调用会阻塞业务层逻辑。

下面只粘贴部分代码演示使用方法:

class SyncModb
{
public:
     SyncModb();
     ~SyncModb();
     // 配置连接地址
     // @ps_uri, mongod的URI,格式: mongodb://username:password@127.0.0.1:27017/?authSource=collectionname
     // @return 执行结果集合。
     ModbResult config(const std::string & ps_uri);
 // 插入一条数据
 // @ps_dbname, 数据库名称。
 // @ps_colnname, 集合名称。
 // @ps_json, 待插入数据,json格式。
 // @ps_opts, 选项。默认为空,格式 { writeConcern : { w: <uint32>, j: <boolean>, wtimeout: <uint32> } }]
 // @pn_retry, 失败时自动重试的次数(每次重试耗时不少于0.1秒),0表示不自动重试。
 // @return 执行结果集合。
 ModbResult insert_one(const std::string & ps_dbname, const std::string & ps_colnname,
  const std::string & ps_json, const std::string & ps_opts = "");  
 // 关闭 mongodb 连接。
 void close();
 // @return mongodb已经建立连接返回true,如果未建立连接将尝试重连,重连失败返回false。
 bool connected();
 // @po_res, 根据结果集来判断网络是否有问题。
 // @return 返回true表示网络有问题,否则返回false其他问题或没问题。
 bool check_network_problem(const ModbResult & po_res);
 // ping测试Mongodb连接是否可用,每0.1秒尝试一次。
 // @ps_dbname, 数据库名称。
 // @pn_try_times, 尝试次数,取值大于零。
 // @return 0表示重试多次后还是失败,成功返回非0表示重试了多少次后成功。
 uint32 ping(const std::string & ps_dbname, uint32 pn_try_times);
private:
 mongoc_client_t * mp_client;
 std::string ms_mongo_uri;
 uint32 mn_ping_times; //ping次数,失败时每次不少于0.1秒,默认值600,不少于1分钟。
};

然后,用Actor包装同步的MongoDB数据存取类型 SyncModb,得到类型ActorMongodb。代码如下:

class ActorMongodb : public Actor
{
public:
     explicit ActorMongodb(ActorModb * pp_parent, Stage * pp_stage, uint32 pn_actorid, uint32 pn_retry_times_on_disconnected = 1, uint32 pn_ping_times = 600);
     ~ActorMongodb();
     void init() override;

private:
     void sync_ping(const std::string & ps_dbname, uint32 pn_qid, uint32 pn_cur_try_times);
 void op_config(Mail * pp_mail);
 void op_insert_one(Mail * pp_mail);
 void op_close(Mail * pp_mail);
private:
 const uint32 kRetryTimes;
 const uint32 kPingTimes;
 ActorModb * mp_parent;
 SyncModb * mp_modb;
};
ActorMongodb::ActorMongodb(ActorModb * pp_parent, Stage * pp_stage, uint32 pn_actorid, uint32 pn_retry_times_on_disconnected, uint32 pn_ping_times)
 : Actor(pp_stage, pn_actorid), kRetryTimes(pn_retry_times_on_disconnected), kPingTimes(pn_ping_times)
 , mp_parent(pp_parent), mp_modb(new SyncModb()){}
ActorMongodb::~ActorMongodb()
{
 SAFE_DELETE(mp_modb);
}
void ActorMongodb::init()
{
 set_operation(ActorModb::kMSIdConfig, std::bind(&ActorMongodb::op_config, this, std::placeholders::_1));
 set_operation(ActorModb::kMSIdInsertOne, std::bind(&ActorMongodb::op_insert_one, this, std::placeholders::_1));
 set_operation(ActorModb::kMSIdClose, std::bind(&ActorMongodb::op_close, this, std::placeholders::_1));
}
void ActorMongodb::sync_ping(const std::string & ps_dbname, uint32 pn_qid, uint32 pn_cur_try_times)
{
 uint32 zn_ping_times = mp_modb->ping(ps_dbname, kPingTimes);
 if (zn_ping_times > 0){
  ExINFO(TI_LIBMODB, TI_NETWORK, "ActorMongodb::op_insert_many -> ping success, network reconnected, qid="
   << pn_qid << ", cur_times=" << pn_cur_try_times << ", ping times=" << zn_ping_times);
 }else{
  ExERROR(TI_LIBMODB, TI_NETWORK, "ActorMongodb::op_insert_many -> ping failed, network disconnected, qid="
   << pn_qid << ", cur_times=" << pn_cur_try_times << ", ping times=" << kPingTimes);
 }
}
void ActorMongodb::op_config(Mail * pp_mail)
{
 modb::ConfigAttachment * zp_attachment = static_cast<modb::ConfigAttachment*>(pp_mail->attachment);
 modb::ConfigAttachment & zo_attach = *zp_attachment;
 uint32 zn_qid = std::get<0>(zo_attach);
 std::string & zs_uri = std::get<1>(zo_attach);
 ModbResult zo_res = mp_modb->config(zs_uri);
 zo_res.qid = zn_qid;
 if (!zo_res.ok) {
  ExERROR(TI_LIBMODB, TI_ARGUMENTS, "ActorMongodb::op_config -> sync modb config error, uri=" << zs_uri 
   << ", result={" << zo_res.tostring()
   << "}, actor={" << tostring() << "}");
 }
 uint32 zn_receiver = pp_mail->content;
 ModbResult * zp_res = modb::apply_result();
 zp_res->operator=(std::move(zo_res)); 
 bool zb_ok = mp_parent->post(zn_receiver, ActorModb::kMSIdResult, 0, zp_res);
 if (!zb_ok)
 {
  modb::revert_result(zp_res); //归还附件
  ExERROR(TI_LIBMODB, TI_SYSTEM_ERR, "ActorMongodb::op_config -> modb actor post error uri=" << zs_uri
   << ", receiver=" << zn_receiver
   << ", result={" << zp_res->tostring()
   << ", actor={" << tostring() << "}");
 }
 SAFE_DELETE(zp_attachment); //销毁附件
}
void ActorMongodb::op_insert_one(Mail * pp_mail)
{
 modb::InsertOneAttachment * zp_attachment = static_cast<modb::InsertOneAttachment*>(pp_mail->attachment);
 modb::InsertOneAttachment & zo_attach = *zp_attachment;
 uint32 zn_qid = std::get<0>(zo_attach);
 std::string & zs_dbname = std::get<1>(zo_attach);
 std::string & zs_colnname = std::get<2>(zo_attach);
 std::string & zs_json = std::get<3>(zo_attach);
 std::string & zs_opts = std::get<4>(zo_attach);
 ModbResult zo_res;
 uint32 zn_cur_try_times = 0; 
 while (zn_cur_try_times <= kRetryTimes)
 {
  ++zn_cur_try_times;
  zo_res = std::move(mp_modb->insert_one(zs_dbname, zs_colnname, zs_json, zs_opts));
  if (!mp_modb->check_network_problem(zo_res))
  {
   break; //成功或不是网络问题
  }
  //网络有问题
  if (kRetryTimes <= 0)
  {
   break; //不重试
  }  
  ExERROR(TI_LIBMODB, TI_NETWORK, "ActorMongodb::op_insert_one -> network problem sync modb insert_one error, " 
   << ", qid=" << zn_qid << ", cur_times=" << zn_cur_try_times << ", dbname=" << zs_dbname 
   << ", colnname=" << zs_colnname << ", json=" << zs_json << ", opts=" << zs_opts 
   << ", result={" << zo_res.tostring() << "}, actor={" << tostring() << "}");
  sync_ping(zs_dbname, zn_qid, zn_cur_try_times);
 }
 zo_res.qid = zn_qid;
 if (!zo_res.ok)
 {
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorMongodb::op_insert_one -> sync modb insert_one error, dbname=" << zs_dbname
   << ", colnname=" << zs_colnname << ", json=" << zs_json << ", opts=" << zs_opts
   << ", result={" << zo_res.tostring() << "}, actor={" << tostring() << "}");
 }
 uint32 zn_receiver = pp_mail->content;
 ModbResult * zp_res = modb::apply_result();
 zp_res->operator=(std::move(zo_res));
 bool zb_ok = mp_parent->post(zn_receiver, ActorModb::kMSIdResult, 0, zp_res);
 if (!zb_ok)
 {
  modb::revert_result(zp_res); //归还附件
  ExERROR(TI_LIBMODB, TI_SYSTEM_ERR, "ActorMongodb::op_insert_one -> modb actor post error, receiver=" 
   << zn_receiver << ", result={" << zp_res->tostring() << ", actor={" << tostring() << "}");
 }
 modb::revert_insert_one(zp_attachment); //归还附件
}
void ActorMongodb::op_close(Mail * pp_mail)
{
 this->close();
 mp_modb->close();
}

最后,实现ActorModb类型,该类型包含了单独的调度器(Scheduler)和舞台(Stage),和管理所有ActorMongodb。

struct ModbSpecialData
{
     explicit ModbSpecialData(uint32 pn_core_num = 2, uint32 pn_retry_times = 5, uint32 pn_ping_times = 600)
  : core_num(pn_core_num), retry_times(pn_retry_times), ping_times(pn_ping_times) {}
     uint32 core_num; //分配给连接的线程数
     uint32 retry_times; //重试次数
     uint32 ping_times; //检查网络ping的次数(每次不少于0.1秒)
};
class ActorModb : public Actor
{
private:
 friend class ActorMongodb;
 static const uint32 kMSIdConfig = MODB_MAIL_SUBJECT(1);
 static const uint32 kMSIdInsertOne = MODB_MAIL_SUBJECT(2);
 static const uint32 kMSIdClose = MODB_MAIL_SUBJECT(9);
public:
 static const uint32 kMSIdResult = MODB_MAIL_SUBJECT(0);
 static const uint32 kActorId = MODB_ACTOR_ID;
 static bool post_config(ActorPtr & pp_sender, uint32 pn_id, uint32 pn_qid, const std::string & ps_uri); 
 static bool post_insert_one(ActorPtr & pp_sender, uint32 pn_id, uint32 pn_qid, 
  const std::string & ps_dbname, const std::string & ps_colname, const std::string & ps_json, 
  const std::string & ps_opts = "");
 static bool post_close(ActorPtr & pp_sender);
public:
 explicit ActorModb(Stage * pp_stage);
 ~ActorModb();
 void init() override;
 bool set_special(void * pp_data) override;
 bool set_special(ModbSpecialData * pp_data);
private:
 void op_config(Mail * pp_mail);
 void op_insert_one(Mail * pp_mail);
 void op_close(Mail * pp_mail);
private:
 Scheduler * mp_schd;
 Stage * mp_stage;
 ModbSpecialData mo_data;
};
bool ActorModb::post_config(ActorPtr & pp_sender, uint32 pn_id, uint32 pn_qid, const std::string & ps_uri)
{
 modb::ConfigAttachment * zp_attachment = new modb::ConfigAttachment(pn_qid, ps_uri);
 bool zb_ok = pp_sender->post(kActorId, kMSIdConfig, pn_id, zp_attachment);
 if (!zb_ok){SAFE_DELETE(zp_attachment); /*销毁附件*/ }
 return zb_ok;
}
bool ActorModb::post_insert_one(ActorPtr & pp_sender, uint32 pn_id, uint32 pn_qid,
 const std::string & ps_dbname, const std::string & ps_colname, const std::string & ps_json,
 const std::string & ps_opts)
{
 modb::InsertOneAttachment * zp_attachment = modb::apply_insert_one();
 std::get<0>(*zp_attachment) = pn_qid;
 std::get<1>(*zp_attachment) = ps_dbname;
 std::get<2>(*zp_attachment) = ps_colname;
 std::get<3>(*zp_attachment) = ps_json;
 std::get<4>(*zp_attachment) = ps_opts;
 bool zb_ok = pp_sender->post(kActorId, kMSIdInsertOne, pn_id, zp_attachment);
 if (!zb_ok)
 {
  modb::revert_insert_one(zp_attachment); //归还附件
 }
 return zb_ok;
}
bool ActorModb::post_close(ActorPtr & pp_sender)
{
 bool zb_ok = pp_sender->post(kActorId, kMSIdClose, 0, 0);
 return zb_ok;
}
ActorModb::ActorModb(Stage * pp_stage)
 : Actor(pp_stage, MODB_ACTOR_ID), mp_schd(0), mp_stage(0), mo_data(2, 5, 600){}
ActorModb::~ActorModb()
{
 SAFE_DELETE(mp_stage);
 SAFE_DELETE(mp_schd);
}
void ActorModb::init()
{
 set_operation(kMSIdConfig, std::bind(&ActorModb::op_config, this, std::placeholders::_1));
 set_operation(kMSIdInsertOne, std::bind(&ActorModb::op_insert_one, this, std::placeholders::_1));
 set_operation(kMSIdClose, std::bind(&ActorModb::op_close, this, std::placeholders::_1));
}
bool ActorModb::set_special(void * pp_data)
{
 ModbSpecialData * zp_data = static_cast<ModbSpecialData*>(pp_data);
 return set_special(zp_data);
}
bool ActorModb::set_special(ModbSpecialData * pp_data)
{
 if (mp_schd || mp_stage)
 {
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorModb::set_special -> can not repeat set_special.");
  return false;
 }
 mo_data.core_num = pp_data->core_num;
 mo_data.retry_times = pp_data->retry_times;
 mo_data.ping_times = pp_data->ping_times;
 //实时性要求不高,这种方式可以通过条件变量和互斥锁在空闲时挂起,尽量少侵占系统资源
 mp_schd = new Scheduler(mo_data.core_num, enST_PREEMPTIVE, false);
 bool zb_ok = mp_schd->start();
 if (!zb_ok)
 {
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorModb::set_special -> scheduler start failed, core_num=" << mo_data.core_num);
  SAFE_DELETE(mp_schd);
  return false;
 }
 mp_stage = new Stage(mp_schd);
 return zb_ok;
}
void ActorModb::op_config(Mail * pp_mail)
{
 modb::ConfigAttachment * zp_attachment = static_cast<modb::ConfigAttachment*>(pp_mail->attachment);
 if (!mp_schd || !mp_stage)
 {
  SAFE_DELETE(zp_attachment); //销毁附件
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorModb::op_config -> please call function set_special after init.");
  return;
 }
 uint32 zn_id = pp_mail->content;
 ActorPtr zp_actor = ActorPtr(new ActorMongodb(this, mp_stage, zn_id, mo_data.retry_times, mo_data.ping_times));
 zp_actor->init();
 bool zb_ok = mp_stage->add_actor(zp_actor);
 if (!zb_ok)
 {
  SAFE_DELETE(zp_attachment); //销毁附件
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorModb::op_config -> add actor failed, maybe repeat mongodb actorid="
   << zn_id << ", actor = {" << zp_actor->tostring() << "}, mail={" << pp_mail->tostring() << "}");
  return;
 }
 zb_ok = zp_actor->post(zn_id, pp_mail->subject, pp_mail->sender(), zp_attachment);
 if (!zb_ok)
 {
  SAFE_DELETE(zp_attachment); //销毁附件
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorModb::op_config -> actor post self actor failed, mongodb actorid=" << zn_id
   <<", parent actor = {" << tostring() << "}, mail={" << pp_mail->tostring() << "}");
 }
}
void ActorModb::op_insert_one(Mail * pp_mail)
{
 modb::InsertOneAttachment * zp_attachment = static_cast<modb::InsertOneAttachment*>(pp_mail->attachment);
 if (!mp_schd || !mp_stage)
 {
  modb::revert_insert_one(zp_attachment); //归还附件
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorModb::op_insert_one -> please call function set_special after init.");
  return;
 }
 uint32 zn_id = pp_mail->content;
 ActorPtr & zp_actor = mp_stage->get(zn_id);
 if (!zp_actor)
 {
  modb::revert_insert_one(zp_attachment); //归还附件
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorModb::op_insert_one -> get actor failed, not found mongodb actorid="
   << zn_id << ", actor = {" << tostring() << "}, mail={" << pp_mail->tostring() << "}");
  return;
 }
 bool zb_ok = zp_actor->post(zn_id, pp_mail->subject, pp_mail->sender(), zp_attachment);
 if (!zb_ok)
 {
  modb::revert_insert_one(zp_attachment); //归还附件
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorModb::op_insert_one -> actor post self actor failed, mongodb actorid=" << zn_id
   << ", parent actor = {" << tostring() << "}, mail={" << pp_mail->tostring() << "}");
 }
}
void ActorModb::op_close(Mail * pp_mail)
{
 if (!mp_schd || !mp_stage)
 {
  ExERROR(TI_LIBMODB, TI_LOGIC_ERR, "ActorModb::op_close -> please call function set_special after init.");
  return;
 }
 this->close();
 std::vector<uint32> zc_ids = mp_stage->get_ids();
 uint32 zn_actorid = 0;
 for (auto it = zc_ids.begin(); it != zc_ids.end(); ++it)
 {
  zn_actorid = *it;
  ActorPtr & zp_actor = mp_stage->get(zn_actorid);
  zp_actor->post(zn_actorid, kMSIdClose, 0, 0);
 }
 mp_stage->wait_for_destroyed();
 mp_schd->stop();
}

测试示例

创建一个测试ActorTest对象,用于返回的结果。

class ActorTest : public Actor
{
     static const uint32 kMSIdClose = 101;
public:
     static bool post_close(ActorPtr & pp_sender)
     {
      bool zb_ok = pp_sender->post(kActorId, kMSIdClose, 0, 0);
      return zb_ok;
     }
 static const uint32 kActorId = 100;
 ActorTest(Stage * pp_stage) : Actor(pp_stage, kActorId){}
 void init() override
 {
  set_operation(ActorModb::kMSIdResult, std::bind(&ActorTest::op_modb_result, this, std::placeholders::_1));
  set_operation(kMSIdClose, std::bind(&ActorTest::op_close, this, std::placeholders::_1));
 }
private:
 void op_modb_result(Mail * pp_mail)
 {
  modb::ResultAttachment * zp_attachment = static_cast<modb::ResultAttachment *>(pp_mail->attachment);
  std::cout << "ActorTest::op_modb_result uid=" << pp_mail->uid()
   << ", result={" << zp_attachment->tostring() << "}"
   << std::endl;
  modb::revert_result(zp_attachment);
 }
 void op_close(Mail * pp_mail)
 {
  this->close();
 }
};
void main(int argc, char* argv[])
{
 modb::init();
 std::cout << std::endl << "begin" << std::endl;
 uint32 zn_qid = 0;
 Scheduler schd(0, enST_STEALING, true);
 schd.start();
 {
  Stage stage(&schd);
  { //创建Actor
   ActorPtr zp_actor = ActorPtr(new ActorModb(&stage));
   zp_actor->init();
   uint32 zn_core_num = 3;
   ModbSpecialData sd(3, 10, 600);
   zp_actor->set_special(&sd);
   stage.add_actor(zp_actor);
   ActorPtr zp_test = ActorPtr(new ActorTest(&stage));
   zp_test->init();
   stage.add_actor(zp_test);
  }{ // 开始投递信件
   ActorPtr zp_actor = stage.get(ActorTest::kActorId);
   for (uint32 i = 0; i < mongodb_col_len; ++i)
   {
    ActorModb::post_config(zp_actor, mongodb_col_indexs[i], ++zn_qid, mongodb_uri);
   }
  } { //insert_one
            ActorPtr zp_actor = stage.get(ActorTest::kActorId);
            auto f = [&](uint32 n) {
             std::string s1("{\"insert_one_key\":\"abc ");
             if (n > 0)
             {
              s1.append((n % 100) + 10, 'A' + (n % 26));
             }
             s1.append("\"}");
             for (uint32 i = 0; i < mongodb_col_len; ++i)
             {
              std::stringstream ss;
              ss << mongodb_colname << "_" << mongodb_col_indexs[i];
              ActorModb::post_insert_one(zp_actor, mongodb_col_indexs[i], ++zn_qid, mongodb_dbname, ss.str(), s1, "");
             }
            };
            for (uint32 i = 0; i < 2; ++i)
            {
             f(i);
            }
        }{ // 投递关闭信件
   ActorPtr zp_actor = stage.get(ActorModb::kActorId);
   ActorModb::post_close(zp_actor);
   THIS_SLEEP_SECONDS(13);
   ActorPtr zp_test = stage.get(ActorTest::kActorId);
   ActorTest::post_close(zp_test);
  }
  // 等待所有操作完成并销毁
  stage.wait_for_destroyed();
 }
 schd.stop();
 modb::release();
}

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP