- 浏览: 463173 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
kc_hxd_jp:
博主问个问题,这个篇幅下的python代码无法达到应有的作用, ...
zeroMQ初体验-14.命名机制 进阶 -
kobe1029:
Map<String, Object> args ...
rabbitmq 队列长度预设的曲线方案 -
Sasoritattoo:
LZ,这都13年了,抽空把这篇文章的下文给表完了吧,这一口气喘 ...
nginx + gridfs + mongodb 大事记(残) -
3GQQ2012:
引用前文已经说过,XREP其实用以平衡负载,所以这里由它对请求 ...
zeroMQ初体验-15.应答模式进阶(一)-数据的封装 -
iyuan:
ustclz 写道图片怎么显示不了了。。我这看是可以显示的。不 ...
zeroMQ初体验-1.简介及C/S模式
虽然说“简单的海盗模式”已经非常靠谱了,不过瑕疵还是有不少的。比如说,中间件队列并不监控后端的worker死活,至少会有一次丢包来确定那个worker已经不在了(虽然问题不大,但终究不爽)。而在“偏执的”模式中,有对“简单”模式做了一些扩展:
Queue:
worker:
注意:这里的是lua代码
其实从模式图中已经可以看出,系统中多了“心跳”环节,来确认链路的可用性。
关于心跳模块,着实比较棘手,也算是代码中的重头了。关于做“心跳”的策略,关键是要把握好时间间隔,以避免过载或者失效。通常的,也不建议在持久化的连接上加入心跳机制。
这里应当注意到,“偏执”模式与“简单”模式并不兼容--因为心跳机制。
为了避免混乱。 rfc.zeromq.org这儿有一些协议的声明,帮助你至少不需要去看现有的代码来确定是否兼容新的东东~
(未完待续)
Queue:
require"zmq" require"zmq.poller" require"zmsg" local MAX_WORKERS = 100 local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable local HEARTBEAT_INTERVAL = 1000 -- msecs local tremove = table.remove -- Insert worker at end of queue, reset expiry -- Worker must not already be in queue local function s_worker_append(queue, identity) if queue[identity] then printf ("E: duplicate worker identity %s", identity) else assert (#queue < MAX_WORKERS) queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS queue[#queue + 1] = identity end end -- Remove worker from queue, if present local function s_worker_delete(queue, identity) for i=1,#queue do if queue == identity then tremove(queue, i) break end end queue[identity] = nil end -- Reset worker expiry, worker must be present local function s_worker_refresh(queue, identity) if queue[identity] then queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS else printf("E: worker %s not ready\n", identity) end end -- Pop next available worker off queue, return identity local function s_worker_dequeue(queue) assert (#queue > 0) local identity = tremove(queue, 1) queue[identity] = nil return identity end -- Look for & kill expired workers local function s_queue_purge(queue) local curr_clock = s_clock() -- Work backwards from end to simplify removal for i=#queue,1,-1 do local id = queue if (curr_clock > queue[id]) then tremove(queue, i) queue[id] = nil end end end s_version_assert (2, 1) -- Prepare our context and sockets local context = zmq.init(1) local frontend = context:socket(zmq.XREP) local backend = context:socket(zmq.XREP) frontend:bind("tcp://*:5555"); -- For clients backend:bind("tcp://*:5556"); -- For workers -- Queue of available workers local queue = {} local is_accepting = false -- Send out heartbeats at regular intervals local heartbeat_at = s_clock() + HEARTBEAT_INTERVAL local poller = zmq.poller(2) local function frontend_cb() -- Now get next client request, route to next worker local msg = zmsg.recv(frontend) local identity = s_worker_dequeue (queue) msg:push(identity) msg:send(backend) if (#queue == 0) then -- stop accepting work from clients, when no workers are available. poller:remove(frontend) is_accepting = false end end -- Handle worker activity on backend poller:add(backend, zmq.POLLIN, function() local msg = zmsg.recv(backend) local identity = msg:unwrap() -- Return reply to client if it's not a control message if (msg:parts() == 1) then if (msg:address() == "READY") then s_worker_delete(queue, identity) s_worker_append(queue, identity) elseif (msg:address() == "HEARTBEAT") then s_worker_refresh(queue, identity) else printf("E: invalid message from %s\n", identity) msg:dump() end else -- reply for client. msg:send(frontend) s_worker_append(queue, identity) end -- start accepting client requests, if we are not already doing so. if not is_accepting and #queue > 0 then is_accepting = true poller:add(frontend, zmq.POLLIN, frontend_cb) end end) -- start poller's event loop while true do local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000)) -- Send heartbeats to idle workers if it's time if (s_clock() > heartbeat_at) then for i=1,#queue do local msg = zmsg.new("HEARTBEAT") msg:wrap(queue, nil) msg:send(backend) end heartbeat_at = s_clock() + HEARTBEAT_INTERVAL end s_queue_purge(queue) end -- We never exit the main loop -- But pretend to do the right shutdown anyhow while (#queue > [[span style="color:#666666"]]0) [[span style="color:#008000"]]do s_worker_dequeue(queue) [[span style="color:#008000"]]end frontend:close() backend:close()
worker:
require"zmq" require"zmq.poller" require"zmsg" local HEARTBEAT_LIVENESS = 3 -- 3-5 is reasonable local HEARTBEAT_INTERVAL = 1000 -- msecs local INTERVAL_INIT = 1000 -- Initial reconnect local INTERVAL_MAX = 32000 -- After exponential backoff -- Helper function that returns a new configured socket -- connected to the Hello World server -- local identity local function s_worker_socket (context) local worker = context:socket(zmq.XREQ) -- Set random identity to make tracing easier identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000)) worker:setopt(zmq.IDENTITY, identity) worker:connect("tcp://localhost:5556") -- Configure socket to not wait at close time worker:setopt(zmq.LINGER, 0) -- Tell queue we're ready for work printf("I: (%s) worker ready\n", identity) worker:send("READY") return worker end s_version_assert (2, 1) math.randomseed(os.time()) local context = zmq.init(1) local worker = s_worker_socket (context) -- If liveness hits zero, queue is considered disconnected local liveness = HEARTBEAT_LIVENESS local interval = INTERVAL_INIT -- Send out heartbeats at regular intervals local heartbeat_at = s_clock () + HEARTBEAT_INTERVAL local poller = zmq.poller(1) local is_running = true local cycles = 0 local function worker_cb() -- Get message -- - 3-part envelope + content -> request -- - 1-part "HEARTBEAT" -> heartbeat local msg = zmsg.recv (worker) if (msg:parts() == 3) then -- Simulate various problems, after a few cycles cycles = cycles + 1 if (cycles > 3 and randof (5) == 0) then printf ("I: (%s) simulating a crash\n", identity) is_running = false return elseif (cycles > 3 and randof (5) == 0) then printf ("I: (%s) simulating CPU overload\n", identity) s_sleep (5000) end printf ("I: (%s) normal reply - %s\n", identity, msg:body()) msg:send(worker) liveness = HEARTBEAT_LIVENESS s_sleep(1000); -- Do some heavy work elseif (msg:parts() == 1 and msg:body() == "HEARTBEAT") then liveness = HEARTBEAT_LIVENESS else printf ("E: (%s) invalid message\n", identity) msg:dump() end interval = INTERVAL_INIT end poller:add(worker, zmq.POLLIN, worker_cb) while is_running do local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000)) if (cnt == 0) then liveness = liveness - 1 if (liveness == 0) then printf ("W: (%s) heartbeat failure, can't reach queue\n", identity) printf ("W: (%s) reconnecting in %d msec…\n", identity, interval) s_sleep (interval) if (interval < INTERVAL_MAX) then interval = interval * 2 end poller:remove(worker) worker:close() worker = s_worker_socket (context) poller:add(worker, zmq.POLLIN, worker_cb) liveness = HEARTBEAT_LIVENESS end end -- Send heartbeat to queue if it's time if (s_clock () > heartbeat_at) then heartbeat_at = s_clock () + HEARTBEAT_INTERVAL printf("I: (%s) worker heartbeat\n", identity) worker:send("HEARTBEAT") end end worker:close() context:term()
注意:这里的是lua代码
其实从模式图中已经可以看出,系统中多了“心跳”环节,来确认链路的可用性。
关于心跳模块,着实比较棘手,也算是代码中的重头了。关于做“心跳”的策略,关键是要把握好时间间隔,以避免过载或者失效。通常的,也不建议在持久化的连接上加入心跳机制。
这里应当注意到,“偏执”模式与“简单”模式并不兼容--因为心跳机制。
为了避免混乱。 rfc.zeromq.org这儿有一些协议的声明,帮助你至少不需要去看现有的代码来确定是否兼容新的东东~
(未完待续)
发表评论
-
IM选型(初)
2016-08-23 19:12 1591主要参考文章: https://r ... -
关于python和rabbitmq的那点事儿
2011-10-19 14:15 7911rabbitmq是一个消息中间件,在之前的zmq介绍中有略带提 ... -
zeroMQ初体验-34.发布/订阅模式进阶-克隆模式-下,结言
2011-05-26 16:09 4073服务器: // // Clone server Mod ... -
zeroMQ初体验-33.发布/订阅模式进阶-克隆模式-中
2011-05-26 15:37 2856临时缓存 现实中,比如 ... -
zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上
2011-05-26 15:04 3600在发布/订阅模式中,特别是现实应用中,总会因为这样那样的问题导 ... -
zeroMQ初体验-31.发布/订阅模式进阶-黑盒的高速订阅者
2011-05-25 16:55 2707作为发布/订阅模式的一个常用场景,大数据量的组播是有必要的。虽 ... -
zeroMQ初体验-30.发布/订阅模式进阶-自裁的蜗牛订阅者
2011-05-25 16:24 4484在初次介绍发布/订阅模式的时候,就已经抖出了这个包袱:如果订阅 ... -
zeroMQ初体验-29.可靠性-自由模式
2011-05-24 17:02 5352好吧,本以为这可能是一个更靠谱的模式,谁知(其实是我一厢情愿了 ... -
zeroMQ初体验-28.可靠性-主从模式
2011-05-23 14:47 5455虽然"硬盘模式" ... -
zeroMQ初体验-27.可靠性-硬盘模式
2011-05-23 13:44 3745在之前的种种模式中, ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:05 5607上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-26.可靠性-管家模式
2011-05-12 19:03 1上一节末尾有说到协议,zeromq自然做了充沛的封装,&quo ... -
zeroMQ初体验-24.可靠性-简单的海盗模式
2011-05-05 16:41 3160相较于“懒惰的”做了 ... -
zeroMQ初体验-23.可靠性-懒惰的海盗模式
2011-05-05 16:15 5014相较于通常的阻塞模式,这里只是做了一点简单的动作来加强系统的可 ... -
zeroMQ初体验-22.可靠性-总览
2011-04-26 19:25 5865在开篇就从曾对zeromq的可靠性做过质疑,不过,作为一个雄心 ... -
rabbitmq 队列长度预设的曲线方案
2011-04-21 14:36 3311zeromq中倒是直接支持这个功能的。 类似于设定队列长度或 ... -
zeroMQ初体验-21.应答模式进阶(七)-云计算
2011-04-18 19:14 3476这里给出了一个最近很火的"云计算"案例。 ... -
zeroMQ初体验-20.应答模式进阶(六)-多对多路由模式
2011-04-18 17:22 3808某些时候,为了冗余的需要,可能会有这样的需求: impo ... -
zeroMQ初体验-19.应答模式进阶(五)-异步式应答
2011-04-15 15:23 4778恩,这应该算是比较实 ... -
zeroMQ初体验-18.应答模式进阶(四)-定制路由3
2011-04-02 15:39 5125从经典到超越经典。 首 ...
相关推荐
zeromq-2.1.7.tar.gz 的早期的一个版本,本人已安装成功 放心使用
zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz
zeromq-4.3.2.tar.gz,可在linux下编译安装,能够使用zeromq进行sorket开发,多线程,提升性能,效率,可以配合msgpack进行使用,是个好的扩展插件
zeromq-4.1.3.tar.gz,最新的zeromq的开发工具包,希望对开发者有用
zeromq-4.1.8.tar.gz 有问题请联系
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
zeromq-2.1.9.tar.gz 这是zeromq linux 官方原版 请放心下载
zeromq-4.2.3.tar.gz 一个稳定的版本,可以安装使用!
0MQ version 4.3.4 stable, released on 2021/01/17
storm搭建所需资源
在官网下载zeromq太慢了,网速极不稳定,特意下载放在这里供大家下载,当然象征性地赚点 资源分
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...
ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。
0MQ version 4.3.4 stable, released on 2021/01/17
zeroMQ 4.2.3版本 zeromq-4.2.3.tar.gz 欢迎关注我的CSDN博客:https://mp.csdn.net/console/home 免积分下载
ZeroMQ(也说明 ØMQ,0MQ 或 ZMQ)是一个高性能的异步消息库,旨在使用分布式或并行应用程序。它提供了一个消息队列,但 不同于面向消息的中间件,一个 ZeroMQ 系统可以在没有专用运行消息代理。jzmq安装包,本人也...
zeromq-4.2.5.tar.gz
VS2015 在Widows 10 上编译的 ZeroMQ 4.3.2,JZMQ 3.1 CZMQ 4.2,可以在 JDK 1.8 下运行。DLL 都是 64位,包含了编译及运行相关信息。分享一下,也给自己留个备份