2 ZMQ套接字和模式 Sockets and Patterns
2 ZMQ套接字和模式 Sockets and Patterns
2.1 The Socket API
- 创建和销毁套接字,它们一起形成套接字生命的循环(参见zmq socket(), zmq close())。
- 通过设置选项来配置套接字,并在必要时检查它们(参见zmq setsockopt(), zmq getsockopt()))。
- 通过创建和网络拓扑之间的ZeroMQ连接(参见zmq bind(), zmq connect()),将套接字插入网络拓扑。
- 通过在套接字上写入和接收消息来传输数据(参见zmq msg send(), zmq msg recv())。
2.1.1 将套接字插入到拓扑
要在两个节点之间创建连接,可以在一个节点中使用zmq_bind(),在另一个节点中使用zmq_connect()。一般来说,执行zmq_bind()的节点是一个“服务器”,位于一个已知的网络地址上,执行zmq_connect()的节点是一个“客户机”,具有未知或任意的网络地址。因此,我们说“将socket 绑定到端点”和“将socket 连接到端点”,端点就是那个已知的网络地址。
ZeroMQ连接与传统的TCP连接有些不同。主要的显著差异是:
- 它们通过任意传输(inproc、ipc、tcp、pgm或epgm)。请参见zmq inproc()、zmq ipc()、zmq tcp()、zmq pgm()、zmq epgm()。
- 一个将socket可能有许多传出和传入连接。.
- 没有’ zmq_accept ‘()方法。当socket绑定到端点时,它将自动开始接受连接
- 网络连接本身发生在后台,如果网络连接中断,ZeroMQ将自动重新连接(例如,如果peer 消失,然后返回)。
- 您的应用程序代码不能直接使用这些连接;它们被封装在socket下面。
许多架构遵循某种客户/服务器模型,服务器是最静态的组件,客户端是最动态的组件(来来去去)。有时寻址问题:服务器对客户端可见,但不一定反之亦然。因此,很明显,这是哪个节点应该正在进行zmq_bind()(服务器),哪个节点应该执行zmq_connect()(客户端)。它还取决于您使用的套接字种类,具有异常网络架构的一些例外。
现在,假设在启动服务器之前先启动客户机。在传统的网络中,我们会看到一个大大的红色失败标志。但是ZeroMQ让我们任意地开始和停止。只要客户机节点执行zmq_connect(),连接就存在,该节点就可以开始向socket写入消息。在某个阶段(希望是在消息排队太多而开始被丢弃或客户机阻塞之前),服务器会启动,执行zmq_bind(),然后ZeroMQ开始传递消息。
一个服务器节点可以绑定到许多端点(即协议和地址的组合),并且它可以使用一个socket来实现这一点。这意味着它将接受跨不同传输的连接:
1 | zmq_bind (socket, "tcp://*:5555"); |
虽然ZeroMQ试图对哪边bind和哪边connect保持中立,但还是有区别的。稍后我们将更详细地看到这些。其结果是,您通常应该将“服务器”视为拓扑的静态部分,它绑定到或多或少固定的端点,而将“客户机”视为动态部分,它们来来去去并连接到这些端点。然后,围绕这个模型设计应用程序。它“正常工作”的可能性要大得多。
2.1.2 发送和接收消息
要发送和接收消息,可以使用zmq_msg_send()和zmq_msg_recv()方法。这些名称都是传统的,但是ZeroMQ的I/O模型与传统的TCP模型有很大的不同,您需要时间来理解它。
让我们来看看TCP sockets和ZeroMQ sockets在处理数据方面的主要区别:
- ZeroMQ套接字像UDP一样携带消息,而不像TCP那样携带字节流。ZeroMQ消息是长度指定的二进制数据。我们很快就会讲到信息;它们的设计是针对性能进行优化的,因此有点棘手。
- ZeroMQ套接字在后台线程中执行I/O。这意味着消息到达本地输入队列并从本地输出队列发送,无论您的应用程序在忙什么。
- 根据socket类型,ZeroMQ sockets具有内置的1对n路由行为。
- zmq_send()方法实际上并不将消息发送到socket connection(s)。它对消息进行排队,以便I/O线程可以异步发送消息。它不会阻塞,除非在某些异常情况下。因此,当zmq_send()返回到应用程序时,不一定要发送消息。
2.1.3 单播传输
ZeroMQ提供了一组单播传输(inproc、ipc、tcp)和组播传输(epgm、pgm)。多播是一种先进的技术,我们将在后面介绍。不要甚至开始使用它,除非你知道你的扇出比例将使1对n单播不可能。
- tcp:对于大多数常见的情况,使用tcp,这是一个断开连接的tcp传输。它有弹性,便于携带,对于大多数情况都足够快。我们称此为断开连接,因为ZeroMQ的tcp传输在连接之前不需要端点的存在。客户端和服务器可以在任何时候连接和绑定,可以切换和返回,并且对应用程序保持透明。
- ipc:进程间ipc传输也是断开连接式(disconnected )的,就像tcp一样。它有一个限制:它还不能在Windows上运行。按照惯例,我们使用带有“.ipc”扩展名,以避免与其他文件名的潜在冲突。在UNIX系统上,如果使用ipc端点,则需要使用适当的权限创建这些端点,否则在不同用户id下运行的进程之间可能无法共享这些端点。您还必须确保所有进程都可以访问这些文件,例如,在相同的工作目录中运行。
- inproc:线程间传输(inproc)是一种连接(connected )的信号传输。它比tcp或ipc快得多。与tcp和ipc相比,这种传输有一个特定的限制:服务器必须在任何客户机发出连接之前发出绑定。这是在Zeromq v4.0和更高版本中修复的
2.1.4 ZeroMQ 不是中立的载体
Zeromq不是中性载体:它对其使用的传输协议施加了框架。这种框架与现有协议不兼容,。例如,比较HTTP请求和Zeromq请求,都是依赖TCP/IP。不能使用zmq实现一个HTTP服务。
HTTP请求使用CR-LF作为最简单的帧分隔符,而ZeroMQ使用长度指定的帧。因此,您可以使用ZeroMQ编写类似http的协议,例如使用请求-应答套接字模式。但它不是HTTP。
2.1.5 I/O Threads
我们说过ZeroMQ在后台线程中执行I/O。一个I/O线程(适用于所有类型socket)对于除最极端的应用程序之外的所有应用程序都是足够的。当您创建一个新的context时,它从一个I/O线程开始。一般的经验法则是,每秒允许1千兆字节(gigabyte ,1GB?)的数据进出一个I/O线程。要增加I/O线程的数量,请在创建任何socket之前使zmq_ctx_set()调用:
1 | int io_threads = 4; |
我们已经看到一个套接字可以同时处理几十个,甚至数千个连接。这对如何编写应用程序有根本性的影响。传统的网络应用程序在每个远程连接上有一个进程或一个线程,该进程或线程处理一个套接字。ZeroMQ允许您将整个结构分解为单个进程,然后根据伸缩性的需要将其分解。
如果您仅将Zeromq用于线程间通信(即,没有外部套接字I/O的多线程应用程序),则可以将I/O线程设置为零。但是,这并不是一个重大的优化,也没什么卵用。
2.2 消息传递模式 Messaging Patterns
内置的核心Zeromq模式是:
- Request-reply: 它将一组客户机连接到一组服务。这是一个远程过程调用和任务分发模式。
- Pub-sub:它将一组发布者连接到一组订阅者。这是一个数据发布模式。
- Pipeline:它以扇出/扇入模式连接节点,该模式可以有多个步骤和循环。这是一个并行的任务分发和收集模式
- Exclusive pair:只连接两个sockets 。这是一个用于连接进程中的两个线程的模式,不要与“普通”sockets 对混淆。
有效的绑定如下(您还将看到对XPUB和XSUB套接字的引用,(它们类似于PUB和SUB的原始版本)。)
- PUB and SUB
- REQ and REP
- REQ and ROUTER (take care, REQ inserts an extra null frame)
- DEALER and REP (take care, REP assumes a null frame)
- DEALER and ROUTER
- DEALER and DEALER
- ROUTER and ROUTER
- PUSH and PULL
- PAIR and PAIR
2.2.1 使用消息(Working with Messages)
实际上,libzmq核心库有两个api来发送和接收消息。zmq send()和zmq recv()方法是简单的一行程序。但zmq recv()不擅长处理任意消息大小:它会将消息截断为您提供的任何缓冲区大小(也就是和缓冲区大小有关)。因此有了第二个与zmq msg t结构一起工作的API,它具有更丰富但更困难的API
- 初始化消息:
zmq_msg_init()
,zmq_msg_init_size()
,zmq_msg_init_data()
. - 发送和接收消息:
zmq_msg_send()
,zmq_msg_recv()
. - 释放消息:
zmq_msg_close()
. - 访问消息内容:
zmq_msg_data()
,zmq_msg_size()
,zmq_msg_more()
. - 使用消息属性:
zmq_msg_get()
,zmq_msg_set()
. - 操作消息:
zmq_msg_copy()
,zmq_msg_move()
.
在网络上,ZeroMQ消息是适合内存的从0开始的任何大小的二进制。可以使用protobuf、msgpack、JSON或自定义实现自己的序列化。
在内存中,ZeroMQ消息是zmq_msg_t结构(或类,取决于您的语言)。下面是在C语言中使用ZeroMQ消息的基本规则:
- 创建并传递zmq_msg_t对象,而不是数据块。
- 要读取消息,可以使用zmq_msg_init()创建一个空消息,然后将其传递给zmq_msg_recv()。
- 要从新数据中编写一条消息,可以使用zmq_msg_init_size()创建一条消息,同时分配某个大小的数据块。然后使用memcpy填充数据,并将消息传递给zmq_msg_send()。
- 要释放(而不是销毁)消息,可以调用zmq_msg_close()。这将删除引用,最终ZeroMQ将销毁消息。
- 要访问消息内容,可以使用zmq_msg_data()。要知道消息包含多少数据,可以使用zmq_msg_size()。
- 不要使用zmq_msg_move()、zmq_msg_copy()或zmq_msg_init_data(),除非您阅读了手册页并确切地知道为什么需要这些。
- 您传递一个消息后zmq_msg_send(),ØMQ将clear 这个消息。同时将大小设置为零。您不能两次发送相同的消息,并且不能在发送消息后访问消息数据。
- 如果您使用zmq_send()和zmq_recv(),而不是消息结构,这些规则将不适用。
如果您希望多次发送相同的消息,并且消息大小相当,那么创建第二个消息,使用zmq_msg_init()初始化它,然后使用zmq_msg_copy()创建第一个消息的副本。这不是复制数据,而是复制引用。然后可以发送消息两次(如果创建了更多副本,则可以发送两次或多次),并且只有在发送或关闭最后一个副本时才最终销毁消息。
ZeroMQ还支持多部分消息,它允许您作为单个在线消息发送或接收帧列表。这在实际应用中被广泛使用,我们将在本章后面的第三章—高级请求-应答模式中看到它。
帧(Frames)在ZeroMQ参考手册页面中也称为消息部件)是ZeroMQ消息的基本线格式。帧是指定长度的数据块。长度向上可以为零。
最初,ZeroMQ消息是一个帧,就像UDP一样。稍后,我们使用多部分消息对此进行了扩展,这些消息非常简单,就是一系列帧,其中“more”位设置为1,然后是一个位设置为0的帧。然后ZeroMQ API允许您编写带有“more”标志的消息,当您读取消息时,它允许您检查是否有“more”
- 消息可以是一个或多个部分。
- 这些部分也被称为“帧”。
- 每个部分都是zmq_msg_t对象。
- 可以在底层API中分别发送和接收每个部分。
- 高级api提供包装器来发送整个多部分消息。
- 您可以发送零长度的消息
- ZeroMQ保证交付消息的所有部分(一个或多个),或者一个也不交付。
- ZeroMQ不会立即发送消息(单个或多个部分),而是在稍后某个不确定的时间。
- 消息(单个或多个部分)必须装入内存。如果您想发送任意大小的文件,应该将它们分成几部分,并将每一部分作为单独的单部分消息发送。使用多部分数据不会减少内存消耗。
- 当接收到消息结束时,必须调用zmq msg close(),当作用域关闭时不会自动销毁对象。发送消息后不能调用此方法。
2.2.2 处理多个Sockets(Handling Multiple Sockets)
在到目前为止的所有例子中,大多数例子的主循环是:
- 1.等待套接字上的消息。
- 2.过程信息。
- 3.重复。
如果我们想同时读取多个端点呢?最简单的方法是将一个socket连接到所有端点,并让ZeroMQ为我们执行扇入。
要同时读取多个sockets,可以使用zmq_poll()。
下面是一个使用非阻塞读取从两个sockets读取的简单示例,这种方法的代价是第一个消息上的一些额外延迟。在亚毫秒延迟至关重要的应用程序中。
1 | //msreader: Multiple socket reader in C |
您可以通过先读取一个套接字,然后读取第二个套接字来公平地对待套接字,而不是像上面中所做的那样对它们进行优先级排序。使用zmq_poll():
1 | mspoller: Multiple socket poller in C |
2.2.3 多部分消息(Multipart Messages)
ZeroMQ允许我们用几个帧(frame)组成一个消息(message)
当您处理多部分消息时,每个部分都是一个zmq msg项。例如,如果你发送一个包含五个部分的消息,你必须构造、发送和销毁五个zmq msg项。
下面是我们如何在多部分消息中发送帧(我们将每一帧接收到一个消息对象)
1 | zmq_msg_send (&message, socket, ZMQ_SNDMORE); |
下面是我们如何接收和处理消息中的所有部分,无论是单个部分还是多个部分
1 | while (1) { |
- 当您发送一个多部分消息时,第一部分(以及所有后续部分)只有在您发送最后一部分时才实际通过网络发送。
- 如果您正在使用zmq_poll(),当您接收到消息的第一部分时,其他部分也都已经到达。
- 您将接收到消息的所有部分,或者完全不接收。
- 消息的每个部分都是一个单独的zmq_msg项。
- 无论是否选中more属性,都将接收消息的所有部分。
- 发送时,ZeroMQ将消息帧在内存中排队,直到最后一个消息帧被接收,然后将它们全部发送出去。
- 除了关闭套接字外,无法取消部分发送的消息。
2.2.4 中介和代理 Intermediaries and Proxies
在ZeroMQ中,我们根据上下文将这些代理、队列、转发器、设备或代理称为代理。
2.2.5 动态发现 The Dynamic Discovery Problem
动态发现有几种解决方案。最简单的方法是通过硬编码(或配置)网络体系结构来完全避免这种情况,以便手工完成发现。也就是说,当您添加一个新片段时,您将重新配置网络以了解它。例如下面图12-小规模的Pub-Sub网络
在实践中,这会导致越来越脆弱和笨拙的架构。假设你有一个出版商和100个订阅者。通过在每个订阅者中配置发布者端点,可以将每个订阅者连接到发布者。那也是件很轻松的事。用户是动态的;出版商是静态的。现在假设你添加了更多的出版商。突然,事情变得不那么容易了。如果你继续将每个订阅者与每个发布者联系起来,避免动态发现的成本就会越来越高。
对此有很多答案,但最简单的答案是添加中介;也就是说,网络中所有其他节点都连接到的一个静态点。在传统的消息传递中,这是消息代理的工作。ZeroMQ没有提供这样的消息代理,但是它让我们可以很容易地构建中介。
您可能想知道,如果所有网络最终都变得足够大,需要中介体,那么为什么不为所有应用程序设置一个message broker呢?对于初学者来说,这是一个公平的妥协。只要始终使用星型拓扑结构,忘记性能,事情就会正常工作。然而,消息代理是贪婪的;作为中央中介人,它们变得太复杂、太有状态,最终成为一个问题。
最好将中介看作简单的无状态消息交换机。一个很好的类比是HTTP代理;它在那里,但没有任何特殊的作用。在我们的示例中,添加一个 pub-sub代理解决了动态发现问题。我们在网络的“中间”设置代理。代理打开一个XSUB套接字、一个XPUB套接字,并将每个套接字绑定到已知的IP地址和端口。然后,所有其他进程都连接到代理,而不是彼此连接。添加更多订阅者或发布者变得很简单。
我们需要XPUB和XSUB套接字,因为ZeroMQ从订阅者到发布者执行订阅转发。XSUB和XPUB与SUB和PUB完全一样,只是它们将订阅公开为特殊消息。代理必须通过从XPUB套接字读取这些订阅消息并将其写入XSUB套接字,从而将这些订阅消息从订阅方转发到发布方。这是XSUB和XPUB的主要用例。
2.2.6 共享队列Shared Queue (DEALER and ROUTER sockets)
在Hello World客户机/服务器应用程序中,我们有一个客户机与一个服务通信。然而,在实际情况中,我们通常需要允许多个服务和多个客户机。这让我们可以扩展服务的功能(许多线程、进程或节点,而不是一个)。唯一的限制是服务必须是无状态的,所有状态都在请求中,或者在一些共享存储(如数据库)中。
有两种方法可以将多个客户机连接到多个服务器。蛮力方法是将每个客户端套接字连接到多个服务端点。一个客户端套接字可以连接到多个服务套接字,然后REQ套接字将在这些服务之间分发请求。如上图15,假设您将一个客户端套接字连接到三个服务端点;客户机请求R1、R2、R3、R4。R1和R4进入服务A,R2转到B,R3转到服务C。并不是所有的服务都收到R1、R2、R3、R4。详细可以参考测试代码1-reqrep/2文件夹。
这种设计可以让您更便宜地添加更多的客户端。您还可以添加更多的服务。每个客户端将其请求分发给服务。但是每个客户机都必须知道服务拓扑。如果您有100个客户机,然后决定再添加3个服务,那么您需要重新配置并重新启动100个客户机,以便客户机了解这3个新服务。
理想情况下,我们应该能够在任何时候添加和删除服务或客户机,而不需要触及拓扑的任何其他部分。
因此,我们将编写一个小消息队列代理来提供这种灵活性。代理绑定到两个端点,一个用于客户机的前端,一个用于服务的后端。然后,它使用zmq_poll()监视这两个sockets 的活动,当它有一些活动时,它在它的两个sockets 之间传递消息。它实际上并不明确地管理任何队列—zeromq在每个sockets 上自动管理队列。
当您使用REQ与REP对话时,您将得到一个严格同步的请求-应答对话框。客户端发送一个请求。服务读取请求并发送响应。然后客户端读取应答。如果客户机或服务尝试执行其他操作(例如,在不等待响应的情况下连续发送两个请求),它们将得到一个错误。
但是我们的代理必须是非阻塞的。显然,我们可以使用zmq_poll()来等待两个socket上的活动,但是不能使用REP和REQ。
幸运的是,有两个名为DEALER和ROUTER的socket允许您执行非阻塞的请求-响应。在高级请求-应答模式中,您将看到DEALER和ROUTER套接字如何让您构建各种异步请求-应答流。现在,我们只需要看看DEALER 和ROUTER 如何让我们扩展REQ-REP跨一个中介,也就是我们的小broker。
在这个简单的扩展请求-应答模式中,REQ与ROUTER 对话,而DEALER 与REP对话。在DEALER 与ROUTER 之间,我们必须有代码(就像我们的broker一样)将消息从一个socket 中提取出来,并将它们推送到另一个socket 中。
request-reply broker绑定到两个端点,一个用于clients 连接(前端socket),另一个用于workers 连接(后端)。要测试此broker,您需要更改workers ,以便他们连接到后端socket。这是一个client ,如图17
1 | //rrclient: Request-reply client in C |
Here is the worker:
1 | //rrworker: Request-reply worker in C |
这是代理,它可以正确地处理多部分消息:
1 | //rrbroker: Request-reply broker in C |
使用请求-应答代理可以使客户机/服务器体系结构更容易伸缩,因为客户机看不到worker,而worker也看不到客户机。唯一的静态节点是中间的代理。
2.2.7 ZeroMQ的内置代理函数 ZeroMQ’s Built-In Proxy Function
原来,上一节的rrbroker中的核心循环非常有用,并且可以重用。它让我们可以毫不费力地构建pub-sub转发器和共享队列以及其他小型中介。ZeroMQ将其封装在一个方法中,zmq_proxy()
:
1 | zmq_proxy (frontend, backend, capture); |
必须正确地连接、绑定和配置这两个(或者三个sockets,如果我们想捕获数据的话)。当我们调用zmq_proxy方法时,就像启动rrbroker的主循环一样。让我们重写 request-reply broker来调用zmq_proxy,并将其重新标记为一个听起来很昂贵的“消息队列”(人们已经为执行更少的代码向house收费):
1 | msgqueue: Message queue broker in C |
2.2.8 传输桥接 Transport Bridging
ZeroMQ用户经常会问,“我如何将我的ZeroMQ网络与技术X连接起来?”其中X是其他网络或消息传递技术。
答案很简单,就是建一座桥。桥接是一个小应用程序,它在一个socket上讲一个协议,并在另一个套接字上转换成 to/from第二个协议。协议解释器,如果你喜欢的话。ZeroMQ中常见的桥接问题是桥接两个传输或网络。
例如,我们将编写一个小代理,它位于发布者和一组订阅者之间,连接两个网络。前端socket (SUB)面向气象服务器所在的内部网络,后端(PUB)面向外部网络上的订阅者。它订阅前端socket 上的天气服务,并在后端socket 上重新发布数据。
1 | //wuproxy: Weather update proxy in C |
它看起来与前面的代理示例非常相似,但关键部分是前端和后端sockets 位于两个不同的网络上。例如,我们可以使用这个模型将组播网络(pgm传输)连接到tcp publisher。
2.3 处理错误
上面的C示例中都没有错误处理。有一些简单的规则,从POSIX约定开始:
- 如果创建对象的方法失败,则返回NULL。
- 处理数据的方法可能返回已处理的字节数,或在出现错误或故障时返回-1。
- 其他方法在成功时返回0,在错误或失败时返回-1。
- 错误代码在errno或zmq_errno()中提供。
- zmq_strerror()提供了用于日志记录的描述性错误文本。
1 | void *context = zmq_ctx_new (); |
有两个主要的例外情况,你应该作为非致命的处理:
- 当您的代码接收到带有ZMQ_DONTWAIT选项的消息并且没有等待的数据时,ZeroMQ将返回-1并再次将errno设置为EAGAIN。
- 当一个线程调用zmq_ctx_destroy(),而其他线程仍在执行阻塞工作时,zmq_ctx_destroy()调用关闭上下文,所有阻塞调用都以-1退出,errno设置为ETERM。