使用Protocol Buffers入门四步骤

论坛 期权论坛 脚本     
匿名网站用户   2020-12-19 20:38   11   0

Protocol Buffers(简称protobuf)是谷歌的一项技术,用于将结构化的数据序列化、反序列化,经常用于网络传输。

protobuf是谷歌的Protocol Buffers的简称,用于结构化数据和字节码之间互相转换(序列化、反序列化,即实现从结构体转换为字节流(编码,向LIS发送消息时使用)以及从字节流转换为结构体(解码,从LIS接收消息时使用)的功能。),一般应用于网络传输,可支持多种编程语言。

这货实际上类似于XML生成和解析,但protobuf的效率高于XML,不过protobuf生成的是字节码,可读性比XML差。类似的还有json、Java的Serializable等。

protobuf支持各种语言。本文以Java为例,简单介绍protobuf如何使用。其他语言使用方法类似。

首先需要下载:

http://download.csdn.net/download/xiao__gui/7586617

解压后有两个文件:protobuf-java-2.5.0.jar和protoc.exe。

protobuf-java-2.5.0.jar即protobuf所需要的jar包,如果用maven的话可以无视这个文件;

protoc.exe是protobuf代码生成工具。

第一步:定义数据结构

首先要定义protobuf的数据结构,这里要写一个.proto文件。这个文件有点类似于定义一个类。例如定义一个Person,保存文件PersonMsg.proto(注意文件名和里面的message名不要一样)。

  1. messagePerson{
  2. //ID(必需)
  3. requiredint32id=1;
  4. //姓名(必需)
  5. requiredstringname=2;
  6. //email(可选)
  7. optionalstringemail=3;
  8. //朋友(集合)
  9. repeatedstringfriends=4;
  10. }

上面的1、2、3、4是unique numbered tag,是一个唯一标识。

上面的例子中定义了一个非常简单的数据结构,当然还可以定义更复杂的结构,这里不再讨论,具体可以看官方文档。

第二步:protoc.exe生成Java代码

使用文件protoc.exe,cmd命令行运行:

protoc.exe --java_out=E:\JavaPersonMsg.proto

输入文件是PersonMsg.proto,也就是定义数据结构的文件;输出文件夹是E:\java,将java文件生成在E:\java中。运行命令成功后会生成PersonMsg.java:


在Eclipse中创建一个项目,将java文件拷贝到项目中。项目中需要引入protobuf-java-2.5.0.jar包。如果是maven项目则加入:

  1. <dependency>
  2. <groupId>com.google.protobuf</groupId>
  3. <artifactId>protobuf-java</artifactId>
  4. <version>2.5.0</version>
  5. </dependency>

第三步:序列化
第四步:反序列化

一般来说,序列化和反序列化是分开的。例如网络传输,由一方将数据序列化后发送给另一方来接收并解析,序列化发送和接收反序列化并不在一起。但是下面为了例子简单将二者写在同一程序中。

  1. importjava.io.ByteArrayInputStream;
  2. importjava.io.ByteArrayOutputStream;
  3. importjava.io.IOException;
  4. importjava.util.List;
  5. publicclassMain{
  6. publicstaticvoidmain(String[]args)throwsIOException{
  7. //按照定义的数据结构,创建一个Person
  8. PersonMsg.Person.BuilderpersonBuilder=PersonMsg.Person.newBuilder();
  9. personBuilder.setId(1);
  10. personBuilder.setName("叉叉哥");
  11. personBuilder.setEmail("xxg@163.com");
  12. personBuilder.addFriends("FriendA");
  13. personBuilder.addFriends("FriendB");
  14. PersonMsg.Personxxg=personBuilder.build();
  15. //将数据写到输出流,如网络输出流,这里就用ByteArrayOutputStream来代替
  16. ByteArrayOutputStreamoutput=newByteArrayOutputStream();
  17. xxg.writeTo(output);
  18. //--------------分割线:上面是发送方,将数据序列化后发送---------------
  19. byte[]byteArray=output.toByteArray();
  20. //--------------分割线:下面是接收方,将数据接收后反序列化---------------
  21. //接收到流并读取,如网络输入流,这里用ByteArrayInputStream来代替
  22. ByteArrayInputStreaminput=newByteArrayInputStream(byteArray);
  23. //反序列化
  24. PersonMsg.Personxxg2=PersonMsg.Person.parseFrom(input);
  25. System.out.println("ID:"+xxg2.getId());
  26. System.out.println("name:"+xxg2.getName());
  27. System.out.println("email:"+xxg2.getEmail());
  28. System.out.println("friend:");
  29. List<String>friends=xxg2.getFriendsList();
  30. for(Stringfriend:friends){
  31. System.out.println(friend);
  32. }
  33. }
  34. }


作者:叉叉哥 转载请注明出处:http://blog.csdn.net/xiao__gui/article/details/36643949


protobuf是谷歌的Protocol Buffers的简称,用于结构化数据和字节码之间互相转换(序列化、反序列化),一般应用于网络传输,可支持多种编程语言。

protobuf如何使用这里不再介绍,本文主要介绍在MINA、Netty、Twisted中如何使用protobuf,不了解protobuf的同学可以去参考我的另一篇博文

前面的一篇博文中,有介绍到一种用一个固定为4字节的前缀Header来指定Body的字节数的一种消息分割方式,在这里同样要使用到。只是其中Body的内容不再是字符串,而是protobuf字节码。


在处理业务逻辑时,肯定不希望还要对数据进行序列化和反序列化,而是希望直接操作一个对象,那么就需要有相应的编码器和解码器,将序列化和反序列化的逻辑写在编码器和解码器中。有关编码器和解码器的实现,上一篇博文中有介绍。

Netty包中已经自带针对protobuf的编码器和解码器,那么就不用再自己去实现了。而MINA、Twisted还需要自己去实现protobuf的编码器和解码器。

这里定义一个protobuf数据结构,用于描述一个学生的信息,保存为StudentMsg.proto文件:

  1. messageStudent{
  2. //ID
  3. requiredint32id=1;
  4. //姓名
  5. requiredstringname=2;
  6. //email
  7. optionalstringemail=3;
  8. //朋友
  9. repeatedstringfriends=4;
  10. }

用StudentMsg.proto分别生成Java和Python代码,将代码加入到相应的项目中。生成的代码就不再贴上来了。

下面分别介绍在Netty、MINA、Twisted如何使用protobuf来传输Student信息。

Netty:

Netty自带protobuf的编码器和解码器,分别是ProtobufEncoder和ProtobufDecoder。需要注意的是,ProtobufEncoder和ProtobufDecoder只负责protobuf的序列化和反序列化,而处理消息Header前缀和消息分割的还需要LengthFieldBasedFrameDecoder和LengthFieldPrepender。LengthFieldBasedFrameDecoder即用于解析消息Header前缀,根据Header中指定的Body字节数截取Body,LengthFieldPrepender用于在wirte消息时在消息前面添加一个Header前缀来指定Body字节数。

  1. publicclassTcpServer{
  2. publicstaticvoidmain(String[]args)throwsInterruptedException{
  3. EventLoopGroupbossGroup=newNioEventLoopGroup();
  4. EventLoopGroupworkerGroup=newNioEventLoopGroup();
  5. try{
  6. ServerBootstrapb=newServerBootstrap();
  7. b.group(bossGroup,workerGroup)
  8. .channel(NioServerSocketChannel.class)
  9. .childHandler(newChannelInitializer<SocketChannel>(){
  10. @Override
  11. publicvoidinitChannel(SocketChannelch)
  12. throwsException{
  13. ChannelPipelinepipeline=ch.pipeline();
  14. //负责通过4字节Header指定的Body长度将消息切割
  15. pipeline.addLast("frameDecoder",
  16. newLengthFieldBasedFrameDecoder(1048576,0,4,0,4));
  17. //负责将frameDecoder处理后的完整的一条消息的protobuf字节码转成Student对象
  18. pipeline.addLast("protobufDecoder",
  19. newProtobufDecoder(StudentMsg.Student.getDefaultInstance()));
  20. //负责将写入的字节码加上4字节Header前缀来指定Body长度
  21. pipeline.addLast("frameEncoder",newLengthFieldPrepender(4));
  22. //负责将Student对象转成protobuf字节码
  23. pipeline.addLast("protobufEncoder",newProtobufEncoder());
  24. pipeline.addLast(newTcpServerHandler());
  25. }
  26. });
  27. ChannelFuturef=b.bind(8080).sync();
  28. f.channel().closeFuture().sync();
  29. }finally{
  30. workerGroup.shutdownGracefully();
  31. bossGroup.shutdownGracefully();
  32. }
  33. }
  34. }

处理事件时,接收和发送的参数直接就是Student对象:

  1. publicclassTcpServerHandlerextendsChannelInboundHandlerAdapter{
  2. @Override
  3. publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg){
  4. //读取客户端传过来的Student对象
  5. StudentMsg.Studentstudent=(StudentMsg.Student)msg;
  6. System.out.println("ID:"+student.getId());
  7. System.out.println("Name:"+student.getName());
  8. System.out.println("Email:"+student.getEmail());
  9. System.out.println("Friends:");
  10. List<String>friends=student.getFriendsList();
  11. for(Stringfriend:friends){
  12. System.out.println(friend);
  13. }
  14. //新建一个Student对象传到客户端
  15. StudentMsg.Student.Builderbuilder=StudentMsg.Student.newBuilder();
  16. builder.setId(9);
  17. builder.setName("服务器");
  18. builder.setEmail("123@abc.com");
  19. builder.addFriends("X");
  20. builder.addFriends("Y");
  21. StudentMsg.Studentstudent2=builder.build();
  22. ctx.writeAndFlush(student2);
  23. }
  24. @Override
  25. publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause){
  26. cause.printStackTrace();
  27. ctx.close();
  28. }
  29. }

MINA:

在MINA中没有针对protobuf的编码器和解码器,但是可以自己实现一个功能和Netty一样的编码器和解码器。

编码器:

  1. publicclassMinaProtobufEncoderextendsProtocolEncoderAdapter{
  2. @Override
  3. publicvoidencode(IoSessionsession,Objectmessage,
  4. ProtocolEncoderOutputout)throwsException{
  5. StudentMsg.Studentstudent=(StudentMsg.Student)message;
  6. byte[]bytes=student.toByteArray();//Student对象转为protobuf字节码
  7. intlength=bytes.length;
  8. IoBufferbuffer=IoBuffer.allocate(length+4);
  9. buffer.putInt(length);//writeheader
  10. buffer.put(bytes);//writebody
  11. buffer.flip();
  12. out.write(buffer);
  13. }
  14. }

解码器:

  1. publicclassMinaProtobufDecoderextendsCumulativeProtocolDecoder{
  2. @Override
  3. protectedbooleandoDecode(IoSessionsession,IoBufferin,
  4. ProtocolDecoderOutputout)throwsException{
  5. //如果没有接收完Header部分(4字节),直接返回false
  6. if(in.remaining()<4){
  7. returnfalse;
  8. }else{
  9. //标记开始位置,如果一条消息没传输完成则返回到这个位置
  10. in.mark();
  11. //读取header部分,获取body长度
  12. intbodyLength=in.getInt();
  13. //如果body没有接收完整,直接返回false
  14. if(in.remaining()<bodyLength){
  15. in.reset();//IoBufferposition回到原来标记的地方
  16. returnfalse;
  17. }else{
  18. byte[]bodyBytes=newbyte[bodyLength];
  19. in.get(bodyBytes);//读取body部分
  20. StudentMsg.Studentstudent=StudentMsg.Student.parseFrom(bodyBytes);//将body中protobuf字节码转成Student对象
  21. out.write(student);//解析出一条消息
  22. returntrue;
  23. }
  24. }
  25. }
  26. }

MINA服务器加入protobuf的编码器和解码器:

  1. publicclassTcpServer{
  2. publicstaticvoidmain(String[]args)throwsIOException{
  3. IoAcceptoracceptor=newNioSocketAcceptor();
  4. //指定protobuf的编码器和解码器
  5. acceptor.getFilterChain().addLast("codec",
  6. newProtocolCodecFilter(newMinaProtobufEncoder(),newMinaProtobufDecoder()));
  7. acceptor.setHandler(newTcpServerHandle());
  8. acceptor.bind(newInetSocketAddress(8080));
  9. }
  10. }

这样,在处理业务逻辑时,就和Netty一样了:

  1. publicclassTcpServerHandleextendsIoHandlerAdapter{
  2. @Override
  3. publicvoidexceptionCaught(IoSessionsession,Throwablecause)
  4. throwsException{
  5. cause.printStackTrace();
  6. }
  7. @Override
  8. publicvoidmessageReceived(IoSessionsession,Objectmessage)
  9. throwsException{
  10. //读取客户端传过来的Student对象
  11. StudentMsg.Studentstudent=(StudentMsg.Student)message;
  12. System.out.println("ID:"+student.getId());
  13. System.out.println("Name:"+student.getName());
  14. System.out.println("Email:"+student.getEmail());
  15. System.out.println("Friends:");
  16. List<String>friends=student.getFriendsList();
  17. for(Stringfriend:friends){
  18. System.out.println(friend);
  19. }
  20. //新建一个Student对象传到客户端
  21. StudentMsg.Student.Builderbuilder=StudentMsg.Student.newBuilder();
  22. builder.setId(9);
  23. builder.setName("服务器");
  24. builder.setEmail("123@abc.com");
  25. builder.addFriends("X");
  26. builder.addFriends("Y");
  27. StudentMsg.Studentstudent2=builder.build();
  28. session.write(student2);
  29. }
  30. }

Twisted:

在Twisted中,首先定义一个ProtobufProtocol类,继承Protocol类,充当编码器和解码器。处理业务逻辑的TcpServerHandle类再继承ProtobufProtocol类,调用或重写ProtobufProtocol提供的方法。

  1. #-*-coding:utf-8–*-
  2. fromstructimportpack,unpack
  3. fromtwisted.internet.protocolimportFactory
  4. fromtwisted.internet.protocolimportProtocol
  5. fromtwisted.internetimportreactor
  6. importStudentMsg_pb2
  7. #protobuf编码、解码器
  8. classProtobufProtocol(Protocol):
  9. #用于暂时存放接收到的数据
  10. _buffer=b""
  11. defdataReceived(self,data):
  12. #上次未处理的数据加上本次接收到的数据
  13. self._buffer=self._buffer+data
  14. #一直循环直到新的消息没有接收完整
  15. whileTrue:
  16. #如果header接收完整
  17. iflen(self._buffer)>=4:
  18. #header部分,按大字节序转int,获取body长度
  19. length,=unpack(">I",self._buffer[0:4])
  20. #如果body接收完整
  21. iflen(self._buffer)>=4+length:
  22. #body部分,protobuf字节码
  23. packet=self._buffer[4:4+length]
  24. #protobuf字节码转成Student对象
  25. student=StudentMsg_pb2.Student()
  26. student.ParseFromString(packet)
  27. #调用protobufReceived传入Student对象
  28. self.protobufReceived(student)
  29. #去掉_buffer中已经处理的消息部分
  30. self._buffer=self._buffer[4+length:]
  31. else:
  32. break;
  33. else:
  34. break;
  35. defprotobufReceived(self,student):
  36. raiseNotImplementedError
  37. defsendProtobuf(self,student):
  38. #Student对象转为protobuf字节码
  39. data=student.SerializeToString()
  40. #添加Header前缀指定protobuf字节码长度
  41. self.transport.write(pack(">I",len(data))+data)
  42. #逻辑代码
  43. classTcpServerHandle(ProtobufProtocol):
  44. #实现ProtobufProtocol提供的protobufReceived
  45. defprotobufReceived(self,student):
  46. #将接收到的Student输出
  47. print'ID:'+str(student.id)
  48. print'Name:'+student.name
  49. print'Email:'+student.email
  50. print'Friends:'
  51. forfriendinstudent.friends:
  52. printfriend
  53. #创建一个Student并发送给客户端
  54. student2=StudentMsg_pb2.Student()
  55. student2.id=9
  56. student2.name='服务器'.decode('UTF-8')#中文需要转成UTF-8字符串
  57. student2.email='123@abc.com'
  58. student2.friends.append('X')
  59. student2.friends.append('Y')
  60. self.sendProtobuf(student2)
  61. factory=Factory()
  62. factory.protocol=TcpServerHandle
  63. reactor.listenTCP(8080,factory)
  64. reactor.run()


下面是Java编写的一个客户端测试程序:

  1. publicclassTcpClient{
  2. publicstaticvoidmain(String[]args)throwsIOException{
  3. Socketsocket=null;
  4. DataOutputStreamout=null;
  5. DataInputStreamin=null;
  6. try{
  7. socket=newSocket("localhost",8080);
  8. out=newDataOutputStream(socket.getOutputStream());
  9. in=newDataInputStream(socket.getInputStream());
  10. //创建一个Student传给服务器
  11. StudentMsg.Student.Builderbuilder=StudentMsg.Student.newBuilder();
  12. builder.setId(1);
  13. builder.setName("客户端");
  14. builder.setEmail("xxg@163.com");
  15. builder.addFriends("A");
  16. builder.addFriends("B");
  17. StudentMsg.Studentstudent=builder.build();
  18. byte[]outputBytes=student.toByteArray();//Student转成字节码
  19. out.writeInt(outputBytes.length);//writeheader
  20. out.write(outputBytes);//writebody
  21. out.flush();
  22. //获取服务器传过来的Student
  23. intbodyLength=in.readInt();//readheader
  24. byte[]bodyBytes=newbyte[bodyLength];
  25. in.readFully(bodyBytes);//readbody
  26. StudentMsg.Studentstudent2=StudentMsg.Student.parseFrom(bodyBytes);//body字节码解析成Student
  27. System.out.println("Header:"+bodyLength);
  28. System.out.println("Body:");
  29. System.out.println("ID:"+student2.getId());
  30. System.out.println("Name:"+student2.getName());
  31. System.out.println("Email:"+student2.getEmail());
  32. System.out.println("Friends:");
  33. List<String>friends=student2.getFriendsList();
  34. for(Stringfriend:friends){
  35. System.out.println(friend);
  36. }
  37. }finally{
  38. //关闭连接
  39. in.close();
  40. out.close();
  41. socket.close();
  42. }
  43. }
  44. }

用客户端分别测试上面三个TCP服务器:

服务器输出:

ID:1
Name:客户端
Email:xxg@163.com
Friends:
A
B

客户端输出:

Header:32
Body:
ID:9
Name:服务器
Email:123@abc.com
Friends:
X
Y


作者:叉叉哥 转载请注明出处:http://blog.csdn.net/xiao__gui/article/details/38864961


分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1136255
帖子:227251
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP