|
本周花时间专门学习了 Erlang 的 TCP 内容,一直以来都停留在应用层面,没有自己亲手完整的实现过,总觉得不踏实,这次亲自动手写的过程果然就遇到了问题。
Erlang 的 TCP 提供了比较完善的 TCP 封装,但是 Erlang 提供的连接接口是同步的,而大部分情况我们需要用到的都是异步的情况,所以会用到内部的 prim_inet 模块,关于这个模块网上有很多的教程,主要就是这个模块是没有官方文档的,也就是说非官方推荐使用。
接下来看具体的一些代码,首先我们需要在服务器启动一个监听:
SockOpts = [binary, {packet, 0}, {reuseaddr, true}, {nodelay, true}, {active, false}],
case gen_tcp:listen(Port, SockOpts) of
{ok, LSock} ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} ->
{ok, #state{listen_socket = LSock, ref = Ref}};
Error ->
{stop, {listen_error, Error}}
end;
{error, eaddrinuse} ->
{stop, {port_inuse, Port}};
{error, Reason} ->
{stop, {cannot_listen, Port, Reason}}
end.
todo Socket 选项的作用
gen_tcp:listen(Port, SockOpts) 接口返回 {ok, LSock} 就意味着在端口 Port 处成功建立监听。接下来产品中的一般做法是建立一个连接池调用 gen_tcp:accept 来接收连接,我在这里只是为了实验,所以采用了在同一个进程监听端口和接收并处理 Socket。
prim_inet:async_accept(LSock, -1)的作用就是异步接收一个 Socket 连接,当有连接进入时会收到 {inet_async, LSock, Ref, {ok, ClientSock}} 这么一条消息,ClientSock 就是连接的 Socket,Ref 是函数返回的唯一标识。
所以接下来处理接收就是匹配该消息,这里可以使用 receive 接收也可以使用 gen_server 的 handle_info 来处理,一般情况都会使用一个 gen_server 进程,毕竟 Erlang 提供了完善的消息封装。
handle_info({inet_async, LSock, Ref, {ok, ClientSock}},
try
prim_inet:async_recv(ClientSock, 0, -1), %% 这一句调用是异步接收已连接的 Socket 发来的数据
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(ClientSock, Mod), %% 这里是个坑,如果是自己使用 prim_inet 的话,一定要进行这一步,否则可以接收数据无法发送数据
%% 注意这一句,每次处理完连接的 Socket 之后都需要重新去异步接收新的 Socket
%% @todo 应该是和 {active, false} 有关?
case prim_inet:async_accept(LSock, -1) of
{ok, NRef} ->
{noreply, State
Error ->
close_sockets(Clients),
{stop, Error, State}
end
catch _:Reason ->
close_sockets(ClientSock),
{noreply, State}
end;
%% receive socket data
handle_info({inet_async, ClientSock, _Ref, {ok, Data}}, #state{} = State) ->
do_handle_msg(ClientSock, Data),
%% 注意这里是第二个坑,处理完数据之后必须重新设置接收数据,否则无法收到后序数据
prim_inet:async_recv(ClientSock, 0, -1),
{noreply, State};
%% client socket closed
handle_info({inet_async, ClientSock, _Ref, {error, Reason}}, #state{clients = Clients} = State) ->
close_sockets(ClientSock),
{noreply, State#state{clients = lists:delete(ClientSock, Clients)}};
这里是用的同一个进程同时处理监听和处理数据,所以需要注意监听到连接的消息和连接发送数据的消息的区分,使用模式匹配来区分两者。其次就是每次处理完接收的连接和数据之后需要重新接收,这点应该是 {active, false}选项的作用,需要后面深入了解。
接下来是客户端部分,客户端也采用非阻塞的方式:
do_connect(IP, Port, N) ->
?INFO("第 ~p 次连接服务器 ~p:~p", [N, IP, Port]),
case gen_tcp:connect(IP, Port, ?TCP_OPTS, 3000) of
{ok, Socket} ->
%% 连接成功之后设置接收数据
prim_inet:async_recv(Socket, 0, -1),
{ok, Socket};
{error, Reason} ->
case N > 0 of
true ->
%% 如果指定次数内连接失败则重试
do_connect(IP, Port, N -1);
false ->
?ERROR("连接服务器 ~p:~p 失败, Reason:~p", [IP, Port, Reason]),
{error, Reason}
end
end.
客户端连接失败会尝试重连,直到指定次数还没连接成功则失败。TCP 连接成功后,Socket 处理消息就都是一样的了。
%% receive server socket data
handle_info({inet_async, Socket, _Ref, {ok, Data}}, #state{} = State) ->
do_handle_data(binary_to_term(Data)),
prim_inet:async_recv(Socket, 0, -1),
{noreply, State};
%% socket close
handle_info({inet_async, Socket, _Ref, {error, Reason}}, State) ->
close_socket(Socket),
{stop, Reason, State};
以上就是关于 Erlang TCP 的使用记录,虽然很简单,但是如果不自己写一遍,总是似是而非,一直都不会真正理解。 |