`
iyuan
  • 浏览: 462432 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-26.可靠性-管家模式

    博客分类:
  • MQ
阅读更多
上一节末尾有说到协议,zeromq自然做了充沛的封装,"管家模式"便由此而来。


是不是有点像简化版的"偏执模式"?这里的“broker”需要做到"承上启下"。因为这是"协议"的具体实现,自然,这里以api形式给出各个角色的相应实现。

为客户端提供的api:
local setmetatable = setmetatable

local mdp = require"mdp"

local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"

local s_version_assert = s_version_assert

local obj_mt = {}
obj_mt.__index = obj_mt

function obj_mt:set_timeout(timeout)
    self.timeout = timeout
end

function obj_mt:set_retries(retries)
    self.retries = retries
end

function obj_mt:destroy()
    if self.client then self.client:close() end
    self.context:term()
end

local function s_mdcli_connect_to_broker(self)
    -- close old socket.
    if self.client then
        self.poller:remove(self.client)
        self.client:close()
    end
    self.client = assert(self.context:socket(zmq.REQ))
    assert(self.client:setopt(zmq.LINGER, 0))
    assert(self.client:connect(self.broker))
    if self.verbose then
        s_console("I: connecting to broker at %s…", self.broker)
    end
    -- add socket to poller
    self.poller:add(self.client, zmq.POLLIN, function()
        self.got_reply = true
    end)
end

--
-- Send request to broker and get reply by hook or crook
-- Returns the reply message or nil if there was no reply.
--
function obj_mt:send(service, request)
    -- Prefix request with protocol frames
    -- Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    -- Frame 2: Service name (printable string)
    request:push(service)
    request:push(mdp.MDPC_CLIENT)
    if self.verbose then
        s_console("I: send request to '%s' service:", service)
        request:dump()
    end

    local retries = self.retries
    while (retries > 0) do
        local msg = request:dup()
        msg:send(self.client)
        self.got_reply = false

        while true do
            local cnt = assert(self.poller:poll(self.timeout * 1000))
            if cnt ~= 0 and self.got_reply then
                local msg = zmsg.recv(self.client)
                if self.verbose then
                    s_console("I: received reply:")
                    msg:dump()
                end
                assert(msg:parts() >= 3)

                local header = msg:pop()
                assert(header == mdp.MDPC_CLIENT)
                local reply_service = msg:pop()
                assert(reply_service == service)
                return msg
            else
                retries = retries - 1
                if (retries > 0) then
                    if self.verbose then
                        s_console("W: no reply, reconnecting…")
                    end
                    -- Reconnect
                    s_mdcli_connect_to_broker(self)
                    break -- outer loop will resend request.
                else
                    if self.verbose then
                        s_console("W: permanent error, abandoning request")
                    end
                    return nil -- Giving up
                end
            end
        end
    end
end

module(…)

function new(broker, verbose)
    s_version_assert (2, 1);
    local self = setmetatable({
        context = zmq.init(1),
        poller = zpoller.new(1),
        broker = broker,
        verbose = verbose,
        timeout = 2500, -- msecs
        retries = 3,    -- before we abandon
    }, obj_mt)

    s_mdcli_connect_to_broker(self)
    return self
end

setmetatable(_M, { __call = function(self, …) return new(…) end })

客户端调用:
require"mdcliapi"
require"zmsg"
require"zhelpers"

local verbose = (arg[1] == "-v")
local session = mdcliapi.new("tcp://localhost:5555", verbose)

local count=1
repeat
    local request = zmsg.new("Hello world")
    local reply = session:send("echo", request)
    if not reply then
        break    --  Interrupt or failure
    end
    count = count + 1
until (count == 100000)
printf("%d requests/replies processed\n", count)
session:destroy()

服务端api:
local HEARTBEAT_LIVENESS = 3  -- 3-5 is reasonable

local setmetatable = setmetatable

local mdp = require"mdp"

local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"

local s_version_assert = s_version_assert

local obj_mt = {}
obj_mt.__index = obj_mt

function obj_mt:set_heartbeat(heartbeat)
    self.heartbeat = heartbeat
end

function obj_mt:set_reconnect(reconnect)
    self.reconnect = reconnect
end

function obj_mt:destroy()
    if self.worker then self.worker:close() end
    self.context:term()
end

-- Send message to broker
-- If no msg is provided, create one internally
local function s_mdwrk_send_to_broker(self, command, option, msg)
    msg = msg or zmsg.new()

    -- Stack protocol envelope to start of message
    if option then
        msg:push(option)
    end
    msg:push(command)
    msg:push(mdp.MDPW_WORKER)
    msg:push("")

    if self.verbose then
        s_console("I: sending %s to broker", mdp.mdps_commands[command])
        msg:dump()
    end
    msg:send(self.worker)
end

local function s_mdwrk_connect_to_broker(self)
    -- close old socket.
    if self.worker then
        self.poller:remove(self.worker)
        self.worker:close()
    end
    self.worker = assert(self.context:socket(zmq.XREQ))
    assert(self.worker:setopt(zmq.LINGER, 0))
    assert(self.worker:connect(self.broker))
    if self.verbose then
        s_console("I: connecting to broker at %s…", self.broker)
    end
    -- Register service with broker
    s_mdwrk_send_to_broker(self, mdp.MDPW_READY, self.service)
    -- If liveness hits zero, queue is considered disconnected
    self.liveness = HEARTBEAT_LIVENESS
    self.heartbeat_at = s_clock() + self.heartbeat
    -- add socket to poller
    self.poller:add(self.worker, zmq.POLLIN, function()
        self.got_msg = true
    end)
end

--
-- Send reply, if any, to broker and wait for next request.
--
function obj_mt:recv(reply)
    -- Format and send the reply if we are provided one
    if reply then
        assert(self.reply_to)
        reply:wrap(self.reply_to, "")
        self.reply_to = nil
        s_mdwrk_send_to_broker(self, mdp.MDPW_REPLY, nil, reply)
    end
    self.expect_reply = true

    self.got_msg = false
    while true do
        local cnt = assert(self.poller:poll(self.heartbeat * 1000))
        if cnt ~= 0 and self.got_msg then
            self.got_msg = false
            local msg = zmsg.recv(self.worker)
            if self.verbose then
                s_console("I: received message from broker:")
                msg:dump()
            end
            self.liveness = HEARTBEAT_LIVENESS
            -- Don't try to handle errors, just assert noisily
            assert(msg:parts() >= 3)

            local empty = msg:pop()
            assert(empty == "")

            local header = msg:pop()
            assert(header == mdp.MDPW_WORKER)

            local command = msg:pop()
            if command == mdp.MDPW_REQUEST then
                -- We should pop and save as many addresses as there are
                -- up to a null part, but for now, just save one…
                self.reply_to = msg:unwrap()
                return msg -- We have a request to process
            elseif command == mdp.MDPW_HEARTBEAT then
                -- Do nothing for heartbeats
            elseif command == mdp.MDPW_DISCONNECT then
                -- dis-connect and re-connect to broker.
                s_mdwrk_connect_to_broker(self)
            else
                s_console("E: invalid input message (%d)", command:byte(1,1))
                msg:dump()
            end
        else
            self.liveness = self.liveness - 1
            if (self.liveness == 0) then
                if self.verbose then
                    s_console("W: disconnected from broker - retrying…")
                end
                -- sleep then Reconnect
                s_sleep(self.reconnect)
                s_mdwrk_connect_to_broker(self)
            end

            -- Send HEARTBEAT if it's time
            if (s_clock() > self.heartbeat_at) then
                s_mdwrk_send_to_broker(self, mdp.MDPW_HEARTBEAT)
                self.heartbeat_at = s_clock() + self.heartbeat
            end
        end
    end
end

module(…)

function new(broker, service, verbose)
    s_version_assert(2, 1);
    local self = setmetatable({
        context = zmq.init(1),
        poller = zpoller.new(1),
        broker = broker,
        service = service,
        verbose = verbose,
        heartbeat = 2500, -- msecs
        reconnect = 2500, -- msecs
    }, obj_mt)

    s_mdwrk_connect_to_broker(self)
    return self
end

setmetatable(_M, { __call = function(self, …) return new(…) end })

服务端调用:
require"mdwrkapi"
require"zmsg"

local verbose = (arg[1] == "-v")
local session = mdwrkapi.new("tcp://localhost:5555", "echo", verbose)

local reply
while true do
    local request = session:recv(reply)
    if not request then
        break              --  Worker was interrupted
    end
    reply = request        --  Echo is complex… :-)
end
session:destroy()


注意:
这里的api全部都是单线程的,不会做心跳,并且不会做错误报告(这里可以根据具体需要修正)。确定连接通路就任务分配的是“管家”:
require"zmq"
require"zmq.poller"
require"zmsg"
require"zhelpers"
require"mdp"

local tremove = table.remove

--  We'd normally pull these from config data

local HEARTBEAT_LIVENESS   = 3       --  3-5 is reasonable
local HEARTBEAT_INTERVAL   = 2500    --  msecs
local HEARTBEAT_EXPIRY     = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

--  ---------------------------------------------------------------------
--  Constructor for broker object

--  ---------------------------------------------------------------------
--  Broker object's metatable.
local broker_mt = {}
broker_mt.__index = broker_mt

function broker_new(verbose)
    local context = zmq.init(1)
    --  Initialize broker state
    return setmetatable({
        context = context,
        socket = context:socket(zmq.XREP),
        verbose = verbose,
        services = {},
        workers = {},
        waiting = {},
        heartbeat_at = s_clock() + HEARTBEAT_INTERVAL,
    }, broker_mt)
end

--  ---------------------------------------------------------------------
--  Service object
local service_mt = {}
service_mt.__index = service_mt

--  Worker object
local worker_mt = {}
worker_mt.__index = worker_mt

-- helper list remove function
local function zlist_remove(list, item)
    for n=#list,1,-1 do
        if list[n] == item then
            tremove(list, n)
        end
    end
end

--  ---------------------------------------------------------------------
--  Destructor for broker object

function broker_mt:destroy()
    self.socket:close()
    self.context:term()
    for name, service in pairs(self.services) do
        service:destroy()
    end
    for id, worker in pairs(self.workers) do
        worker:destroy()
    end
end

--  ---------------------------------------------------------------------
--  Bind broker to endpoint, can call this multiple times
--  We use a single socket for both clients and workers.

function broker_mt:bind(endpoint)
    self.socket:bind(endpoint)
    s_console("I: MDP broker/0.1.1 is active at %s", endpoint)
end

--  ---------------------------------------------------------------------
--  Delete any idle workers that haven't pinged us in a while. Workers
--  are oldest to most recent, so we stop at the first alive worker.

function broker_mt:purge_workers()
    local waiting = self.waiting
    for n=1,#waiting do
        local worker = waiting[n]
        if (not worker:expired()) then
            return             --  Worker is alive, we're done here
        end
        if (self.verbose) then
            s_console("I: deleting expired worker: %s", worker.identity)
        end

        self:worker_delete(worker, false)
    end
end

--  ---------------------------------------------------------------------
--  Locate or create new service entry

function broker_mt:service_require(name)
    assert (name)
    local service = self.services[name]
    if not service then
        service = setmetatable({
            name = name,
            requests = {},
            waiting = {},
            workers = 0,
        }, service_mt)
        self.services[name] = service
        if (self.verbose) then
            s_console("I: received message:")
        end
    end
    return service
end

--  ---------------------------------------------------------------------
--  Destroy service object, called when service is removed from
--  broker.services.

function service_mt:destroy()
end

--  ---------------------------------------------------------------------
--  Dispatch requests to waiting workers as possible

function broker_mt:service_dispatch(service, msg)
    assert (service)
    local requests = service.requests
    if (msg) then               --  Queue message if any
        requests[#requests + 1] = msg
    end

    self:purge_workers()
    local waiting = service.waiting
    while (#waiting > 0 and #requests > 0) do
        local worker = tremove(waiting, 1) -- pop worker from service's waiting queue.
        zlist_remove(self.waiting, worker) -- also remove worker from broker's waiting queue.
        local msg = tremove(requests, 1) -- pop request from service's request queue.
        self:worker_send(worker, mdp.MDPW_REQUEST, nil, msg)
    end
end

--  ---------------------------------------------------------------------
--  Handle internal service according to 8/MMI specification

function broker_mt:service_internal(service_name, msg)
    if (service_name == "mmi.service") then
        local name = msg:body()
        local service = self.services[name]
        if (service and service.workers) then
            msg:body_set("200")
        else
            msg:body_set("404")
        end
    else
        msg:body_set("501")
    end

    --  Remove & save client return envelope and insert the
    --  protocol header and service name, then rewrap envelope.
    local client = msg:unwrap()
    msg:wrap(mdp.MDPC_CLIENT, service_name)
    msg:wrap(client, "")

    msg:send(self.socket)
end

--  ---------------------------------------------------------------------
--  Creates worker if necessary

function broker_mt:worker_require(identity)
    assert (identity)

    --  self.workers is keyed off worker identity
    local worker = self.workers[identity]
    if (not worker) then
        worker = setmetatable({
            identity = identity,
            expiry = 0,
        }, worker_mt)
        self.workers[identity] = worker
        if (self.verbose) then
            s_console("I: registering new worker: %s", identity)
        end
    end
    return worker
end

--  ---------------------------------------------------------------------
--  Deletes worker from all data structures, and destroys worker

function broker_mt:worker_delete(worker, disconnect)
    assert (worker)
    if (disconnect) then
        self:worker_send(worker, mdp.MDPW_DISCONNECT)
    end
    local service = worker.service
    if (service) then
        zlist_remove (service.waiting, worker)
        service.workers = service.workers - 1
    end
    zlist_remove (self.waiting, worker)
    self.workers[worker.identity] = nil
    worker:destroy()
end

--  ---------------------------------------------------------------------
--  Destroy worker object, called when worker is removed from
--  broker.workers.

function worker_mt:destroy(argument)
end

--  ---------------------------------------------------------------------
--  Process message sent to us by a worker

function broker_mt:worker_process(sender, msg)
    assert (msg:parts() >= 1)     --  At least, command

    local command = msg:pop()
    local worker_ready = (self.workers[sender] ~= nil)
    local worker = self:worker_require(sender)

    if (command == mdp.MDPW_READY) then
        if (worker_ready) then          --  Not first command in session then
            self:worker_delete(worker, true)
        elseif (sender:sub(1,4) == "mmi.") then  --  Reserved service name
            self:worker_delete(worker, true)
        else
            --  Attach worker to service and mark as idle
            local service_name = msg:pop()
            local service = self:service_require(service_name)
            worker.service = service
            service.workers = service.workers + 1
            self:worker_waiting(worker)
        end
    elseif (command == mdp.MDPW_REPLY) then
        if (worker_ready) then
            --  Remove & save client return envelope and insert the
            --  protocol header and service name, then rewrap envelope.
            local client = msg:unwrap()
            msg:wrap(mdp.MDPC_CLIENT, worker.service.name)
            msg:wrap(client, "")

            msg:send(self.socket)
            self:worker_waiting(worker)
        else
            self:worker_delete(worker, true)
        end
    elseif (command == mdp.MDPW_HEARTBEAT) then
        if (worker_ready) then
            worker.expiry = s_clock() + HEARTBEAT_EXPIRY
        else
            self:worker_delete(worker, true)
        end
    elseif (command == mdp.MDPW_DISCONNECT) then
        self:worker_delete(worker, false)
    else
        s_console("E: invalid input message (%d)", command:byte(1,1))
        msg:dump()
    end
end

--  ---------------------------------------------------------------------
--  Send message to worker
--  If pointer to message is provided, sends & destroys that message

function broker_mt:worker_send(worker, command, option, msg)
    msg = msg and msg:dup() or zmsg.new()

    --  Stack protocol envelope to start of message
    if (option) then                 --  Optional frame after command
        msg:push(option)
    end
    msg:push(command)
    msg:push(mdp.MDPW_WORKER)
    --  Stack routing envelope to start of message
    msg:wrap(worker.identity, "")

    if (self.verbose) then
        s_console("I: sending %s to worker", mdp.mdps_commands[command])
        msg:dump()
    end
    msg:send(self.socket)
end

--  ---------------------------------------------------------------------
--  This worker is now waiting for work

function broker_mt:worker_waiting(worker)
    --  Queue to broker and service waiting lists
    self.waiting[#self.waiting + 1] = worker
    worker.service.waiting[#worker.service.waiting + 1] = worker
    worker.expiry = s_clock() + HEARTBEAT_EXPIRY
    self:service_dispatch(worker.service, nil)
end

--  ---------------------------------------------------------------------
--  Return 1 if worker has expired and must be deleted

function worker_mt:expired()
    return (self.expiry < s_clock())
end
--  ---------------------------------------------------------------------
--  Process a request coming from a client

function broker_mt:client_process(sender, msg)
    assert (msg:parts() >= 2)     --  Service name + body

    local service_name = msg:pop()
    local service = self:service_require(service_name)
    --  Set reply return address to client sender
    msg:wrap(sender, "")
    if (service_name:sub(1,4) == "mmi.") then
        self:service_internal(service_name, msg)
    else
        self:service_dispatch(service, msg)
    end
end

--  ---------------------------------------------------------------------
--  Main broker work happens here

local verbose = (arg[1] == "-v")

s_version_assert (2, 1)
s_catch_signals ()
local self = broker_new(verbose)
self:bind("tcp://*:5555")

local poller = zmq.poller.new(1)

--  Process next input message, if any
poller:add(self.socket, zmq.POLLIN, function()
    local msg = zmsg.recv(self.socket)
    if (self.verbose) then
        s_console("I: received message:")
        msg:dump()
    end
    local sender = msg:pop()
    local empty  = msg:pop()
    local header = msg:pop()

    if (header == mdp.MDPC_CLIENT) then
        self:client_process(sender, msg)
    elseif (header == mdp.MDPW_WORKER) then
        self:worker_process(sender, msg)
    else
        s_console("E: invalid message:")
        msg:dump()
    end
end)

--  Get and process messages forever or until interrupted
while (not s_interrupted) do
    local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
    --  Disconnect and delete any expired workers
    --  Send heartbeats to idle workers if needed
    if (s_clock() > self.heartbeat_at) then
        self:purge_workers()
        local waiting = self.waiting
        for n=1,#waiting do
            local worker = waiting[n]
            self:worker_send(worker, mdp.MDPW_HEARTBEAT)
        end
        self.heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
    end
end
if (s_interrupted) then
    printf("W: interrupt received, shutting down…\n")
end
self:destroy()


这里的“管家”基本上做了所有他能做的事:心跳,代理发送信息,合理利用多服务资源。
或许,效能上还有些问题,那么试试"异步"?
require"zmq"
require"zmq.threads"
require"zmsg"

local common_code = [[
    require"zmq"
    require"zmsg"
    require"zhelpers"
]]

local client_task = common_code .. [[
    local context = zmq.init(1)
    local client = context:socket(zmq.XREQ)
    client:setopt(zmq.IDENTITY, "C", 1)
    client:connect("tcp://localhost:5555")

    printf("Setting up test…\n")
    s_sleep(100)

    local requests
    local start

    printf("Synchronous round-trip test…\n")
    requests = 10000
    start = s_clock()
    for n=1,requests do
        local msg = zmsg.new("HELLO")
        msg:send(client)
        msg = zmsg.recv(client)
    end
    printf(" %d calls/second\n",
        (1000 * requests) / (s_clock() - start))

    printf("Asynchronous round-trip test…\n")
    requests = 100000
    start = s_clock()
    for n=1,requests do
        local msg = zmsg.new("HELLO")
        msg:send(client)
    end
    for n=1,requests do
        local msg = zmsg.recv(client)
    end
    printf(" %d calls/second\n",
        (1000 * requests) / (s_clock() - start))

    client:close()
    context:term()
]]

local worker_task = common_code .. [[
    local context = zmq.init(1)
    local worker = context:socket(zmq.XREQ)
    worker:setopt(zmq.IDENTITY, "W", 1)
    worker:connect("tcp://localhost:5556")

    while true do
        local msg = zmsg.recv(worker)
        msg:send(worker)
    end
    worker:close()
    context:term()
]]

local broker_task = common_code .. [[
    --  Prepare our context and sockets
    local context = zmq.init(1)
    local frontend = context:socket(zmq.XREP)
    local backend  = context:socket(zmq.XREP)
    frontend:bind("tcp://*:5555")
    backend:bind("tcp://*:5556")

    require"zmq.poller"
    local poller = zmq.poller(2)
    poller:add(frontend, zmq.POLLIN, function()
        local msg = zmsg.recv(frontend)
        --msg[1] = "W"
        msg:pop()
        msg:push("W")
        msg:send(backend)
    end)
    poller:add(backend, zmq.POLLIN, function()
        local msg = zmsg.recv(backend)
        --msg[1] = "C"
        msg:pop()
        msg:push("C")
        msg:send(frontend)
    end)
    poller:start()
    frontend:close()
    backend:close()
    context:term()
]]

s_version_assert(2, 1)

local client = zmq.threads.runstring(nil, client_task)
assert(client:start())
local worker = zmq.threads.runstring(nil, worker_task)
assert(worker:start(true))
local broker = zmq.threads.runstring(nil, broker_task)
assert(broker:start(true))

assert(client:join())


如此这般,效能倒是大大降低了(官网说法是降了近20倍),分析了下原因,由于异步需要管理各条任务,不断轮询之类的原因,反倒降低了性能,那么I/O的异步呢?

异步的客户端api:
local setmetatable = setmetatable

local mdp = require"mdp"

local zmq = require"zmq"
local zpoller = require"zmq.poller"
local zmsg = require"zmsg"
require"zhelpers"

local s_version_assert = s_version_assert

local obj_mt = {}
obj_mt.__index = obj_mt

function obj_mt:set_timeout(timeout)
    self.timeout = timeout
end

function obj_mt:destroy()
    if self.client then self.client:close() end
    self.context:term()
end

local function s_mdcli_connect_to_broker(self)
    -- close old socket.
    if self.client then
        self.poller:remove(self.client)
        self.client:close()
    end
    self.client = assert(self.context:socket(zmq.XREQ))
    assert(self.client:setopt(zmq.LINGER, 0))
    assert(self.client:connect(self.broker))
    if self.verbose then
        s_console("I: connecting to broker at %s…", self.broker)
    end
    -- add socket to poller
    self.poller:add(self.client, zmq.POLLIN, function()
        self.got_reply = true
    end)
end

--
-- Send request to broker and get reply by hook or crook
--
function obj_mt:send(service, request)
    -- Prefix request with protocol frames
    -- Frame 0: empty (REQ emulation)
    -- Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    -- Frame 2: Service name (printable string)
    request:push(service)
    request:push(mdp.MDPC_CLIENT)
    request:push("")
    if self.verbose then
        s_console("I: send request to '%s' service:", service)
        request:dump()
    end
    request:send(self.client)
    return 0
end

--  Returns the reply message or NULL if there was no reply. Does not
--  attempt to recover from a broker failure, this is not possible
--  without storing all unanswered requests and resending them all…
function obj_mt:recv()
    self.got_reply = false

    local cnt = assert(self.poller:poll(self.timeout * 1000))
    if cnt ~= 0 and self.got_reply then
        local msg = zmsg.recv(self.client)
        if self.verbose then
            s_console("I: received reply:")
            msg:dump()
        end
        assert(msg:parts() >= 3)

        local empty = msg:pop()
        assert(empty == "")

        local header = msg:pop()
        assert(header == mdp.MDPC_CLIENT)

        return msg
    end
    if self.verbose then
        s_console("W: permanent error, abandoning request")
    end
    return nil -- Giving up
end

module(…)

function new(broker, verbose)
    s_version_assert (2, 1);
    local self = setmetatable({
        context = zmq.init(1),
        poller = zpoller.new(1),
        broker = broker,
        verbose = verbose,
        timeout = 2500, -- msecs
    }, obj_mt)

    s_mdcli_connect_to_broker(self)
    return self
end

setmetatable(_M, { __call = function(self, …) return new(…) end })


异步的客户端:
require"mdcliapi2"
require"zmsg"
require"zhelpers"

local verbose = (arg[1] == "-v")
local session = mdcliapi2.new("tcp://localhost:5555", verbose)

local count=100000
for n=1,count do
    local request = zmsg.new("Hello world")
    session:send("echo", request)
end
for n=1,count do
    local reply = session:recv()
    if not reply then
        break   --  Interrupted by Ctrl-C
    end
end
printf("%d replies received\n", count)
session:destroy()


当当当当:
$ time mdclient
同步的:
real    0m14.088s
user    0m1.310s
sys     0m2.670s
异步的:
real    0m8.730s
user    0m0.920s
sys     0m1.550s
10个服务端的异步:
real    0m3.863s
user    0m0.730s
sys     0m0.470s

经过测试,4核的话起8个服务端就算饱和了。不过,就效率而言,应该是足够了。
值得注意到是,"异步管家模式"并非全能。由于他没有做管家的连接重试,所以一旦“管家”崩溃了,那自然一切都say goodbye了。

为了服务更靠谱,或许还需要一个叫做"发现服务"的系统,来确认到底有哪些服务可用。
require"mdcliapi"
require"zmsg"
require"zhelpers"

local verbose = (arg[1] == "-v")
local session = mdcliapi.new("tcp://localhost:5555", verbose)

--  This is the service we want to look up
local request = zmsg.new("echo")

--  This is the service we send our request to
local reply = session:send("mmi.service", request)

if (reply) then
    printf ("Lookup echo service: %s\n", reply:body())
else
    printf ("E: no response from broker, make sure it's running\n")
end

session:destroy()



注:以上皆为lua代码

(未完待续)
0
1
分享到:
评论

相关推荐

    zeromq-2.1.7.tar.gz

    zeromq-2.1.7.tar.gz 的早期的一个版本,本人已安装成功 放心使用

    zeromq-4.0.3.tar.gz.zip

    zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz

    zeromq-4.3.2.tar.gz

    zeromq-4.3.2.tar.gz,可在linux下编译安装,能够使用zeromq进行sorket开发,多线程,提升性能,效率,可以配合msgpack进行使用,是个好的扩展插件

    zeromq-4.1.3.tar.gz

    zeromq-4.1.3.tar.gz,最新的zeromq的开发工具包,希望对开发者有用

    zeromq-4.1.8.tar.gz

    zeromq-4.1.8.tar.gz 有问题请联系

    zeromq-4.0.5-4.el7.x86_64.rpm

    官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装

    zeromq-2.1.9.tar.gz

    zeromq-2.1.9.tar.gz 这是zeromq linux 官方原版 请放心下载

    zeromq-4.2.3.tar.gz

    zeromq-4.2.3.tar.gz 一个稳定的版本,可以安装使用!

    zeromq-4.3.4.zip

    0MQ version 4.3.4 stable, released on 2021/01/17

    zeromq-3.2.5.tar.gz、jzmq.tar.gz、Python-2.6.6.tar.bz2、storm-0.8.0.zip下载

    storm搭建所需资源

    zeromq-4.2.0.tar.gz源码包

    在官网下载zeromq太慢了,网速极不稳定,特意下载放在这里供大家下载,当然象征性地赚点 资源分

    zeromq-3.2.5.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.4.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.2.tar.gz

    ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。

    zeromq-4.3.4.tar.gz

    0MQ version 4.3.4 stable, released on 2021/01/17

    zeromq-4.2.1.tar.gz (包括安装包和zeromq\jzmq教程)

    ZeroMQ(也说明 ØMQ,0MQ 或 ZMQ)是一个高性能的异步消息库,旨在使用分布式或并行应用程序。它提供了一个消息队列,但 不同于面向消息的中间件,一个 ZeroMQ 系统可以在没有专用运行消息代理。jzmq安装包,本人也...

    zeromq-4.2.3.zip

    zeroMQ 4.2.3版本 zeromq-4.2.3.tar.gz 欢迎关注我的CSDN博客:https://mp.csdn.net/console/home 免积分下载

    zeromq-4.2.5.tar.gz

    zeromq-4.2.5.tar.gz

    Win64-ZeroMQ-JZMQ-CZMQ.zip

    VS2015 在Widows 10 上编译的 ZeroMQ 4.3.2,JZMQ 3.1 CZMQ 4.2,可以在 JDK 1.8 下运行。DLL 都是 64位,包含了编译及运行相关信息。分享一下,也给自己留个备份

Global site tag (gtag.js) - Google Analytics