`
iyuan
  • 浏览: 463004 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-19.应答模式进阶(五)-异步式应答

    博客分类:
  • MQ
阅读更多
恩,这应该算是比较实用的部分了。

模式图:


import zmq
import threading
import time
from random import choice

class ClientTask(threading.Thread):
    """ClientTask"""
    def init(self):
        threading.Thread.init (self)

    def run(self):
        context = zmq.Context()
        socket = context.socket(zmq.XREQ)
        identity = 'worker-%d' % (choice([0,1,2,3,4,5,6,7,8,9]))
        socket.setsockopt(zmq.IDENTITY, identity )
        socket.connect('tcp://localhost:5570')
        print 'Client %s started' % (identity)
        poll = zmq.Poller()
        poll.register(socket, zmq.POLLIN)
        reqs = 0
        while True:
            for i in xrange(5):
                sockets = dict(poll.poll(1000))
                if socket in sockets:
                    if sockets[socket] == zmq.POLLIN:
                        msg = socket.recv()
                        print '%s: %s\n' % (identity, msg)
                        del msg
            reqs = reqs + 1
            print 'Req #%d sent..' % (reqs)
            socket.send('request #%d' % (reqs))

        socket.close()
        context.term()

class ServerTask(threading.Thread):
    """ServerTask"""
    def init(self):
        threading.Thread.init (self)

    def run(self):
        context = zmq.Context()
        frontend = context.socket(zmq.XREP)
        frontend.bind('tcp://*:5570')

        backend = context.socket(zmq.XREQ)
        backend.bind('inproc://backend')

        workers = []
        for i in xrange(5):
            worker = ServerWorker(context)
            worker.start()
            workers.append(worker)

        poll = zmq.Poller()
        poll.register(frontend, zmq.POLLIN)
        poll.register(backend,  zmq.POLLIN)

        while True:
            sockets = dict(poll.poll())
            if frontend in sockets:
                if sockets[frontend] == zmq.POLLIN:
                    msg = frontend.recv()
                    print 'Server received %s' % (msg)
                    backend.send(msg)
            if backend in sockets:
                if sockets[backend] == zmq.POLLIN:
                    msg = backend.recv()
                    frontend.send(msg)

        frontend.close()
        backend.close()
        context.term()
 
class ServerWorker(threading.Thread):
    """ServerWorker"""
    def init(self, context):
        threading.Thread.init (self)
        self.context = context

    def run(self):
        worker = self.context.socket(zmq.XREQ)
        worker.connect('inproc://backend')
        print 'Worker started'
        while True:
            msg = worker.recv()
            print 'Worker received %s' % (msg)
            replies = choice(xrange(5))
            for i in xrange(replies):
                time.sleep(1/choice(range(1,10)))
                worker.send(msg)
            del msg

        worker.close()

def main():
    """main function"""
    server = ServerTask()
    server.start()
    for i in xrange(3):
        client = ClientTask()
        client.start()
    
    server.join()
    

if name == "main":
    main()


作为一个异步的服务器,详图应该是这样的:


这里的数据传递顺序是这样的:
 client        server     frontend     worker
    [ XREQ ]<---->[ XREP <----> XREQ <----> XREQ ]
            1 part       2 parts     2 parts


在这里有可能碰到一个比较经典的c/s问题:
c端太多,耗尽s端资源怎么办?
这就需要靠谱些的机制了,比如通过“心跳”来确定是否应该释放这个c端的资源等。当然,那就是另外一个话题了。

(未完待续)
1
0
分享到:
评论
2 楼 iyuan 2012-02-06  
guozhiwei 写道
这个例子应该就是 官方提供的包里面的example包里面的Python中的asyncsrv.py

你这个例子有些地方跟它不一样,

你的例子 跑不起来 

而官方的例子能跑起来...

你的例子在send 和 recv的时没有identity,加上以后就能正常跑了.




就是直接拿的官网的例子。原来是声明了setsockopt就可以用的。也可能是zmq版本更新了导致的吧。zmq这块目前还没什么机会用到,有时间还得再捋下,初体验还是太浅了,哈哈
1 楼 guozhiwei 2012-02-06  
这个例子应该就是 官方提供的包里面的example包里面的Python中的asyncsrv.py

你这个例子有些地方跟它不一样,

你的例子 跑不起来 

而官方的例子能跑起来...

你的例子在send 和 recv的时没有identity,加上以后就能正常跑了.



相关推荐

    zeromq-2.1.7.tar.gz

    zeromq-2.1.7.tar.gz 的早期的一个版本,本人已安装成功 放心使用

    zeromq-4.0.3.tar.gz.zip

    zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz

    zeromq-4.3.2.tar.gz

    zeromq-4.3.2.tar.gz,可在linux下编译安装,能够使用zeromq进行sorket开发,多线程,提升性能,效率,可以配合msgpack进行使用,是个好的扩展插件

    zeromq-4.1.3.tar.gz

    zeromq-4.1.3.tar.gz,最新的zeromq的开发工具包,希望对开发者有用

    zeromq-4.0.5-4.el7.x86_64.rpm

    官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装

    zeromq-4.1.8.tar.gz

    zeromq-4.1.8.tar.gz 有问题请联系

    zeromq-2.1.9.tar.gz

    zeromq-2.1.9.tar.gz 这是zeromq linux 官方原版 请放心下载

    zeromq-4.2.3.tar.gz

    zeromq-4.2.3.tar.gz 一个稳定的版本,可以安装使用!

    zeromq-4.3.4.zip

    0MQ version 4.3.4 stable, released on 2021/01/17

    zeromq-3.2.5.tar.gz、jzmq.tar.gz、Python-2.6.6.tar.bz2、storm-0.8.0.zip下载

    storm搭建所需资源

    zeromq-4.2.0.tar.gz源码包

    在官网下载zeromq太慢了,网速极不稳定,特意下载放在这里供大家下载,当然象征性地赚点 资源分

    zeromq-4.1.2.tar.gz

    ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。

    zeromq-3.2.5.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.4.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.3.4.tar.gz

    0MQ version 4.3.4 stable, released on 2021/01/17

    zeromq-4.2.1.tar.gz (包括安装包和zeromq\jzmq教程)

    ZeroMQ(也说明 ØMQ,0MQ 或 ZMQ)是一个高性能的异步消息库,旨在使用分布式或并行应用程序。它提供了一个消息队列,但 不同于面向消息的中间件,一个 ZeroMQ 系统可以在没有专用运行消息代理。jzmq安装包,本人也...

    zeromq-4.2.3.zip

    zeroMQ 4.2.3版本 zeromq-4.2.3.tar.gz 欢迎关注我的CSDN博客:https://mp.csdn.net/console/home 免积分下载

    zeromq-4.2.5.tar.gz

    zeromq-4.2.5.tar.gz

    Win64-ZeroMQ-JZMQ-CZMQ.zip

    VS2015 在Widows 10 上编译的 ZeroMQ 4.3.2,JZMQ 3.1 CZMQ 4.2,可以在 JDK 1.8 下运行。DLL 都是 64位,包含了编译及运行相关信息。分享一下,也给自己留个备份

Global site tag (gtag.js) - Google Analytics