要设计:
本项目主要由服务器端和客户端两大模块组成,每个模块的主要功能有:
服务器端:
利用多线程实现并行,结合半同步半异步的网络模型完成线程之间的任务分工,利用socket_pair完成线程之间的数据交互
1>负责绑定与监听目标端口
2>为每个客户建立连接,并将套接字按照子线程压力分发给子线程监听,子线程利用libevent实现I/O复用监听客户端的套接字,利用MVC模式通过判断客户端的请求类型调用不同的视图处理用户请求。
3>循环接受并处理客户消息,并反馈给客户。
4>利用mysql数据库存储用户相关信息以及离线消息。
5>利用memcached实现数据库和内存之间的高速缓存。
客户端:
1>请求与服务器端口的连接
2>利用多线程实现,采取输入命令的方式,让用户选择服务类型,根据不同的服务类型调用不同的处理函数,完成请求
3>利用json和自定义的上层协议完成和服务器端的数据的交互,
4>在登录成功后,启动一个线程接收服务器端的消息
详细设计:
服务器端主要由五个类组成:
mysql类::进行数据库的连接与选择。
tcpsever类::创建服务器,创建cpu内核数-1个子线程,创建线程个数个双向管道,接受用户连接,并将客户端套接字通过sock_pair发给当前监听数量最少的子线程(pthread)。
pthread类::启动子线程,接收主线程传过来的客户端套接字,并给主线程回复当前监听的客户端数量,监听客户端套接字,并接受客户端发送的消息发送给控制台(control)。
control类::解析客户端发来的数据,并根据其类型调用相关的视图(view)。
view类::对发来的数据做出相关的处理。
客户端主要有主线程和子线程:
主线程连接服务器,进入死循环,让用户选择服务(注册,登录,退出)
登录之后进入子线程等待接收服务器发送来的消息。让用户选择接下来的服务内容。


最后进行了服务器的最大负载量的测试:
在默认的打开文件描述符的情况下,最多连接客户端1021个,
将默认个数改调后,发现最多客户端连接是28231个。

main::
int main(int argc,char *argv[])
{
cout<<"main begin:"<<endl;
if(argc < 4)
{
cout<<"error"<<endl;
return 0;
}
//分离参数
int port = atoi(argv[2]);
char *ip = argv[1];
short pth_num = atoi(argv[3]);
Tcpsever sever(ip,port,pth_num);
sever.run();
}
tcpsever::
int i = 0;
void listen_cb(int fd,short event,void *arg)
{
i++;
Tcpsever *ser = (Tcpsever*)arg;
//接受用户链接
struct sockaddr_in cli;
socklen_t len = sizeof(cli);
int cli_fd = accept(fd,(struct sockaddr*)&cli,&len);
assert(cli_fd != -1);
cout<<"one cli linked!"<<endl;
cout<<"次数:"<<i<<endl;
//查找当前监听数量最少的子线程
map<int,int>::iterator it = ser->_pth_work_num.begin();
int min_num = it->second; //最小个数
int min_fd;//监听最少的对应的fd
for(;it != ser->_pth_work_num.end();++it)
{
if(it->second <= min_num)
{
min_num = it->second;
min_fd = it->first;
}
}
//将客户端套接子通过socktpair发给子线程
char sendbuff[128] = {0};
sprintf(sendbuff,"%d",cli_fd);
send(min_fd,sendbuff,strlen(sendbuff),0);
}
void sock_pair_cb(int fd,short event,void *arg)
{
Tcpsever *ser = (Tcpsever*)arg;
//读取管道内容
char pairbuff[128] = {0};
recv(fd,pairbuff,127,0);
//更新到map表_pth_work_num ----->fd
map<int,int>::iterator it = ser->_pth_work_num.find(fd);
it->second = atoi(pairbuff);
}
Tcpsever::Tcpsever(char *ip,short port,int pth_num)
{
cout<<"tcpsever begin:"<<endl;
///创建服务器
int _listen_fd = socket(AF_INET,SOCK_STREAM,0);
if(-1 == _listen_fd)
{
cerr<<"socket fail;error:"<<errno<<endl;
return;
}
struct sockaddr_in ser;
ser.sin_family = AF_INET;
ser.sin_port = htons(port);
ser.sin_addr.s_addr = inet_addr(ip);
_pth_num = pth_num;
if(-1 == bind(_listen_fd,(struct sockaddr*)&ser,sizeof(ser)))
{
cerr<<"bind fail;errno:"<<errno<<endl;
throw "";
}
if(-1 == listen(_listen_fd,20))
{
cerr<<"listen fail;errno:"<<errno<<endl;
throw "";
}
//给libevent申请空间
_base = event_base_new();
//创建事件,绑定监听套接子的回调函数(listen_cb)
struct event *listen_event = event_new(_base,_listen_fd,EV_READ|EV_PERSIST,listen_cb,this);
if(NULL == listen_event)
{
cerr<<"event new fail;errno:"<<errno<<endl;
throw "";
}
//添加到事件列表
event_add(listen_event,NULL);
}
void Tcpsever::run()
{
//申请socketpair(函数自查)
get_sock_pair();
//创建线程//规定 int arr[2] arr[0]<=>主线程占用 arr[1]<=>子线程占用
get_pthread();
//为主线程的socktpair创建事件,绑定回调函数(sock_pair_cb)
int i;
for(i=0; i<_pth_num; i++)
{
struct event* sock_event = event_new(_base,_socket_pair[i].arr[0],EV_READ|EV_PERSIST,sock_pair_cb,this);
event_add(sock_event,NULL);
}
event_base_dispatch(_base);
}
void Tcpsever::get_sock_pair()
{
int i = 0;
for(i = 0;i < _pth_num;i++ )
{
//申请双向管道
int arr[2];
if(-1 == socketpair(AF_UNIX,SOCK_STREAM,0,arr))
{
cerr<<"socketpair fail;errno:"<<errno<<endl;
return;
}
//将双向管道加入到_sock_pair.push_back();
_socket_pair.push_back(MVEC(arr));
_pth_work_num.insert(make_pair(arr[0],0));
}
}
void Tcpsever::get_pthread()
{
//开辟线程
for(int i = 0; i< _pth_num; i++)
{
_pthread.push_back(new Pthread(_socket_pair[i].arr[1]));
}
}
Tcpsever::~Tcpsever()
{
}
pthread::
extern Control control_sever;
void *pthread_run(void *arg);
void sock_pair_1_cb(int fd,short event,void *arg);
void client_cb(int fd,short int event,void *arg);
Pthread::Pthread(int sock_fd)
{
cout<<"Pthread begin:"<<endl;
_sock_fd = sock_fd;
_base = event_base_new();
//启动线程
pthread_create(&_pthread,NULL,pthread_run,this);
}
void* pthread_run(void *arg)
{
Pthread *pth = (Pthread *)arg;
//将sock_pair_1加入到libevent sock_pair_1_cb()
struct event* _event = event_new(pth->_base,pth->_sock_fd,EV_READ|EV_PERSIST,sock_pair_1_cb,arg);
event_add(_event,NULL);
pth->_event_map.insert(make_pair(pth->_sock_fd,_event));
event_base_dispatch(pth->_base);
}
void sock_pair_1_cb(int fd,short int event,void *arg)
{
Pthread *pth = (Pthread *)arg;
//recv -> clien_fd
char buff[128] = {0};
if(0 >recv(fd,buff,127,0))
{
cout<<"recv fail;errno:"<<errno<<endl;
exit(1);
}
int cli_fd = atoi(buff);
//将client_fd加入到libevent client_cb()
struct event* cli_event = event_new(pth->_base,cli_fd,EV_READ|EV_PERSIST,client_cb,arg);
if(NULL == cli_event)
{
cerr<<"client create fail;errno:"<<errno<<endl;
exit(1);
}
event_add(cli_event,NULL);
//给主线程回复当前监听的客户端数量
//插入到map表中
pth->_event_map.insert(make_pair(cli_fd,cli_event));
int cli_fd_num = pth->_event_map.size();
char buf[128] = {0};
sprintf(buf,"%d",cli_fd_num);
send(fd,buff,strlen(buff),0);
}
void client_cb(int fd,short event,void *arg)
{
cout<<"pthread::recv"<<endl;
Pthread *pth = (Pthread*)arg;
//recv ->buff
char buff[128] = {0};
if(0 > recv(fd,buff,127,0))
{
cout<<"revc buff fail;errno:"<<errno<<endl;
return;
}
cout<<buff<<endl;
control_sever.process(fd,buff);
//判断退出并将客户端退出
Json::Value val;
Json::Reader read;
if(-1 == read.parse(buff,val))
{
cerr<<"pthread::read fail;errno:"<<errno<<endl;
return;
}
if(val["reason_type"] == REASON_TYPE_EXIT)
{
map<int,struct event*>::iterator it =pth->_event_map.find(fd);
if(pth->_event_map.end()!=it)
{
event_del(it->second);
close(fd);
pth->_event_map.erase(it);
}
}
}
Pthread::~Pthread()
{
}
control::
Control::Control()
{
cout<<"control begin:"<<endl;
_map.insert(make_pair(REASON_TYPE_REGISTER,new Register()));
_map.insert(make_pair(REASON_TYPE_LOGIN,new Login()));
_map.insert(make_pair(REASON_TYPE_LIST,new Get_list()));
_map.insert(make_pair(REASON_TYPE_EXIT,new Exit()));
_map.insert(make_pair(REASON_TYPE_TALK,new Talk_one()));
_map.insert(make_pair(REASON_TYPE_GROUP,new Talk_group()));
}
void Control::process(int fd,char *json)
{
Json::Value val;
Json::Reader read;
if(-1 == read.parse(json,val))
{
cerr<<"Control process json parse fail;errno:"<<errno<<endl;
return;
}
map<int,View*>::iterator it = _map.find(val["reason_type"].asInt());
if(it == _map.end())
{
cout<<"reason type not find!"<<endl;
}
else
{
it->second->process(fd,json);
it->second->response();
}
}
Control control_sever;
mysql::
Mysql::Mysql()
{
_mpcon = mysql_init((MYSQL*)0);
if(NULL == _mpcon)
{
cerr<<"_mpcon NULL;errno:"<<errno<<endl;
exit(0);
}
//链接数据库
if(!mysql_real_connect(_mpcon,"127.0.0.1","root","123456",NULL,3306,NULL,0))
{
cerr<<"mysql connect fail;errno:"<<errno<<endl;
exit(0);
}
//选择数据库
if(mysql_select_db(_mpcon,"chat"))
{
cerr<<"database select fail;errno:"<<errno<<endl;
exit(0);
}
}
Mysql::~Mysql()
{
if(NULL != _mp_res)
{
mysql_free_result(_mp_res);
}
mysql_close(_mpcon);
}
Mysql Mysql_sever;
register::
extern Mysql Mysql_sever;
void Register::process(int fd,char* json)
{
cout<<"register::begin"<<endl;
_fd = fd;
//解析json
Json::Value val;
Json::Reader read;
if(-1 == read.parse(json,val))
{
cerr<<"read fail;errno:"<<errno<<endl;
return;
}
//name pw
//在数据库中查找name有没有重复
char cmd[100] = "SELECT *FROM user WHERE name='";
strcat(cmd,val["name"].asString().c_str());
strcat(cmd,"';");
cout<<cmd<<endl;
if(mysql_real_query(Mysql_sever._mpcon,cmd,strlen(cmd)))
{
cerr<<" select fail ;errno:"<<errno<<endl;
return;
}
Mysql_sever._mp_res = mysql_store_result(Mysql_sever._mpcon);
if(!(Mysql_sever._mp_row = mysql_fetch_row(Mysql_sever._mp_res)))
{
//将name pw加入到数据库的user
char cmd1[100] = "INSERT INTO user VALUE('";
strcat(cmd1,val["name"].asString().c_str());
strcat(cmd1,"','");
strcat(cmd1,val["pw"].asString().c_str());
strcat(cmd1,"');");
cout<<cmd1<<endl;
if(mysql_real_query(Mysql_sever._mpcon,cmd1,strlen(cmd1)))
{
cerr<<"insert fail;errno:"<<errno<<endl;
return;
}
_str = "register success";
}
else
{
_str = "register fail";
}
}
void Register::response()
{
//用json将消息打包,发送给客户端
Json::Value root;
root["reason"] = _str;
if(-1 == send(_fd,root.toStyledString().c_str(),strlen(root.toStyledString().c_str()),0))
{
cerr<<"register send reason fail;errno:"<<errno<<endl;
return;
}
}
|