武士岚2中文内置菜单版
118.33 MB · 2025-11-30
/**
* 本处理器负责处理Socket中的帧,比如从输入流中读取帧数据,或者将帧数据写入到输出流中
*/
public class SocketFrameHandler implements FrameHandler {
/** 底层的JDK的Socket */
private final Socket _socket;
/** 本组件在关闭Socket前,会先对输出流执行刷新操作;可以通过该线程池来异步执行刷新任务 */
private final ExecutorService _shutdownExecutor;
/** Socket底层的输入输出流 */
private final DataInputStream _inputStream;
private final DataOutputStream _outputStream;
/** 这个常量代表等待异步刷新任务执行完成的超时时间 */
public static final int SOCKET_CLOSING_TIMEOUT = 1;
public SocketFrameHandler(Socket socket) throws IOException {
this(socket, null);
}
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException {
_socket = socket;
_shutdownExecutor = shutdownExecutor;
_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
}
public DataInputStream getInputStream() {
return _inputStream;
}
// 本类的如下方法都是在调用this._socket的相应方法,因此省略:
// getAddress()、getLocalAddress()、getPort()、getLocalPort()、setTimeout()、getTimeout()
/**
* 发送协议头;将本客户端使用的AMQP版本号发送给服务端,与服务端协商AMQP版本
*/
public void sendHeader(int major, int minor, int revision) throws IOException {
synchronized (_outputStream) {
_outputStream.write("AMQP".getBytes("US-ASCII"));
_outputStream.write(0);
_outputStream.write(major);
_outputStream.write(minor);
_outputStream.write(revision);
try {
_outputStream.flush();
} catch (SSLHandshakeException e) {
LOGGER.error("TLS connection failed: {}", e.getMessage());
throw e;
}
}
}
/**
* AMQP协议规定,当连接建立后,客户端需要给服务端发送一个协议头,以协商AMQP版本
* 这里发送的协议头数据是客户端发送的唯一不是以帧格式组织起来的数据
*/
@Override
public void sendHeader() throws IOException {
sendHeader(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR, AMQP.PROTOCOL.REVISION);
if (this._socket instanceof SSLSocket) {
TlsUtils.logPeerCertificateInfo(((SSLSocket) this._socket).getSession());
}
}
/**
* 重要方法:当连接建立后,调用本方法,对该连接进行初始化
* 这里会启动连接内部的MainLoop线程,该线程会不断处理服务端发送过来的数据
*/
@Override
public void initialize(AMQConnection connection) {
connection.startMainLoop();
}
@Override
public Frame readFrame() throws IOException {
synchronized (_inputStream) {
return Frame.readFrom(_inputStream);
}
}
@Override
public void writeFrame(Frame frame) throws IOException {
synchronized (_outputStream) {
frame.writeTo(_outputStream);
}
}
@Override
public void flush() throws IOException {
_outputStream.flush();
}
@Override
public void close() {
try { _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT); } catch (Exception _e) {}
// 将刷新操作包装成一个Callable任务
Callable<Void> flushCallable = new Callable<Void>() {
@Override
public Void call() throws Exception {
flush();
return null;
}
};
// 如果没有异步线程池,则同步执行Callable任务
// 否则,将Callable任务提交给线程池,并等待其执行完成;最多等待SOCKET_CLOSING_TIMEOUT秒
Future<Void> flushTask = null;
try {
if (this._shutdownExecutor == null) {
flushCallable.call();
} else {
flushTask = this._shutdownExecutor.submit(flushCallable);
flushTask.get(SOCKET_CLOSING_TIMEOUT, TimeUnit.SECONDS);
}
} catch (Exception e) {
if (flushTask != null) {
flushTask.cancel(true);
}
}
// 关闭Socket,忽略掉可能出现的异常
try { _socket.close(); } catch (Exception _e) {}
}
}
/**
* 连接工厂,负责创建AMQP连接
*/
public class ConnectionFactory implements Cloneable {
/**
* 创建一条AMQP连接;本类的其它newConnection()方法最终都会调用到本方法
*/
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
throws IOException, TimeoutException {
// 必要时初始化指标收集器(用于记录各项指标,如总共创建了多少条连接)
if (this.metricsCollector == null) {
this.metricsCollector = new NoOpMetricsCollector();
}
// 创建一个FrameHandler工厂;在BIO模式下,这里创建的是SocketFrameHandlerFactory实例
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
// 将本类相关字段的值打包到ConnectionParams实例中,并额外添加连接名称
ConnectionParams params = params(executor);
if (clientProvidedName != null) {
Map<String, Object> properties = new HashMap<>(params.getClientProperties());
properties.put("connection_name", clientProvidedName);
params.setClientProperties(properties);
}
if (isAutomaticRecoveryEnabled()) {
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
return conn;
} else {
// 获取到用户指定的服务器地址(可指定多个),并依次尝试与这些地址建立连接,并返回第一个创建成功的连接
List<Address> addrs = addressResolver.getAddresses();
Exception lastException = null;
for (Address addr : addrs) {
try {
// 在BIO模式下,这里的fhFactory是SocketFrameHandlerFactory实例;此时create()逻辑为:
// 1. 创建一个客户端Socket,并通过底层的SocketConfigurator组件(来源于this.socketConf)来配置该Socket
// 2. 用该Socket连接上目标地址,并将该Socket包装成SocketFrameHandler实例
// 这里的SocketFrameHandler实例底层的异步任务执行器来源于this.shutdownExecutor
FrameHandler handler = fhFactory.create(addr, clientProvidedName);
// 根据FrameHandler组件创建一条AMQP连接,并调用其start()方法(重点!)
AMQConnection conn = createConnection(params, handler, metricsCollector);
conn.start();
// 记录连接创建事件,并返回该连接
this.metricsCollector.newConnection(conn);
return conn;
} catch (IOException e) {
lastException = e;
} catch (TimeoutException te) {
lastException = te;
}
}
// 如果尝试了所有目标地址都没有连接成功,则抛出最后一个出现的异常
if (lastException != null) {
if (lastException instanceof IOException) {
throw (IOException) lastException;
} else if (lastException instanceof TimeoutException) {
throw (TimeoutException) lastException;
}
}
throw new IOException("failed to connect");
}
}
}
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
/**
* 启动AMQP连接;参考AMQP规范文档的《2.2.4 The Connection Class》小节
*/
public void start() throws IOException, TimeoutException {
initializeConsumerWorkService();
// 初始化HeartbeatSender组件(即this._heartbeatSender)
// 这里仅仅是进行字段赋值,并未真正启动HeartbeatSender组件
initializeHeartbeatSender();
// 将MainLoop线程的运行flag置为true;MainLoop线程通过该flag判断是否需要继续运行
this._running = true;
// 根据AMQP规范文档,客户端在发送协议头后,服务端会返回一个connection.start响应
// 这里相当于给0号通道注册了一个CompletableFuture实例(connStartBlocker),用于接收服务端的connection.start响应
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation();
_channel0.enqueueRpc(connStartBlocker);
// 设置超时时间,并发送协议头
try {
_frameHandler.setTimeout(handshakeTimeout);
_frameHandler.sendHeader();
} catch (IOException ioe) {
_frameHandler.close();
throw ioe;
}
// 之前提过,这里其实是调用了本类的startMainLoop()方法,启动了MainLoop线程
// MainLoop线程负责处理服务端发送过来的帧;当收到connection.start响应后,会将其设置到connStartBlocker中
this._frameHandler.initialize(this);
AMQP.Connection.Start connStart;
AMQP.Connection.Tune connTune = null;
try {
// 等待服务端的connection.start响应(该响应包含服务端属性、服务端AMQP协议版本等信息)
connStart = (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout / 2).getMethod();
// 将服务端属性保存起来,并判断能否兼容服务端的AMQP协议版本,不能则报错
_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
Version serverVersion = new Version(connStart.getVersionMajor(), connStart.getVersionMinor());
if (!Version.checkVersion(clientVersion, serverVersion)) {
throw new ProtocolVersionMismatchException(clientVersion, serverVersion);
}
// 省略后续的代码,其主要逻辑为:
// 1. 客户端选择一种支持的认证机制,并将其打包成connection.start-ok消息发送出去
// 2. 服务端返回connection.secure消息
// 3. 客户端获取到用户名和密码,并将其打包成connection.secure-ok消息发送出去进行认证
// 4. 服务端返回connection.tune消息,其中包含最大通道数量、最大帧长、心跳包发送的间隔时间等信息
} catch (TimeoutException te) {
_frameHandler.close();
throw te;
} catch (ShutdownSignalException sse) {
_frameHandler.close();
throw AMQChannel.wrap(sse);
} catch(IOException ioe) {
_frameHandler.close();
throw ioe;
}
try {
// 协商最大通道数量(即取客户端和服务端中的较小值),0代表不限制;然后初始化通道管理器
int negotiatedChannelMax = negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax());
int channelMax = ConnectionFactory.ensureUnsignedShort(negotiatedChannelMax);
if (channelMax != negotiatedChannelMax) {
LOGGER.warn("Channel max must be between 0 and {}, value has been set to {} instead of {}",
MAX_UNSIGNED_SHORT, channelMax, negotiatedChannelMax);
}
_channelManager = instantiateChannelManager(channelMax, threadFactory);
// 协商最大帧长、心跳包发送的间隔时间
int frameMax = negotiatedMaxValue(this.requestedFrameMax, connTune.getFrameMax());
this._frameMax = frameMax;
int negotiatedHeartbeat = negotiatedMaxValue(this.requestedHeartbeat, connTune.getHeartbeat());
int heartbeat = ConnectionFactory.ensureUnsignedShort(negotiatedHeartbeat);
if (heartbeat != negotiatedHeartbeat) {
LOGGER.warn("Heartbeat must be between 0 and {}, value has been set to {} instead of {}",
MAX_UNSIGNED_SHORT, heartbeat, negotiatedHeartbeat);
}
// 这里会启动HeartbeatSender组件,定时发送心跳包
setHeartbeat(heartbeat);
// 发送connection.tune-ok消息(附带上协商结果)
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
.channelMax(channelMax)
.frameMax(frameMax)
.heartbeat(heartbeat)
.build());
// 发送connection.open请求(附带上要操作的vHost),然后等待服务器返回connection.open-ok响应
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder().virtualHost(_virtualHost).build());
} catch (IOException ioe) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw ioe;
} catch (ShutdownSignalException sse) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw AMQChannel.wrap(sse);
}
// 省略剩余的不重要的代码
}
}
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
/**
* 启动MainLoop线程
*/
public void startMainLoop() {
MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
mainLoopThread.start();
}
private class MainLoop implements Runnable {
/**
* MainLoop线程的执行逻辑:不断从输入流中读取帧,并调用AMQConnection#readFrame()方法处理该帧
*/
@Override
public void run() {
boolean shouldDoFinalShutdown = true;
try {
while (_running) {
Frame frame = _frameHandler.readFrame();
readFrame(frame);
}
} catch (Throwable ex) {
if (ex instanceof InterruptedException) {
// 这里的中断异常只可能在执行doFinalShutdown()方法时产生
// 因此,这种情况下不需要再重复调用doFinalShutdown()方法
shouldDoFinalShutdown = false;
} else {
// 这里会关闭连接
handleFailure(ex);
}
} finally {
if (shouldDoFinalShutdown) {
// 这里会做一些清理工作,比如中断MainLoop线程、回调ShutdownListener组件
doFinalShutdown();
}
}
}
}
/**
* 处理帧数据
*/
private void readFrame(Frame frame) throws IOException {
// 如果接收到了完整的帧,则重置丢失的心跳包数量,并将该帧转发给相应的通道来处理
if (frame != null) {
_missedHeartbeats = 0;
if (frame.type == AMQP.FRAME_HEARTBEAT) {
// 忽略心跳包,因为上面已经重置过_missedHeartbeats了
} else {
if (frame.channel == 0) { // the special channel
_channel0.handleFrame(frame);
} else {
if (isOpen()) {
ChannelManager cm = _channelManager;
if (cm != null) {
ChannelN channel;
try {
channel = cm.getChannel(frame.channel);
} catch (UnknownChannelException e) {
// 如果通道发送完请求后立刻关闭,就可能执行到这里,因此忽略该帧即可
LOGGER.info("Received a frame on an unknown channel, ignoring it");
return;
}
channel.handleFrame(frame);
}
}
}
}
// 否则,说明可能出现了心跳包超时的情况,此时_missedHeartbeats会加一,并且如果超过了阈值,则报错
} else {
handleSocketTimeout();
}
}
}
public abstract class AMQChannel extends ShutdownNotifierComponent {
/** 等待rpc调用完成的超时时间(单位:毫秒);0代表一直等待 */
protected final int _rpcTimeout;
/** 当前正在进行中(即已经发送请求,但未收到响应)的rpc调用 */
private RpcWrapper _activeRpc = null;
/**
* 发送rpc请求并获取响应结果;这里直接调用privateRpc()方法
*
* @param m 请求的类型,如basic.get等
*/
public AMQCommand rpc(Method m) throws IOException, ShutdownSignalException {
return privateRpc(m);
}
private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException {
// 这里的SimpleBlockingRpcContinuation组件的功能和CompletableFuture类似,且其底层持有当前请求
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
// 发送rpc请求;这里会注册SimpleBlockingRpcContinuation组件,以便于接收MainLoop线程解析到的响应
rpc(m, k);
// 阻塞等待服务器的响应;最多等待_rpcTimeout毫秒
if (_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply();
} else {
try {
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
}
/**
* 发送rpc请求:加锁后确保连接未关闭,然后继续调用quiescingRpc()方法真正发送rpc请求
*/
public void rpc(Method m, RpcContinuation k) throws IOException {
synchronized (_channelMutex) {
ensureIsOpen();
quiescingRpc(m, k);
}
}
/**
* 真正发送rpc请求:
* 1. 注册当前请求:将RpcContinuation组件适配成RpcContinuationRpcWrapper组件并保存起来
* 2. 根据METHOD帧数据构造AMQCommand实例,并将该命令数据写入本通道
*/
public void quiescingRpc(Method m, RpcContinuation k) throws IOException {
synchronized (_channelMutex) {
enqueueRpc(k); // 主要执行:doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
quiescingTransmit(m); // 主要执行:quiescingTransmit(new AMQCommand(m));
}
}
/**
* 保存当前请求(即这里的RpcWrapper实例)
*/
private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
synchronized (_channelMutex) {
boolean waitClearedInterruptStatus = false;
// 如果上一个请求还没得到响应,则先等待其执行完成
while (_activeRpc != null) {
try {
_channelMutex.wait();
} catch (InterruptedException e) {
waitClearedInterruptStatus = true;
}
}
// 补上等待期间收到的中断,并将RpcWrapper实例(即当前请求)赋值给this._activeRpc
if (waitClearedInterruptStatus) {
Thread.currentThread().interrupt();
}
_activeRpc = rpcWrapperSupplier.get();
}
}
/**
* 将命令数据写入本通道
*/
public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
// 处理流控相关的逻辑:当此次Command含有请求体时,需检测当前是否被限流,是则等待限流结束
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
// This is to catch a situation when the thread wakes up during
// shutdown. Currently, no command that has content is allowed
// to send anything in a closing state.
ensureIsOpen();
}
}
// 调用器(一般用于记录日志),然后真正发送数据到通道中
this._trafficListener.write(c);
c.transmit(this);
}
}
}
public abstract class AMQChannel extends ShutdownNotifierComponent {
/** 当前正在组装中的命令 */
private AMQCommand _command = new AMQCommand();
/**
* 前面提到,MainLoop线程在解析到一个帧之后,会调用相应通道的handleFrame()方法进行处理
* 主要逻辑:将该帧交给当前的命令组装器处理,如果能组装成一个完整的命令,则调用handleCompleteInboundCommand()方法处理该命令
*/
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) {
_command = new AMQCommand(); // 为解析下个命令做准备
handleCompleteInboundCommand(command);
}
}
/**
* 处理接收到的命令;命令可分为同步命令和异步命令:
* 1. 如果该命令是同步rpc请求返回的响应,则该命令是同步命令;比如basic.get-ok命令,它是对basic.get请求的响应
* 2. 如果该命令不是同步rpc请求返回的响应,则该命令是异步命令;比如basic.return命令,它是服务端随时都可能发送过来的
*/
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {
// 回调器(主要用于记录日志,忽略即可)
this._trafficListener.read(command);
// 先尝试处理异步命令;如果处理失败,说明是同步命令,此时进入if分支进一步处理
if (!processAsync(command)) {
// 检查rpc响应的类型是否和当前正在进行中的rpc请求的类型匹配
// 比如,假设当前请求(即_activeRpc)是basic.get类型,那么响应必须是basic.get-ok类型
// 如果不匹配,则忽略该响应,因为该响应可能是之前某个超时请求对应的响应
if (_checkRpcResponseType) {
synchronized (_channelMutex) {
if (_activeRpc != null && !_activeRpc.canHandleReply(command)) {
return;
}
}
}
// 获取this._activeRpc的值并将该字段重置为null,为下次rpc调用做准备
final RpcWrapper nextOutstandingRpc = nextOutstandingRpc();
// 如果确实有对应的rpc请求,则将该响应设置给该请求对应的CompletableFuture组件中
// the outstanding RPC can be null when calling Channel#asyncRpc
if (nextOutstandingRpc != null) {
nextOutstandingRpc.complete(command);
markRpcFinished(); // 钩子方法;默认为空实现
}
}
}
/**
* 获取并重置this._activeRpc字段的值,并通知正在等待_channelMutex锁的所有线程
*/
public RpcWrapper nextOutstandingRpc() {
synchronized (_channelMutex) {
RpcWrapper result = _activeRpc;
_activeRpc = null;
_channelMutex.notifyAll();
return result;
}
}
/**
* 抽象方法:处理异步请求
*/
public abstract boolean processAsync(Command command) throws IOException;
}
public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
/**
* 处理异步命令
*
* @return 是否已将该命令处理完成,或者不需要再进一步处理该命令
*/
@Override
public boolean processAsync(Command command) throws IOException {
Method method = command.getMethod();
// 如果是channel.close类型的命令,则关闭通道,并返回true,代表处理完成
if (method instanceof Channel.Close) {
asyncShutdown(command);
return true;
}
// 如果当前通道还是打开状态,则正常处理服务器发来的命令
if (isOpen()) {
// 处理basic.deliver命令;客户端在发起basic.consume请求后,服务器会将消息以basic.deliver命令的形式发送给客户端
// 这里会通过消息的consumerTag找到对应的消费者(在调用Channel#basicConsume方法时注册),然后调用这个消费者来消费消息
if (method instanceof Basic.Deliver) {
processDelivery(command, (Basic.Deliver) method);
return true;
// 处理basic.return命令;消息回退机制
// 这里会调用所有的ReturnListener组件(this.returnListeners)
} else if (method instanceof Basic.Return) {
callReturnListeners(command, (Basic.Return) method);
return true;
// 处理channel.flow命令;用于启动/解除限流,这个在之前有提到过
} else if (method instanceof Channel.Flow) {
Channel.Flow channelFlow = (Channel.Flow) method;
synchronized (_channelMutex) {
_blockContent = !channelFlow.getActive();
transmit(new Channel.FlowOk(!_blockContent));
_channelMutex.notifyAll();
}
return true;
// 处理basic.ack/nack命令;消息确认机制
// 这里会调用所有的ConfirmListener组件(this.confirmListeners)
} else if (method instanceof Basic.Ack) {
Basic.Ack ack = (Basic.Ack) method;
callConfirmListeners(command, ack);
handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
return true;
} else if (method instanceof Basic.Nack) {
Basic.Nack nack = (Basic.Nack) method;
callConfirmListeners(command, nack);
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
return true;
// 处理basic.recover-ok/cancel命令;限于篇幅,这里省略掉了具体的处理代码,请自行翻阅源码查看
} else if (method instanceof Basic.RecoverOk) {
return false;
} else if (method instanceof Basic.Cancel) {
return true;
// 其它类型的命令都当作是同步命令,因此返回false,由handleCompleteInboundCommand()方法来进一步处理
} else {
return false;
}
// 如果通道已关闭,则只放行channel.close-ok响应,其它命令统统丢弃掉
} else {
if (method instanceof Channel.CloseOk) {
return false;
} else {
return true;
}
}
}
}