前文曾提到过命名机制,事实上它是一把双刃剑。在能够持有数据等待重新连接的时候,也增加了持有数据方的负担(危险),特别是在"发布/订阅"模式下,可谓牵一发而动全身。
这里先给出一组示例,在代码的运行过程中,通过重启消费者来观察发布者的进程状态。
发布端:
import zmq
import time
context = zmq.Context()
sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")
sync_request = sync.recv()
for n in xrange(10):
msg = "Update %d" % n
publisher.send(msg)
time.sleep(1)
publisher.send("END")
time.sleep(1) # Give 0MQ/2.0.x time to flush output
订阅端:
import zmq
import time
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.IDENTITY, "Hello")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect("tcp://localhost:5565")
sync = context.socket(zmq.PUSH)
sync.connect("tcp://localhost:5564")
sync.send("")
while True:
data = subscriber.recv()
print data
if data == "END":
break
订阅端得到的信息:
$ durasub
Update 0
Update 1
Update 2
^C
$ durasub
Update 3
Update 4
Update 5
Update 6
Update 7
^C
$ durasub
Update 8
Update 9
END
数据被发布者存储了,而发布者的内存占用也节节升高(很危险啊)。所以是否使用命名策略是需要谨慎选择的。为了以防万一,zeromq也提供了"高水位"机制,即当发送端持有数据达到一定数量就不再存储后面的数据,很好的控制了风险。这个机制也适当解决了
这里的慢消费问题。
使用了 高水位 后的测试结果:
$ durasub
Update 0
Update 1
^C
$ durasub
Update 2
Update 3
Update 7
Update 8
Update 9
END
"高水位"封堵了内存崩溃的可能性,却是以数据丢失为代价的,zeromq也为此配对提供了"swap"功能,将内存中的数据转存入硬盘,实现了"既不耗内存又不丢数据"。
实现代码:
import zmq
import time
context = zmq.Context()
# Subscriber tells us when it's ready here
sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")
# We send updates via this socket
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")
# Prevent publisher overflow from slow subscribers
publisher.setsockopt(zmq.HWM, 1)
# Specify the swap space in bytes, this covers all subscribers
publisher.setsockopt(zmq.SWAP, 25000000)
# Wait for synchronization request
sync_request = sync.recv()
# Now broadcast exactly 10 updates with pause
for n in xrange(10):
msg = "Update %d" % n
publisher.send(msg)
time.sleep(1)
publisher.send("END")
time.sleep(1) # Give 0MQ/2.0.x time to flush output
注意点:
高水位与交换区的设定,是需要根据实际运用状态来确定的,高水位设的过小,会影响到速度。
如果是数据存储端崩溃了,那么,所有数据将彻底消失。
关于高水位的特别说明:
除了PUB型会在达到高水位丢弃后续数据外,其他类型的都会以阻塞的形式来应对后续数据。
线程间的通信,高水位是通信双方共同设置的总和,如果有一方没有设置,则高水位规则不会起到作用。
(未完待续)
分享到:
相关推荐
zeromq-2.1.7.tar.gz 的早期的一个版本,本人已安装成功 放心使用
zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz
zeromq-4.3.2.tar.gz,可在linux下编译安装,能够使用zeromq进行sorket开发,多线程,提升性能,效率,可以配合msgpack进行使用,是个好的扩展插件
zeromq-4.1.3.tar.gz,最新的zeromq的开发工具包,希望对开发者有用
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
zeromq-4.1.8.tar.gz 有问题请联系
zeromq-2.1.9.tar.gz 这是zeromq linux 官方原版 请放心下载
zeromq-4.2.3.tar.gz 一个稳定的版本,可以安装使用!
0MQ version 4.3.4 stable, released on 2021/01/17
storm搭建所需资源
在官网下载zeromq太慢了,网速极不稳定,特意下载放在这里供大家下载,当然象征性地赚点 资源分
ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
0MQ version 4.3.4 stable, released on 2021/01/17
ZeroMQ(也说明 ØMQ,0MQ 或 ZMQ)是一个高性能的异步消息库,旨在使用分布式或并行应用程序。它提供了一个消息队列,但 不同于面向消息的中间件,一个 ZeroMQ 系统可以在没有专用运行消息代理。jzmq安装包,本人也...
zeroMQ 4.2.3版本 zeromq-4.2.3.tar.gz 欢迎关注我的CSDN博客:https://mp.csdn.net/console/home 免积分下载
zeromq-4.2.5.tar.gz
VS2015 在Widows 10 上编译的 ZeroMQ 4.3.2,JZMQ 3.1 CZMQ 4.2,可以在 JDK 1.8 下运行。DLL 都是 64位,包含了编译及运行相关信息。分享一下,也给自己留个备份