ZeroMQ,作为一个消息通信的框架,最重要的还是通信的可靠性,而这其中最重要的就是连接断开之后的重连接机制。在这里,我们分别分析一下ZMQ的连接建立过程和重连接过程。
回顾
在分析之前,我们先来回顾一下SocketBase、SessionBase和Poller这几个类的关系。
ZeroMQ是一个消息传输的框架,介于应用层和传输层之间,其内部将复杂的通讯操作封装了一系列的API供应用层调用。在实际使用中,我们会在应用层直接实例化一个Req、Rep、Pull等包装后的Socket对象,而这些Socket对象的父类其实就是SocketBase,SocketBase的内部维护了一个pipe,它是用于SocketBase与SessionBase之间传递消息的双向消息管道(inpipe和outpipe)。
而SessionBase则是创建SocketChannel并与目标终端进行连接的地方,是与底层Poller最先进行交互的一层(通过StreamEngine进行交互),其内部实现超时重连和断线重连等功能。
StreamEngine这是整个框架中真正和网络交互的类,是基于状态机模式实现的。在连接建立的过程中,当TCP三次握手后,StreamEngine会再一次和另一端握手,这是ZMQ应用层自己的握手协议,握手成功后,就进行msg的传递了。
Poller:Poller是框架中与最底层交互的类,负责真正处理命令和数据的send/recv。
所以我们可以大概知道,连接的建立是socket.connect()开始的,也就是最终交由SocketBase去处理,与重连机制则是由SessionBase实现的。
连接建立过程的分析
现在我们先来看看SocketBase中定义的connect方法:
|
|
我们主要关注TCP连接的建立,毕竟在分布式的环境下还是在用TCP、我们知道一个Socket下面可能对应了多个连接(why???),而每一个连接其实对应的是一个StreamEngine对象,而每一个StreamEngine对象又关联了一个Session对象,用于与上层的Socket之间的交互。在connect()这里这段代码的主要事情就是创建Session对象以及pipe对象,接着调用add_endpoint方法部署session和pipe,所以我们再看看add_endpoint里面做了什么:
|
|
TCP的连接过程中,将会构建TCPConnector,在构建TCPConnector时可以设置这个连接的建立是否需要延时,而在重连接的时候,会自动使用延迟连接的TCPConnector。到这里可以得知,ZMQ对于具体连接的建立,其实是委托给TCPConnector对象来做的,这是一个工具类,它建立连接的大致过程如下:
- [1] 创建一个SocketChannel对象,并将其设置为非阻塞的,然后调用connect方法来建立与远程地址的连接;
- [2] 将SocketChannel注册到IO线程的Poller上去,并要设置connect事件;
- [3] 对于connect事件的回调要做的事情,其实是在poller对象上解除这个socketchannel的注册,然后创建一个新的StreamEngine对象来重新封装这个建立好连接的SocketChannel,然后再将这个StreamEngine对象与刚刚的session对象绑定起来,同时还将这个StreamEngine绑定到IO线程上,也就是在Poller上注册。由此,Session和Poller就可以通过StreamEngine来通讯了。
至此,ZMQ的连接就建立完成了。
重连机制的分析
在分析重连机制前,我们先看看连接断开之后会发生些什么。首先要知道,如果连接已经断开,那么在channel上read将会返回-1,而当底层的channel有数据可以读取的时候将执行StreamEngine和in_event回调。所以我们看看在read返回-1之后,in_event都做了些什么:
|
|
当返回-1后,in_event中会执行error方法:
|
|
如果底层的连接断开了,那么当前的这个channel和streamEngine对象也就无效了,所以我们就要通同上层socket连接断开,告诉session重连,取消在poller上的注册,销毁当前的对象等等……
下面我们再重点看看我们通过engine_error()通知session后,session是如何实现重连的:
|
|
可以看到session释放了当前的streamEngine对象,并且清空了pipe中没有处理完的消息,在timeout_error和connection_error的错误下就执行reconnect()方法,进行重新连接:
|
|
这里最核心的一句就是start_connecting(true)了,也就是我们最开始在讲连接建立的过程分析中的一个方法,不过这里传入的是true,所以这里是延迟进行连接的,后面流程跟上面一样,就不再讲了。