`
iyuan
  • 浏览: 462994 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-2.发布订阅模式(pub/sub)

    博客分类:
  • MQ
阅读更多
pub/sub模式:



发布端(pub)
import itertools
import sys  
import time 
            
import zmq  
            
def main(): 
    if len (sys.argv) != 2:
        print 'usage: publisher <bind-to>'
        sys.exit (1)
    
    bind_to = sys.argv[1]
    
    all_topics = ['sports.general','sports.football','sports.basketball',
                  'stocks.general','stocks.GOOG','stocks.AAPL',
                  'weather']
    
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUB)
    s.bind(bind_to)

    print "Starting broadcast on topics:"
    print "   %s" % all_topics
    print "Hit Ctrl-C to stop broadcasting."
    print "Waiting so subscriber sockets can connect..."
    print
    time.sleep(1.0)
    
    msg_counter = itertools.count()
    try:
        for topic in itertools.cycle(all_topics):
            msg_body = str(msg_counter.next())
            print '   Topic: %s, msg:%s' % (topic, msg_body)
            #s.send_multipart([topic, msg_body])
            s.send_pyobj([topic, msg_body])
            # short wait so we don't hog the cpu
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass

    print "Waiting for message queues to flush..."
    time.sleep(0.5)
    s.close()
    print "Done."

if __name__ == "__main__":
    main()


订阅端(sub):
import sys
import time
import zmq

def main():
    if len (sys.argv) < 2:
        print 'usage: subscriber <connect_to> [topic topic ...]'
        sys.exit (1)

    connect_to = sys.argv[1]
    topics = sys.argv[2:]

    ctx = zmq.Context()
    s = ctx.socket(zmq.SUB)
    s.connect(connect_to)

    # manage subscriptions
    if not topics:
        print "Receiving messages on ALL topics..."
        s.setsockopt(zmq.SUBSCRIBE,'')
    else:
        print "Receiving messages on topics: %s ..." % topics
        for t in topics:
            s.setsockopt(zmq.SUBSCRIBE,t)
    print
    try:
        while True:
            #topic, msg = s.recv_multipart()
            topic, msg = s.recv_pyobj()
            print '   Topic: %s, msg:%s' % (topic, msg)
    except KeyboardInterrupt:
        pass
    print "Done."

if __name__ == "__main__":
    main()


注意:
这里的发布与订阅角色是绝对的,即发布者无法使用recv,订阅者不能使用send,并且订阅者需要设置订阅条件"setsockopt"。
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
官网还提供了一种可能出现的问题:当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积(有朋友指出是堆积在消费端,或许是新版本改进,需要读者的尝试和反馈,thx!),显然,这是不可以被接受的。至于解决方案,或许后面的"分而治之"就是吧。

(未完待续)
2
0
分享到:
评论
6 楼 frankwangzy1103 2012-07-04  
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
---------------------------------------
这个我还真遇到了,局域网,多个sub的时候,如果pub不sleep,会全部被取到一台机器上去,之前折腾了半天了,后来sleep了一下好了~~
5 楼 frankwangzy1103 2012-07-04  
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
----------------------------------
4 楼 frankwangzy1103 2012-07-04  
按照官网的说法,在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,(还没有看到那,在后续中发现的话会更新这里)。
3 楼 iyuan 2011-12-07  
guozhiwei 写道
guozhiwei 写道
"当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积,"

是在发布端堆积吗?

我觉得应该是在订阅端堆积的.




我刚才试验了 是在订阅端堆积的..


现在zmq好像版本很高了,文档已经陈旧了。谢谢你的反馈,相关内容已经做出修正~
2 楼 guozhiwei 2011-12-06  
guozhiwei 写道
"当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积,"

是在发布端堆积吗?

我觉得应该是在订阅端堆积的.




我刚才试验了 是在订阅端堆积的..
1 楼 guozhiwei 2011-12-06  
"当订阅者消费慢于发布,此时就会出现数据的堆积,而且还是在发布端的堆积,"

是在发布端堆积吗?

我觉得应该是在订阅端堆积的.

相关推荐

    zeromq的pub-sub订阅模式的jave实现

    zeromq的pub-sub订阅模式的jave实现,Eclipse下的maven工程,相关引用已在pom文件引入,可以直接运行。

    zeps:ZeroMQ 企业发布-订阅 (ZEPS) **已弃用**

    ZeroMQ 企业发布订阅 - ZEPS 已弃用 该项目已被 zeromq/Malamute 弃用。 概述 ZEPS 是一种基于代理的发布和订阅设计,提供高速低延迟发布订阅和持久日志发布订阅,根据客户端的能力自动将客户端从低速档切换到高速档...

    pub_sub.rar

    基于zmqpp实现的zeromq的publish-subscribe模式,实现了一发多收的功能,同时sub端可以订阅自己指定的topic(序中支持了三种topic的订阅)

    zmqpubsub:在zeromq之上实现友好的pubsub实现

    它抽象了底层的ZeroMQ机械,以为发布-订阅消息传递模式提供Go友好的API。 安装 go get -tags zmq_3_x github.com/hpcloud/zmqpubsub 用法 经纪人 首先设置一个经纪人: var Broker zmqpubsub. Broker func init ()...

    pyzmq-17.1.2-cp36-cp36m-win32

    2. 订阅发布模式 (sub 和 pub) 消息单向的,有去无回的。可按照发布端可发布制定主题的消息,订阅端可订阅喜欢的主题,订阅端只会收到自己已经订阅的主题。发布端发布一条消息,可被多个订阅端同事收到。 3. ...

    pubsub:[存档] 实习生项目 - 使用 ZeroMQ 在 MongoDB v2.6.3 中发布订阅 - 此存储库不是受支持的 MongoDB 产品

    MongoDB + 发布/订阅 欢迎来到 ! 这是使用在 MongoDB v2.6.3 中发布/订阅的实现。... 在 MongoDB 中使用 pub/sub 有很多好处: 数据库和发布/订阅都有很多用例,将两者结合可以减少应用程序堆栈中的组件数量。

    cs6381-assignment2

    分布式系统原理-作业2ZooKeeper在发布/订阅模型中的领导者选举理查德·怀特(Max Richardy) ZeroMQ(ZMQ)中间件支持的PUB / SUB模型。 应用程序维护一个代理,该代理对于发布者和订阅者都是众所周知的。 该经纪人...

    brov-bus:一个模块,该模块使用pub-sub模式向BlueROV提供异步,匿名总线

    ## Description ## Node.js模块,提供匿名的“发布者-订阅者”通信总线。 通过总线的通信可以通过“注册”侦听器,提供回调和“获取”发布者对象来执行。 ## Installation ##要直接从github安装最新版本: npm ...

    tenyks:Tenyks IRC机器人

    _ _ | | | | | |_ ___ _ __ _ _| | _____ ... 当消息从IRC传入时,该消息将转换为json数据结构,然后通过服务可以订阅的Pub / Sub通道上的管道发送。 然后,服务解析或模式匹配消息,并可能通过

    zerogw:一个快速的 HTTPWebSocket 到 zeromq 网关(未维护,请查看 swindon Web 服务器)

    Websockets 是通过使用 ZMQ_PUB 套接字转发传入消息和从 ZMQ_SUB 套接字监听命令来实现的。 每个 WebSocket 客户端可以订阅无限数量的主题。 每个 zeromq 消息要么控制消息(例如订阅),要么控制消息到指定主题,...

    zper:ZeroMQ持久性代理

    在PUB套接字上收听以进行公开连接的SUB将获得最后成功同步的偏移和消息的第一帧因此,请尝试将第一帧用作消息ID 通过订阅,应用程序可以获得更细粒度的偏移量在Inproc PULL套接字上侦听来自ZPWriter的ack输入保持...

    OpenMAMA-docker:适用于OpenMAMA的Docker基本映像

    OpenMAMA订阅者映像( ) 启用OpenMAMA ZeroMQ的映像( ) 启用了OpenMAMA Solace的映像(仅出于合法原因,您需要自行构建镜像)(仅Dockerfile和脚本) Sub / pub OpenMAMA映像仅在openmama配置和单独的docker网络...

    socket-benchmark:NodeJS PubSub 套接字基准测试 TCP、TLS、HTTP、HTTPS

    部分我创建了两个程序,一个pub和一个sub ,它们将计算每秒可以发送的消息数。消息框架当使用像 TCP 和 TLS 这样的“流式”套接字传输时,消息需要成帧。 可以使用诸如换行符 '\n' 之类的简单方法来实现消息框架,...

Global site tag (gtag.js) - Google Analytics