`
iyuan
  • 浏览: 463173 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-25.可靠性-偏执的海盗模式

    博客分类:
  • MQ
阅读更多
虽然说“简单的海盗模式”已经非常靠谱了,不过瑕疵还是有不少的。比如说,中间件队列并不监控后端的worker死活,至少会有一次丢包来确定那个worker已经不在了(虽然问题不大,但终究不爽)。而在“偏执的”模式中,有对“简单”模式做了一些扩展:


Queue:
require"zmq"
require"zmq.poller"
require"zmsg"

local MAX_WORKERS          = 100
local HEARTBEAT_LIVENESS   = 3       --  3-5 is reasonable
local HEARTBEAT_INTERVAL   = 1000    --  msecs

local tremove = table.remove

--  Insert worker at end of queue, reset expiry
--  Worker must not already be in queue
local function s_worker_append(queue, identity)
    if queue[identity] then
        printf ("E: duplicate worker identity %s", identity)
    else
        assert (#queue < MAX_WORKERS)
        queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
        queue[#queue + 1] = identity
    end
end
--  Remove worker from queue, if present
local function s_worker_delete(queue, identity)
    for i=1,#queue do
        if queue == identity then
            tremove(queue, i)
            break
        end
    end
    queue[identity] = nil
end
--  Reset worker expiry, worker must be present
local function s_worker_refresh(queue, identity)
    if queue[identity] then
        queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
    else
        printf("E: worker %s not ready\n", identity)
    end
end
--  Pop next available worker off queue, return identity
local function s_worker_dequeue(queue)
    assert (#queue > 0)
    local identity = tremove(queue, 1)
    queue[identity] = nil
    return identity
end
--  Look for & kill expired workers
local function s_queue_purge(queue)
    local curr_clock = s_clock()
    --  Work backwards from end to simplify removal
    for i=#queue,1,-1 do
        local id = queue
        if (curr_clock > queue[id]) then
            tremove(queue, i)
            queue[id] = nil
        end
    end
end
s_version_assert (2, 1)

--  Prepare our context and sockets
local context = zmq.init(1)
local frontend = context:socket(zmq.XREP)
local backend  = context:socket(zmq.XREP)
frontend:bind("tcp://*:5555");    --  For clients
backend:bind("tcp://*:5556");    --  For workers

--  Queue of available workers
local queue = {}
local is_accepting = false

--  Send out heartbeats at regular intervals
local heartbeat_at = s_clock() + HEARTBEAT_INTERVAL

local poller = zmq.poller(2)

local function frontend_cb()
    --  Now get next client request, route to next worker
    local msg = zmsg.recv(frontend)
    local identity = s_worker_dequeue (queue)
    msg:push(identity)
    msg:send(backend)

    if (#queue == 0) then
        -- stop accepting work from clients, when no workers are available.
        poller:remove(frontend)
        is_accepting = false
    end
end

--  Handle worker activity on backend
poller:add(backend, zmq.POLLIN, function()
    local msg = zmsg.recv(backend)
    local identity = msg:unwrap()

    --  Return reply to client if it's not a control message
    if (msg:parts() == 1) then
        if (msg:address() == "READY") then
            s_worker_delete(queue, identity)
            s_worker_append(queue, identity)
        elseif (msg:address() == "HEARTBEAT") then
            s_worker_refresh(queue, identity)
        else
            printf("E: invalid message from %s\n", identity)
            msg:dump()
        end
    else
        -- reply for client.
        msg:send(frontend)
        s_worker_append(queue, identity)
    end

    -- start accepting client requests, if we are not already doing so.
    if not is_accepting and #queue > 0 then
        is_accepting = true
        poller:add(frontend, zmq.POLLIN, frontend_cb)
    end
end)

-- start poller's event loop
while true do
    local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
    --  Send heartbeats to idle workers if it's time
    if (s_clock() > heartbeat_at) then
        for i=1,#queue do
            local msg = zmsg.new("HEARTBEAT")
            msg:wrap(queue, nil)
            msg:send(backend)
        end
        heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
    end
    s_queue_purge(queue)
end

--  We never exit the main loop
--  But pretend to do the right shutdown anyhow
while (#queue > [[span style="color:#666666"]]0) [[span style="color:#008000"]]do
    s_worker_dequeue(queue)
[[span style="color:#008000"]]end

frontend:close()
backend:close()

worker:
require"zmq"
require"zmq.poller"
require"zmsg"

local HEARTBEAT_LIVENESS   = 3       --  3-5 is reasonable
local HEARTBEAT_INTERVAL   = 1000    --  msecs
local INTERVAL_INIT        = 1000    --  Initial reconnect
local INTERVAL_MAX        = 32000    --  After exponential backoff

--  Helper function that returns a new configured socket
--  connected to the Hello World server
--
local identity

local function s_worker_socket (context)
    local worker = context:socket(zmq.XREQ)

    --  Set random identity to make tracing easier
    identity = string.format("%04X-%04X", randof (0x10000), randof (0x10000))
    worker:setopt(zmq.IDENTITY, identity)
    worker:connect("tcp://localhost:5556")

    --  Configure socket to not wait at close time
    worker:setopt(zmq.LINGER, 0)

    --  Tell queue we're ready for work
    printf("I: (%s) worker ready\n", identity)
    worker:send("READY")

    return worker
end

s_version_assert (2, 1)
math.randomseed(os.time())

local context = zmq.init(1)
local worker = s_worker_socket (context)

--  If liveness hits zero, queue is considered disconnected
local liveness = HEARTBEAT_LIVENESS
local interval = INTERVAL_INIT

--  Send out heartbeats at regular intervals
local heartbeat_at = s_clock () + HEARTBEAT_INTERVAL

local poller = zmq.poller(1)

local is_running = true

local cycles = 0
local function worker_cb()
    --  Get message
    --  - 3-part envelope + content -> request
    --  - 1-part "HEARTBEAT" -> heartbeat
    local msg = zmsg.recv (worker)

    if (msg:parts() == 3) then
        --  Simulate various problems, after a few cycles
        cycles = cycles + 1
        if (cycles > 3 and randof (5) == 0) then
            printf ("I: (%s) simulating a crash\n", identity)
            is_running = false
            return
        elseif (cycles > 3 and randof (5) == 0) then
            printf ("I: (%s) simulating CPU overload\n",
                identity)
            s_sleep (5000)
        end
        printf ("I: (%s) normal reply - %s\n",
            identity, msg:body())
        msg:send(worker)
        liveness = HEARTBEAT_LIVENESS
        s_sleep(1000);           --  Do some heavy work
    elseif (msg:parts() == 1 and msg:body() == "HEARTBEAT") then
        liveness = HEARTBEAT_LIVENESS
    else
        printf ("E: (%s) invalid message\n", identity)
        msg:dump()
    end
    interval = INTERVAL_INIT
end
poller:add(worker, zmq.POLLIN, worker_cb)

while is_running do
    local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))

    if (cnt == 0) then
        liveness = liveness - 1
        if (liveness == 0) then
            printf ("W: (%s) heartbeat failure, can't reach queue\n",
                identity)
            printf ("W: (%s) reconnecting in %d msec…\n",
                identity, interval)
            s_sleep (interval)
    
            if (interval < INTERVAL_MAX) then
                interval = interval * 2
            end
            poller:remove(worker)
            worker:close()
            worker = s_worker_socket (context)
            poller:add(worker, zmq.POLLIN, worker_cb)
            liveness = HEARTBEAT_LIVENESS
        end
    end
    --  Send heartbeat to queue if it's time
    if (s_clock () > heartbeat_at) then
        heartbeat_at = s_clock () + HEARTBEAT_INTERVAL
        printf("I: (%s) worker heartbeat\n", identity)
        worker:send("HEARTBEAT")
    end
end
worker:close()
context:term()

注意:这里的是lua代码

其实从模式图中已经可以看出,系统中多了“心跳”环节,来确认链路的可用性。

关于心跳模块,着实比较棘手,也算是代码中的重头了。关于做“心跳”的策略,关键是要把握好时间间隔,以避免过载或者失效。通常的,也不建议在持久化的连接上加入心跳机制。

这里应当注意到,“偏执”模式与“简单”模式并不兼容--因为心跳机制。
为了避免混乱。 rfc.zeromq.org这儿有一些协议的声明,帮助你至少不需要去看现有的代码来确定是否兼容新的东东~

(未完待续)
分享到:
评论

相关推荐

    zeromq-2.1.7.tar.gz

    zeromq-2.1.7.tar.gz 的早期的一个版本,本人已安装成功 放心使用

    zeromq-4.0.3.tar.gz.zip

    zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz

    zeromq-4.3.2.tar.gz

    zeromq-4.3.2.tar.gz,可在linux下编译安装,能够使用zeromq进行sorket开发,多线程,提升性能,效率,可以配合msgpack进行使用,是个好的扩展插件

    zeromq-4.1.3.tar.gz

    zeromq-4.1.3.tar.gz,最新的zeromq的开发工具包,希望对开发者有用

    zeromq-4.1.8.tar.gz

    zeromq-4.1.8.tar.gz 有问题请联系

    zeromq-4.0.5-4.el7.x86_64.rpm

    官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装

    zeromq-2.1.9.tar.gz

    zeromq-2.1.9.tar.gz 这是zeromq linux 官方原版 请放心下载

    zeromq-4.2.3.tar.gz

    zeromq-4.2.3.tar.gz 一个稳定的版本,可以安装使用!

    zeromq-4.3.4.zip

    0MQ version 4.3.4 stable, released on 2021/01/17

    zeromq-3.2.5.tar.gz、jzmq.tar.gz、Python-2.6.6.tar.bz2、storm-0.8.0.zip下载

    storm搭建所需资源

    zeromq-4.2.0.tar.gz源码包

    在官网下载zeromq太慢了,网速极不稳定,特意下载放在这里供大家下载,当然象征性地赚点 资源分

    zeromq-3.2.5.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.4.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.2.tar.gz

    ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。

    zeromq-4.3.4.tar.gz

    0MQ version 4.3.4 stable, released on 2021/01/17

    zeromq-4.2.3.zip

    zeroMQ 4.2.3版本 zeromq-4.2.3.tar.gz 欢迎关注我的CSDN博客:https://mp.csdn.net/console/home 免积分下载

    zeromq-4.2.1.tar.gz (包括安装包和zeromq\jzmq教程)

    ZeroMQ(也说明 ØMQ,0MQ 或 ZMQ)是一个高性能的异步消息库,旨在使用分布式或并行应用程序。它提供了一个消息队列,但 不同于面向消息的中间件,一个 ZeroMQ 系统可以在没有专用运行消息代理。jzmq安装包,本人也...

    zeromq-4.2.5.tar.gz

    zeromq-4.2.5.tar.gz

    Win64-ZeroMQ-JZMQ-CZMQ.zip

    VS2015 在Widows 10 上编译的 ZeroMQ 4.3.2,JZMQ 3.1 CZMQ 4.2,可以在 JDK 1.8 下运行。DLL 都是 64位,包含了编译及运行相关信息。分享一下,也给自己留个备份

Global site tag (gtag.js) - Google Analytics