ZeroMQ基础入门

ZeroMQ基础入门MQ 是消息队列 message queue 的简称 目前有多种消息队列可用 包括 RabbitMQ Kafka 等 它们各有特色 可以结合具体的项目需求使用 ZeroMQ 简称 Zmq 或者 0mq 核心引擎由 c 编写 是轻量级消息通信库

大家好,我是讯享网,很高兴认识大家。

MQ是消息队列(message queue)的简称,目前有多种消息队列可用,包括RabbitMQ、Kafka等,它们各有特色,可以结合具体的项目需求使用。

ZeroMQ简称Zmq,或者0mq,核心引擎由c++编写,是轻量级消息通信库,在对传统的标准socket接口扩展的基础上形成的特色消息通信中间件。

Zmq提供了异步消息队列的抽象,具有多种消息通信模式,能够实现消息过滤,能够无缝对接多种传输协议

简言之,使用socket时,需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让网络编程更简单。

ZeroMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),是一个可伸缩层,可并行运行,分散在分布式系统间。

总之,下面摘自100字描述:

ZMQ(ØMQ、ZeroMQ, 0MQ)看起来像是一套嵌入式的网络链接库,但工作起来更像是一个并发式的框架。它提供的套接字可以在多种协议中传输消息,如线程间、进程间、TCP、广播等。你可以使用套接字构建多对多的连接模式,如扇出、发布-订阅、任务分发、请求-应答等。ZMQ的快速足以胜任集群应用产品。它的异步I/O机制让你能够构建多核应用程序,完成异步消息处理任务。ZMQ有着多语言支持,并能在几乎所有的操作系统上运行。

ZMQ是iMatix公司的产品,以LGPLv3开源协议发布。


ZMQ有多种模式可以使用,常用的模式包括request/reply,publish/subscribe,push/pull三种。

1. request-reply(请求/应答)

一组服务端和一组客户端相连,用于远程过程调用或任务分发

这里使用请求-应答模式实现一个简单的hello world程序。

和普通的socket通信一样,需要一个客户端和一个服务器。客户端发送hello,服务器收到hello后回应world,如图所示。


讯享网 

服务器代码:

// // Hello World server in C++ // Binds REP socket to tcp://*:5555 // Expects "Hello" from client, replies with "World" // #include <zmq.hpp> #include <string> #include <iostream> #ifndef _WIN32 #include <unistd.h> #else #include <windows.h> #define sleep(n) Sleep(n) #endif int main () { // Prepare our context and socket zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REP); socket.bind ("tcp://*:5555"); while (true) { zmq::message_t request; // Wait for next request from client socket.recv (&request); std::cout << "Received Hello" << std::endl; // Do some 'work' sleep(1); // Send reply back to client zmq::message_t reply (5); memcpy (reply.data (), "World", 5); socket.send (reply); } return 0; } 

讯享网

客户端代码:

讯享网// // Hello World client in C++ // Connects REQ socket to tcp://localhost:5555 // Sends "Hello" to server, expects "World" back // #include <zmq.hpp> #include <string> #include <iostream> int main () { // Prepare our context and socket zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REQ); std::cout << "Connecting to hello world server…" << std::endl; socket.connect ("tcp://localhost:5555"); // Do 10 requests, waiting each time for a response for (int request_nbr = 0; request_nbr != 10; request_nbr++) { zmq::message_t request (5); memcpy (request.data (), "Hello", 5); std::cout << "Sending Hello " << request_nbr << "…" << std::endl; socket.send (request); // Get the reply. zmq::message_t reply; socket.recv (&reply); std::cout << "Received World " << request_nbr << std::endl; } return 0; } 

使用REQ-REP套接字发送和接受消息是需要遵循一定规律的。客户端首先使用zmq_send()发送消息,再用zmq_recv()接收,如此循环。如果打乱了这个顺序(如连续发送两次)则会报错。类似地,服务端必须先进行接收,后进行发送(和http请求的模式一样,请求响应,一次请求对应一个响应)

理论上你可以连接千万个客户端到这个服务端上,同时连接都没问题,程序仍会运作得很好(因为是异步的)。你可以尝试一下先打开客户端,再打开服务端,可以看到程序仍然会正常工作。

对于字符串,需要注意的一点是,ZMQ不会关心发送消息的内容,只要知道它所包含的字节数。这意味着,ZMQ的字符串是有长度的,且传送时不加结束符。当使用c语言接收时,应注意申请比长度多一个字节的存储空间,并置位结束符’/0’,否则在打印字符串时可能得到奇怪的结果。

下面是一个使用c语言处理字符串的例子:

// 从ZMQ套接字中接收字符串,并转换为C语言的字符串 static char * s_recv (void *socket) { zmq_msg_t message; zmq_msg_init (&message); zmq_recv (socket, &message, 0); int size = zmq_msg_size (&message); char *string = malloc (size + 1);//release it after use memcpy (string, zmq_msg_data (&message), size); zmq_msg_close (&message); string [size] = 0; return (string); }

2. publish/subscribe(发布/订阅)

将一组发布者和一组订阅者相连,用于数据分发

发布/订阅模式实现了单向数据分发服务端将事件发送给一组客户端

让我们看一个天气信息发布的例子,包括邮编、温度、相对湿度。我们生成这些随机信息,用来模拟气象站所做的那样。

服务端代码:

讯享网// Weather update server in C++ // Binds PUB socket to tcp://*:5556 // Publishes random weather updates // // Olivier Chamoux <> // #include <zmq.hpp> #include <stdio.h> #include <stdlib.h> #include <time.h> #if (defined (WIN32)) #include <zhelpers.hpp> #endif #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0)) int main () { // Prepare our context and publisher zmq::context_t context (1); zmq::socket_t publisher (context, ZMQ_PUB); publisher.bind("tcp://*:5556"); publisher.bind("ipc://weather.ipc"); // Not usable on Windows. // Initialize random number generator srandom ((unsigned) time (NULL)); while (1) { int zipcode, temperature, relhumidity; // Get values that will fool the boss zipcode = within (); temperature = within (215) - 80; relhumidity = within (50) + 10; // Send message to all subscribers zmq::message_t message(20); snprintf ((char *) message.data(), 20 , "%05d %d %d", zipcode, temperature, relhumidity); //中间有一个空格分割 publisher.send(message); } return 0; } 

服务端在一个无限循环里更新数据,并把天气信息持续不断地发布出去。注意,它不关心是否有人接收数据,只管发送。如果没有任何人接收数据,它只是简单地丢弃消息。

客户端代码:

// Weather update client in C++ // Connects SUB socket to tcp://localhost:5556 // Collects weather updates and finds avg temp in zipcode // // Olivier Chamoux <> #include <zmq.hpp> #include <iostream> #include <sstream> int main (int argc, char *argv[]) { zmq::context_t context (1); // Socket to talk to server std::cout << "Collecting updates from weather server…\n" << std::endl; zmq::socket_t subscriber (context, ZMQ_SUB); subscriber.connect("tcp://localhost:5556"); // Subscribe to zipcode, default is NYC, 10001 const char *filter = (argc > 1)? argv [1]: "10001 "; subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter)); // Process 100 updates int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 100; update_nbr++) { zmq::message_t update; int zipcode, temperature, relhumidity; subscriber.recv(&update); std::istringstream iss(static_cast<char*>(update.data())); iss >> zipcode >> temperature >> relhumidity ; total_temp += temperature; } std::cout << "Average temperature for zipcode '"<< filter <<"' was "<<(int) (total_temp / update_nbr) <<"F" << std::endl; return 0; } 

客户端可以根据自己的需要随意订阅感兴趣的数据,它会接受发布者的消息,只处理特定邮编标注的信息,如纽约的邮编是10001。

需要注意的是,在使用SUB套接字时,必须使用setsockopt()方法来设置订阅的内容。如果你不设置订阅内容,那将什么消息都收不到(订阅任何主题(An empty 'option_value' of length zero shall subscribe to all incoming messages))。订阅信息可以是任何字符串,可以设置多次。只要消息满足其中一条订阅信息,SUB套接字就会收到。订阅者可以选择不接收某类消息,也是通过zmq_setsockopt()方法实现的。

PUB-SUB套接字组合是异步的。客户端在一个循环体中使用zmq_recv()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用zmq_send()发送消息,但不能在PUB套接字上使用zmq_recv()。

关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。

关于如何使发布者和订阅者同步,只有当订阅者准备好时发布者才会开始发送消息的方法可以深入研究,有一种简单的方法来同步PUB和SUB,就是让PUB延迟一段时间再发送消息。现实编程中不建议使用这种方式,因为它太脆弱了,而且不好控制。

另一种同步的方式则是认为发布者的消息流是无穷无尽的,因此丢失了前面一部分信息也没有关系。我们的气象信息客户端就是这么做的。

几点说明:

  • 订阅者可以连接多个发布者,轮流接收消息
  • 如果发布者没有订阅者与之相连,那它发送的消息将直接被丢弃
  • 如果你使用TCP协议,那当订阅者处理速度过慢时,消息会在发布者处堆积。可以使用阈值(HWM)来保护发布者。
  • 从ZeroMQ v3.x开始,当使用(tcp:// or ipc://)连接协议时,消息的过滤在发布端使用epgm://时,在订阅端过滤。在ZeroMQ v2.x中,所有消息的过滤是在订阅者处进行的。也就是说,发布者会向订阅者发送所有的消息,订阅者会将未订阅的消息丢弃。

3. push/pull(推/拉)

使用扇入或扇出的形式组装多个节点,可以产生多个步骤或循环,用于构建并行处理架构

zmq的并行处理模型可以进行超级计算:

  • 任务分发器会生成大量可以并行计算的任务
  • 有一组worker会处理这些任务,现实中,worker可能散落在不同的计算机中,利用GPU(图像处理单元)进行复杂计算
  • 结果收集器会在末端接收所有worker的处理结果,进行汇总

这就是zmq的推拉模式。

下面是任务分发器的代码,它会生成100个任务,任务内容是让收到的worker延迟若干毫秒。

讯享网// Task ventilator in C++ // Binds PUSH socket to tcp://localhost:5557 // Sends batch of tasks to workers via that socket // // Olivier Chamoux <> // #include <zmq.hpp> #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <iostream> #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0)) int main (int argc, char *argv[]) { zmq::context_t context (1); // Socket to send messages on zmq::socket_t sender(context, ZMQ_PUSH); sender.bind("tcp://*:5557"); std::cout << "Press Enter when the workers are ready: " << std::endl; getchar (); std::cout << "Sending tasks to workers…\n" << std::endl; // The first message is "0" and signals start of batch zmq::socket_t sink(context, ZMQ_PUSH); sink.connect("tcp://localhost:5558"); zmq::message_t message(2); memcpy(message.data(), "0", 1); sink.send(message); // Initialize random number generator srandom ((unsigned) time (NULL)); // Send 100 tasks int task_nbr; int total_msec = 0; // Total expected cost in msecs for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; // Random workload from 1 to 100msecs workload = within (100) + 1; total_msec += workload; message.rebuild(10); memset(message.data(), '\0', 10); sprintf ((char *) message.data(), "%d", workload); sender.send(message); } std::cout << "Total expected cost: " << total_msec << " msec" << std::endl; sleep (1); // Give 0MQ time to deliver return 0; } 

下面是worker的代码,它接受信息并延迟指定的毫秒数,并发送执行完毕的信号:

// Task worker in C++ // Connects PULL socket to tcp://localhost:5557 // Collects workloads from ventilator via that socket // Connects PUSH socket to tcp://localhost:5558 // Sends results to sink via that socket // // Olivier Chamoux <> // #include "zhelpers.hpp" #include <string> int main (int argc, char *argv[]) { zmq::context_t context(1); // Socket to receive messages on zmq::socket_t receiver(context, ZMQ_PULL); receiver.connect("tcp://localhost:5557"); // Socket to send messages to zmq::socket_t sender(context, ZMQ_PUSH); sender.connect("tcp://localhost:5558"); // Process tasks forever while (1) { zmq::message_t message; int workload; // Workload in msecs receiver.recv(&message); std::string smessage(static_cast<char*>(message.data()), message.size()); std::istringstream iss(smessage); iss >> workload; // Do the work s_sleep(workload); // Send results to sink message.rebuild(); sender.send(message); // Simple progress indicator for the viewer std::cout << "." << std::flush; } return 0; } 

下面是结果收集器的代码。它会收集100个处理结果,并计算总的执行时间,让我们由此判别任务是否是并行计算的。

讯享网// Task sink in C++ // Binds PULL socket to tcp://localhost:5558 // Collects results from workers via that socket // // Olivier Chamoux <> // #include <zmq.hpp> #include <time.h> #include <sys/time.h> #include <iostream> int main (int argc, char *argv[]) { // Prepare our context and socket zmq::context_t context(1); zmq::socket_t receiver(context,ZMQ_PULL); receiver.bind("tcp://*:5558"); // Wait for start of batch zmq::message_t message; receiver.recv(&message); // Start our clock now struct timeval tstart; gettimeofday (&tstart, NULL); // Process 100 confirmations int task_nbr; int total_msec = 0; // Total calculated cost in msecs for (task_nbr = 0; task_nbr < 100; task_nbr++) { receiver.recv(&message); if ((task_nbr / 10) * 10 == task_nbr) std::cout << ":" << std::flush; else std::cout << "." << std::flush; } // Calculate and report duration of batch struct timeval tend, tdiff; gettimeofday (&tend, NULL); if (tend.tv_usec < tstart.tv_usec) { tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1; tdiff.tv_usec =  + tend.tv_usec - tstart.tv_usec; } else { tdiff.tv_sec = tend.tv_sec - tstart.tv_sec; tdiff.tv_usec = tend.tv_usec - tstart.tv_usec; } total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000; std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl; return 0; } 

一组任务的平均执行时间在5秒左右,以下是分别开始1个、2个、4个worker时的执行结果:

# 1 worker Total elapsed time: 5034 msec # 2 workers Total elapsed time: 2421 msec # 4 workers Total elapsed time: 1018 msec 

注意事项:

worker上游和任务分发器相连,下游和结果收集器相连,这就意味着你可以开启任意多个worker。但若worker是绑定至端点的,而非连接至端点,那我们就需要准备更多的端点,并配置任务分发器和结果收集器。所以说,任务分发器和结果收集器是这个网络结构中较为稳定的部分,因此应该由它们绑定至端点,而非worker,因为它们较为动态。

我们需要做一些同步的工作,等待worker全部启动之后再分发任务。这点在ZMQ中很重要,且不易解决。连接套接字的动作会耗费一定的时间,因此当第一个worker连接成功时,它会一下收到很多任务。所以说,如果我们不进行同步,那这些任务根本就不会被并行地执行。你可以自己试验一下。

任务分发器使用PUSH套接字向worker均匀地分发任务(假设所有的worker都已经连接上了),这种机制称为负载均衡

结果收集器的PULL套接字会均匀地从worker处收集消息,这种机制称为公平队列

其他资料

zeroMQ不是TCP,不是socket,也不是消息队列,而是这些的综合体。

ZeroMQ是什么

ZeroMQ以嵌入式网络编程库的形式实现了一个并行开发框架(concurrency framework)
能够提供进程内(inproc)、进程间(IPC)、网络(TCP)和广播方式的消息信道,并支持扇出(fan-out)、发布-订阅(pub-sub)、任务分发(task distribution)、请求/响应(request-reply)等通信模式。

ZeroMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。

Zero 之禅(The Zen of Zero)

ZeroMQ是一个很有个性的项目,其名称也暗合禅意:

  • Ø 是一种权衡:让一些丹麦人恼火,但是“Ø”本身也降低了google搜索的命中率以及twitter上的关注度
  • Ø 暗合“零代理(broker)”、“零延迟”
  • Ø 的目标是“零管理、零消耗、零浪费”
  • Ø 符合简约主义:力量的源泉是降低复杂度,而不是增加新功能

ZeroMQ对socket API的封装

与libevent,ACE等项目不同,使用ZeroMQ时可以不关注网络细节。

ZeroMQ的API提供了对于传统socket API的封装,对于套接字类型、连接处理、帧、甚至路由的底层细节都进行了抽象,使得一套API可以用于进程内通讯、IPC、TCP和广播等多种消息信道。

ZeroMQ的这种设计大大简化了应用程序消息通信的实现,使得在多种场景下重用相同的交互模式成为可能。

使用ZeroMQ可以让编写高性能网络应用程序极为简单和有趣。

与socket相比,ZeroMQ API的特征如下:

  • 在后台线程中异步地处理IO。后台线程使用无需锁的数据结构与应用线程通信,所以ZeroMQ应用程序不需要锁、信号量,或者其他等待状态。
  • 组件可以动态地加入和退出,ZeroMQ会自动重新连接。这意味着可以以任何次序启动组件。可以创建“面向服务架构(service-oriented architectures)”,其中的服务可以在任何时候加入或者退出网络。
  • 在需要的时候自动对消息排队。这种处理是智能的,在排队前会尽量让消息靠近接收者。
  • 有处理队列溢出的方法(“高水位标记”)。队列满的时候,ZeroMQ会自动阻塞发送者,或者丢弃消息,取决于你正在使用的消息传递类型(模式)。
  • ZeroMQ让应用程序可以使用传输端点相互交流:TCP、多播、进程内、进程间。使用不同的传输端点时不用修改代码。
  • 根据消息传递模式的不同,使用不同的策略来安全地处理慢速/阻塞的接收者。
  • 使用请求-应答、发布-订阅等多种模式来路由消息。这些模式定义了如何创建网络拓扑结构。
  • 需要降低互联的各部分间的复杂性的时候,可以在网络中放置模式扩展的“设备”(小的代理)。
  • 通过在线路中使用简单的帧,可以精确地传递整个消息。发送10K的消息,则会收到10K的消息。
  • 不对消息格式做任何假定。消息是从零到数G字节的块。需要在高层使用其他产品来表示数据,如Google的Protocol Buffers、XDR等等。
  • 智能地处理网络错误。有时候重试,有时候告诉你操作失败。
  • 降低能耗。使用较少的CPU时间来做更多事情意味着使用更少的能源,而且较老的机器可以使用更长的时间。

ZeroMQ的通信协议

ZeroMQ定义了ZMTP(ZeroMQ Message Transport Protocol, ZeroMQ消息传输协议),在TCP协议之上定义了向后兼容性的规则,可扩展的安全机制,命令和消息分帧,连接元数据,以及其他传输层功能。

相对于其他的消息传输协议/通信协议,ZeroMQ有明显的优势:

  • TCP:ZeroMQ基于消息,使用消息模式而不是字节流。
  • XMPP:ZeroMQ更简单、快速、更底层。Jabber可建在ØMQ之上。
  • AMQP:完成相同的工作,ZeroMQ要快100倍,而且不需要代理(规范更简洁——比AMQP的规范文档少278页)
  • IPC:ZeroMQ可以跨主机通信
  • CORBA:ZeroMQ不会将复杂到恐怖的消息格式强加于你。
  • RPC:ZeroMQ完全是异步的,你可以随时增加/删除参与者。
  • RFC 1149:ZeroMQ比它快多了!
  • 29west LBM:ZeroMQ是自由软件!
  • IBM Low-latency:ZeroMQ是自由软件!
  • Tibco:ZeroMQ仍然是自由软件!

ZeroMQ不是消息队列

在摩尔定律的魔咒下,“分布式处理”逐渐成为主流,随之而来的是关于消息通讯、消息中间件的项目层出不穷。

其中最有名的应该是ZeroMQ和RabbitMQ,Thrift。

2011年,欧洲核子研究组织(CERN)
调查了统一用于操作CERN加速器的中间件解决方案的方式,欧洲核子研究组织的研究比较了
CORBA,Ice,Thrift,ZeroMQ,YAMI4,RTI和Qpid/AMQP,ZeroMQ得到了最高的分数。

但ZeroMQ最大的特点不在性能,而是机制。尽管名字中包含了"MQ",但ZeroMQ并不是“消息队列/消息中间件”。ZeroMQ是一个传输层API库,更关注消息的传输。与消息队列相比,ZeroMQ有以下一些特点:

1、点对点无中间节点

传统的消息队列都需要一个消息服务器来存储转发消息。而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且(试图)做到极致。以为消息服务器最终还是转化为服务器对其他节点的点对点消息传输上。ZeroMQ能缓存消息,但是是在发送端缓存。ZeroMQ里有水位设置的相关接口来控制缓存量。当然,ZeroMQ也支持传统的消息队列(通过zmq_device来实现)。

2、强调消息收发模式

在点对点的消息传输上ZeroMQ将通信的模式做了归纳,比如常见的订阅模式(一个消息发多个客户),分发模式(N个消息平均分给X个客户)等等。下面是目前支持的消息模式配对,任何一方都可以做为服务端。

  • PUB and SUB
  • REQ and REP
  • REQ and XREP
  • XREQ and REP
  • XREQ and XREP
  • XREQ and XREQ
  • XREP and XREP
  • PUSH and PULL
  • PAIR and PAIR

3、以统一接口支持多种底层通信方式

不管是线程间通信,进程间通信还是跨主机通信,ZeroMQ都使用同一套API进行调用,只需要更改通信协议名称(如,从"ipc:///xxx"改为"tcp://...

转自:ZeroMQ基础入门_guotianqing的博客-CSDN博客 

小讯
上一篇 2025-01-29 15:04
下一篇 2025-02-18 20:24

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/17243.html