|
三种基本模式(它有很多种)
1. 请求应答模式(req 和 rep)
消息双向的,有来有往,req端请求的消息,rep端必须答复给req端
2. 订阅发布模式 (sub 和 pub)
消息单向的,有去无回的。可按照发布端可发布制定主题的消息,订阅端可订阅喜欢的主题,订阅端只会收到自己已经订阅的主题。发布端发布一条消息,可被多个订阅端同事收到。
3. push pull模式
消息单向的,也是有去无回的。push的任何一个消息,始终只会有一个pull端收到消息.
后续的代理模式和路由模式等都是在三种基本模式上面的扩展或变异。
阻塞 和 非阻塞
以上三种基本模式都支持阻塞模式和非阻塞模式。
req 和 rep的阻塞模式是这样的(其实跟原生的socket实现也非常像):
req端:
#coding=gbk
import zmq
import time
c = zmq.Context()
#选择req模式
s = c.socket(zmq.REQ)
#主动去连接
s.connect('tcp://127.0.0.1:13414)
for i in range(103):
s.send('hello')
msg = s.recv()
print i,msg
time.sleep(1)
rep端:
#coding=gbk
import zmq
import time
#这里的参数1标识开启的io线程个数,默认是1,具体io线程是什么用处,暂时还不知道
c = zmq.Context(1)
#选择rep模式
s = c.socket(zmq.REP)
#绑定ip端口,这里和使用原生的socket有点像
s.bind('tcp://127.0.0.1:13414')
while True:
#阻塞等待其他地方发送消息过来
msg = s.recv()
s.send('from serv1')
time.sleep(1)
Poller
主要是zmq提供的poller的使用。
Poller的用法跟python内置的poll的用法非常像
rep端:
#coding=gbk
'''
poll模式
'''
import zmq
import time
#参数模式是1 表示开启的io线程个数
c = zmq.Context(1)
#选择req模式
s = c.socket(zmq.REP)
#绑定ip端口
s.bind('tcp://127.0.0.1:13414')
#poller
poller = zmq.Poller()
#注册POLLIN事件 这里跟原生socket的poll的用法是不是很像
poller.register(s,zmq.POLLIN)
while True:
#这里poll后面的参数是毫秒
socks = dict(poller.poll(1))
if s in socks and socks[s] == zmq.POLLIN:
#监控如果socket有可读入的事件 那么接收消息
message = s.recv()
time.sleep(1)
#回个消息过去
s.send(str(time.time()))
另外还有个不太优雅的非阻塞用法
就是sock.recv(zmq.NOBLOCK) 通过制定NOBLOCK,这句话会抛出异常,需要捕获。不过不太建议采用这种NOBLOCK的方式,因为poller已经很好用了。
同样订阅发布模式和push pull模式也都可以使用poller来实现非阻塞。
订阅发布模式的不可靠性
原因:zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布
#coding=gbk
import sys
import time
import zmq
def main():
ctx = zmq.Context()
#选择发布模式
s = ctx.socket(zmq.PUB)
#绑定ip端口
s.bind("tcp://*:12341")
# 此时订阅端和发布端可能还未建议好连接
# 发布消息会导致消息丢失
# 如果这里不sleep一会 可能
#time.sleep(0.1)
for i in range(2):
#发送‘test’主题的消息 消息内容是 hello
print s.send_multipart(['test', 'hello'])
#time.sleep(0.3)
if __name__ == "__main__":
main()
如果s.bind()之后,马上就s.send()的话,很有可能send的消息(测试的结果发现是“一定”)没有收到。是因为s.bind实际并不是表示与订阅端已经建立好连接。所以解决办法之一是在s.bind之后sleep一段时间,但这个解决办法对一些情况来说,很不好用,例如用户的某个操作以后,需要通过zeromq的这种订阅发布模式,发布一些信息出去,交由其他程序来异步处理某些事情,但是由于这里需要sleep一会,并且我们实际不知道应该sleep多少时间才会建立好连接,而用户的客户端又要等着做完刚才的操作返回。
官方还提供了另外一种解决办法,订阅端和服务端之间同时开启rep和req模式,发布端先通过req发送一个消息,等发布端响应之后,再发布消息。
总体的来说,zeromq的订阅发布模式在某些场合下面很不好用,相比其他的订阅发布服务例如icestorm或redis的订阅发布服务来说易用性或稳定性还是差一点的。
redis的订阅者和发布者中间有个redis server,所有传输的消息都经过redis server.而icestorm也有一个icebox,所有传输的消息都需要经过icebox.,zeromq的发布者和订阅者之间是直接相连的.
icestorm的消息会保存在订阅端,订阅端内存不断增大直至内存耗尽崩溃.
redis订阅者的消息会保存在redis server端,不在订阅端,redis server内存不断增大而崩溃
zeromq消息会保存在订阅端内存中,到达一定数量,会将消息从内存同步至硬盘中 |