Source Code Analysis: Connection Establishment and Reconnection in ZeroMQ

For a messaging framework like ZeroMQ, the reliability of communication is paramount, and at the heart of that reliability is the mechanism for re-establishing a connection after it breaks. Below we analyze how ZMQ establishes a connection and how it reconnects.

Recap

Before diving in, let's recap the relationship between the SocketBase, SessionBase and Poller classes.

ZeroMQ is a message-transport framework that sits between the application layer and the transport layer, wrapping complex communication operations behind a set of APIs for the application to call. In practice, the application directly instantiates a wrapped socket object such as Req, Rep or Pull; the parent class of all of these is SocketBase. Internally, SocketBase maintains a pipe: a bi-directional message channel (an inpipe and an outpipe) used to pass messages between SocketBase and SessionBase.

SessionBase is where the socket channel is created and the connection to the remote endpoint is initiated; it is the first layer to interact with the underlying Poller (via StreamEngine), and it implements features such as connection timeouts and reconnection after a disconnect.

StreamEngine is the class in the framework that actually talks to the network, implemented as a state machine. During connection establishment, once the TCP three-way handshake completes, StreamEngine performs another handshake with the peer; this is ZMQ's own application-level handshake protocol. Only after that handshake succeeds does message transfer begin.

Poller: the lowest-level class in the framework, driving the event loop that ultimately performs the send/recv of commands and data.

So, roughly speaking: connection establishment starts from socket.connect(), which is ultimately handled by SocketBase, while the reconnection mechanism is implemented by SessionBase.

Analysis of connection establishment

Let's start with the connect method defined in SocketBase:

int zmq::socket_base_t::connect (const char *addr_)
{
    scoped_optional_lock_t sync_lock (thread_safe ? &sync : NULL);
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Process pending commands, if any.
    int rc = process_commands (0, false);
    if (unlikely (rc != 0)) {
        return -1;
    }

    //  Parse addr_ string.
    std::string protocol;
    std::string address;
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        return -1;
    }

    if (protocol == "inproc") {   //  In-process (intra-process) transport
        //  ... omitted ...
    }

    //  Check whether this socket type allows only a single connect per endpoint.
    bool is_single_connect = (options.type == ZMQ_DEALER ||
                              options.type == ZMQ_SUB ||
                              options.type == ZMQ_REQ);
    if (unlikely (is_single_connect)) {
        const endpoints_t::iterator it = endpoints.find (addr_);
        if (it != endpoints.end ()) {
            //  There is no valid use for multiple connects for SUB-PUB nor
            //  DEALER-ROUTER nor REQ-REP. Multiple connects produces
            //  nonsensical results.
            return 0;
        }
    }

    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    if (!io_thread) {
        errno = EMTHREAD;
        return -1;
    }

    //  Create the address object.
    address_t *paddr =
        new (std::nothrow) address_t (protocol, address, this->get_ctx ());
    alloc_assert (paddr);

    //  Resolve address (if needed by the protocol)
    if (protocol == "tcp") {   //  TCP transport
        const char *check = address.c_str ();
        if (isalnum (*check) || isxdigit (*check) || *check == '[' || *check == ':') {
            check++;
            while (isalnum (*check)
                || isxdigit (*check)
                || *check == '.' || *check == '-' || *check == ':' || *check == '%'
                || *check == ';' || *check == '[' || *check == ']' || *check == '_'
                || *check == '*') {
                check++;
            }
        }
        //  Assume the worst, now look for success
        rc = -1;
        //  Did we reach the end of the address safely?
        if (*check == 0) {
            //  Do we have a valid port string? (cannot be '*' in connect)
            check = strrchr (address.c_str (), ':');
            if (check) {
                check++;
                if (*check && (isdigit (*check)))
                    rc = 0;   //  Valid
            }
        }
        if (rc == -1) {
            errno = EINVAL;
            LIBZMQ_DELETE (paddr);
            return -1;
        }
        //  Defer resolution until a socket is opened
        paddr->resolved.tcp_addr = NULL;
    }

    //  ......
    //  Handling of the other transports (ipc, udp, pgm, etc.) omitted.
    //  ......

    //  Create session. The first argument is the I/O thread the session
    //  will be attached to; the second indicates that the connection
    //  should be initiated actively.
    session_base_t *session = session_base_t::create (io_thread, true, this,
        options, paddr);
    errno_assert (session);

    //  PGM does not support subscription forwarding; ask for all data to be
    //  sent to this pipe. (same for NORM, currently?)
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm"
        || protocol == "norm" || protocol == "udp";
    pipe_t *newpipe = NULL;

    //  Create the pipe pair linking the session with this socket.
    if (options.immediate != 1 || subscribe_to_all) {
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
        pipe_t *new_pipes [2] = {NULL, NULL};
        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);
        int hwms [2] = {conflate ? -1 : options.sndhwm,
                        conflate ? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
        rc = pipepair (parents, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to the socket object.
        attach_pipe (new_pipes [0], subscribe_to_all);
        newpipe = new_pipes [0];

        //  Attach remote end of the pipe to the session object, so the
        //  session and the socket can communicate through the pipe.
        session->attach_pipe (new_pipes [1]);
    }

    //  Save last endpoint URI
    paddr->to_string (last_endpoint);

    //  Associate the session with this endpoint address.
    add_endpoint (addr_, (own_t *) session, newpipe);
    return 0;
}

We focus on TCP connection establishment here, since TCP is what distributed deployments actually use. Note that a single socket may hold multiple connections (a socket can connect to several endpoints, and each peer gets its own connection). Each connection corresponds to a StreamEngine object, and each StreamEngine is associated with a Session object that mediates with the socket above it. The main job of connect() is to create the Session object and the pipe pair, then call add_endpoint to deploy the session and pipe. So let's see what add_endpoint does:

//  addr_, endpoint_ and pipe together record everything about the
//  connection just established.
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
{
    //  Activate the session. Make it a child of this socket.
    //  launch_child deploys the endpoint by plugging it into its I/O thread.
    launch_child (endpoint_);
    endpoints.insert (endpoints_t::value_type (
        std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
}

During TCP connection setup a TCPConnector is constructed; at construction time you can specify whether the connection attempt should be delayed, and on reconnection a delayed TCPConnector is used automatically. So ZMQ delegates the actual establishment of a connection to the TCPConnector, a helper class whose workflow is roughly:

  • [1] Create a socket channel, set it to non-blocking, and call connect to initiate the connection to the remote address;
  • [2] Register the channel with the I/O thread's Poller, subscribing to the connect (writability) event;
  • [3] In the connect-event callback, deregister the channel from the poller, create a new StreamEngine object wrapping the now-connected channel, bind that StreamEngine to the session created earlier, and register the StreamEngine with the I/O thread's Poller. From then on, the Session and the Poller communicate through the StreamEngine.

With that, the ZMQ connection is fully established.

Analysis of the reconnection mechanism

Before analyzing reconnection, let's see what happens when a connection drops. First, note that once the connection is broken, a read on the channel returns -1, and whenever the underlying channel has data to read, the StreamEngine's in_event callback is executed. So let's see what in_event does after read returns -1:

void zmq::stream_engine_t::in_event ()
{
    zmq_assert (!io_error);

    //  If still handshaking, receive and process the greeting message.
    if (unlikely (handshaking))
        if (!handshake ())
            return;

    zmq_assert (decoder);

    //  If there has been an I/O error, stop polling.
    if (input_stopped) {
        rm_fd (handle);
        io_error = true;
        return;
    }

    //  If there's no data to process in the buffer...
    if (!insize) {
        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
        size_t bufsize = 0;
        //  Get a buffer from the decoder to receive into; the size of the
        //  socket's TCP receive buffer has already been configured.
        decoder->get_buffer (&inpos, &bufsize);
        const int rc = tcp_read (s, inpos, bufsize);   //  Read incoming data
        if (rc == 0) {   //  0 means the underlying connection was closed
            //  connection closed by peer
            errno = EPIPE;
            error (connection_error);
            return;
        }
        if (rc == -1) {   //  -1 means the underlying socket has a problem
            if (errno != EAGAIN)
                error (connection_error);
            return;
        }

        //  Adjust input size
        insize = static_cast<size_t> (rc);
        //  Adjust buffer size to received bytes
        decoder->resize_buffer (insize);
    }

    int rc = 0;
    size_t processed = 0;
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);   //  Decode the data
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;   //  Bytes not yet processed
        if (rc == 0 || rc == -1)
            break;
        rc = (this->*process_msg) (decoder->msg ());
        if (rc == -1)
            break;
    }

    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
            error (protocol_error);
            return;
        }
        input_stopped = true;
        reset_pollin (handle);
    }

    session->flush ();   //  Hand the decoded messages to the session
}

When read returns -1 (and errno is not EAGAIN), in_event calls the error method:

void zmq::stream_engine_t::error (error_reason_t reason)
{
    if (options.raw_socket && options.raw_notify) {
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        msg_t terminator;
        terminator.init ();
        (this->*process_msg) (&terminator);
        terminator.close ();
    }
    zmq_assert (session);
#ifdef ZMQ_BUILD_DRAFT_API
    int err = errno;
    if (mechanism == NULL) {
        if (reason == protocol_error)
            socket->event_handshake_failed_zmtp (endpoint, err);
        else
            socket->event_handshake_failed_no_detail (endpoint, err);
    } else if (mechanism->status () == mechanism_t::handshaking) {
        if (mechanism->error_detail () == mechanism_t::zmtp)
            socket->event_handshake_failed_zmtp (endpoint, err);
        else if (mechanism->error_detail () == mechanism_t::zap)
            socket->event_handshake_failed_zap (endpoint, err);
        else if (mechanism->error_detail () == mechanism_t::encryption)
            socket->event_handshake_failed_encryption (endpoint, err);
        else
            socket->event_handshake_failed_no_detail (endpoint, err);
    }
#endif
    socket->event_disconnected (endpoint, (int) s);   //  Notify the socket above
    session->flush ();
    session->engine_error (reason);   //  Tell the session to release this engine
                                      //  and its pipe, then reconnect
    unplug ();       //  Deregister from the poller
    delete this;     //  Close the underlying channel and destroy this engine
}

Once the underlying connection is gone, the current channel and StreamEngine object are no longer usable, so we must notify the upper-level socket of the disconnect, tell the session to reconnect, deregister from the poller, destroy the current object, and so on.

Now let's focus on how the session reconnects after being notified through engine_error():

void zmq::session_base_t::engine_error (
    zmq::stream_engine_t::error_reason_t reason)
{
    //  Engine is dead. Let's forget about it.
    engine = NULL;   //  Drop the reference to the current engine

    //  Remove any half-done messages from the pipes.
    if (pipe)
        clean_pipes ();   //  Clear unfinished msgs from the pipe

    zmq_assert (reason == stream_engine_t::connection_error
        || reason == stream_engine_t::timeout_error
        || reason == stream_engine_t::protocol_error);

    switch (reason) {
        case stream_engine_t::timeout_error:      //  Connection timed out
        case stream_engine_t::connection_error:   //  Connection dropped
            if (active)
                reconnect ();   //  Attempt to reconnect
            else
                terminate ();
            break;
        case stream_engine_t::protocol_error:
            terminate ();
            break;
    }

    //  Just in case there's only a delimiter in the pipe.
    if (pipe)
        pipe->check_read ();
    if (zap_pipe)
        zap_pipe->check_read ();
}

As we can see, the session drops its reference to the current StreamEngine object and clears the unfinished messages from the pipe; for timeout_error and connection_error it then calls reconnect() to re-establish the connection:

void zmq::session_base_t::reconnect ()
{
    //  For delayed connect situations, terminate the pipe
    //  and reestablish later on
    if (pipe && options.immediate == 1
        && addr->protocol != "pgm" && addr->protocol != "epgm"
        && addr->protocol != "norm" && addr->protocol != "udp") {
        pipe->hiccup ();
        pipe->terminate (false);
        terminating_pipes.insert (pipe);
        pipe = NULL;
        if (has_linger_timer) {
            cancel_timer (linger_timer_id);
            has_linger_timer = false;
        }
    }

    reset ();

    //  Reconnect.
    if (options.reconnect_ivl != -1)
        start_connecting (true);

    //  For subscriber sockets we hiccup the inbound pipe, which will cause
    //  the socket object to resend all the subscriptions.
    if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
        || options.type == ZMQ_DISH))
        pipe->hiccup ();
}

The key line here is start_connecting (true), the same method used at the beginning of the connection-establishment analysis, except that it is now called with true, so the connection is attempted after a delay. From there the flow is the same as before, so we won't repeat it.