An Analysis of spymemcached's IO Flow
A previous post introduced spymemcached's overall source-code structure: its design flow, how functionality moves through the client, and its distinctive features. This post focuses on spymemcached's core capability, network IO, to see how the heart of a high-performance cache client SDK is designed and what we can learn from it.
Let's start with a brief look at MemcachedConnection#handleIO:
public void handleIO() throws IOException {
  if (shutDown) {
    logger.debug("No IO while shut down.");
    return;
  }
  handleInputQueue();
  long delay = wakeupDelay;
  if (!reconnectQueue.isEmpty()) {
    long now = System.currentTimeMillis();
    long then = reconnectQueue.firstKey();
    delay = Math.max(then - now, 1);
  }
  assert selectorsMakeSense() : "Selectors don't make sense.";
  int selected = selector.select(delay);
  if (shutDown) {
    return;
  } else if (selected == 0 && addedQueue.isEmpty()) {
    handleWokenUpSelector();
  } else if (selector.selectedKeys().isEmpty()) {
    handleEmptySelects();
  } else {
    emptySelects = 0;
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()) {
      SelectionKey sk = iterator.next();
      handleIO(sk);
      iterator.remove();
    }
  }
  // Cleanup work: e.g. disconnect a node once it has exceeded the timeout-count
  // threshold, and retry the nodes waiting in reconnectQueue.
  handleOperationalTasks();
  handleReconnectDueToTimeout();
}
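handleIO itself is driven in a loop by the client's IO thread. A simplified sketch of that driver (the real MemcachedConnection.run() has a few more catch clauses; the log messages here are illustrative):

public void run() {
  while (running) {
    try {
      handleIO();
    } catch (IOException e) {
      logger.warn("Problem handling memcached IO", e);
    }
  }
  logger.info("IO thread shutting down");
}

Next up is handleInputQueue(), which handleIO() calls first: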
private void handleInputQueue() {
  if (!addedQueue.isEmpty()) {
    Collection<MemcachedNode> toAdd = new HashSet<>();
    Collection<MemcachedNode> todo = new HashSet<>();
    MemcachedNode qaNode;
    while ((qaNode = addedQueue.poll()) != null) {
      todo.add(qaNode);
    }
    for (MemcachedNode node : todo) {
      boolean readyForIO = false;
      if (node.isActive()) {
        if (node.getCurrentWriteOp() != null) {
          readyForIO = true;
        }
      } else {
        toAdd.add(node);
      }
      node.copyInputQueue();
      if (readyForIO) {
        try {
          if (node.getWbuf().hasRemaining()) {
            handleWrites(node);
          }
        } catch (IOException e) {
          logger.warn("Exception handling write", e);
          lostConnection(node);
        }
      }
      node.fixupOps();
    }
    addedQueue.addAll(toAdd);
  }
}
Some connection-setup and event-registration logic has been omitted above; let's get straight to the main flow of the source code.
private void handleWrites(final MemcachedNode node) throws IOException {
  // Core step: copy the data of the node's ops into the write buffer.
  node.fillWriteBuffer(shouldOptimize);
  boolean canWriteMore = node.getBytesRemainingToWrite() > 0;
  while (canWriteMore) {
    int wrote = node.writeSome();
    metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote);
    node.fillWriteBuffer(shouldOptimize);
    canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0;
  }
}
public final void fillWriteBuffer(boolean shouldOptimize) {
  if (toWrite == 0 && readQ.remainingCapacity() > 0) {
    getWbuf().clear();
    Operation o = getNextWritableOp();
    while (o != null && toWrite < getWbuf().capacity()) {
      synchronized (o) {
        assert o.getState() == OperationState.WRITING;
        ByteBuffer obuf = o.getBuffer();
        assert obuf != null : "Didn't get a write buffer from " + o;
        if (obuf != null) {
          int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining());
          byte[] b = new byte[bytesToCopy];
          obuf.get(b);
          getWbuf().put(b);
          if (!o.getBuffer().hasRemaining()) {
            o.writeComplete();
            transitionWriteItem();
            preparePending();
            if (shouldOptimize) {
              optimize();
            }
            o = getNextWritableOp();
          }
          toWrite += bytesToCopy;
        } else {
          reportFillWriteBufferBug();
          removeCurrentWriteOpWhileWriteBufferIsNull();
          o = getNextWritableOp();
        }
      }
    }
    getWbuf().flip();
    assert toWrite <= getWbuf().capacity() : "toWrite exceeded capacity: " + this;
    assert toWrite == getWbuf().remaining() : "Expected " + toWrite
        + " remaining, got " + getWbuf().remaining();
  } else {
    logger.debug("Buffer is full, skipping");
  }
}
There is a subtlety here around read/write ordering. While this method runs, there is a window in which an Op can sit in both readQ and writeQ at the same time; if the server hits OOM at that moment, the Op may already have received a response. In that case the Op still sitting in writeQ has to be removed, otherwise subsequent processing will go wrong.
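In other words, once a response has already arrived for an operation that is still at the head of the write queue, the write-side copy must be discarded before any further bytes are written for it. A minimal sketch of that idea, reusing the MemcachedNode methods shown above (dropStaleWrite itself is a hypothetical helper, not spymemcached's code):

// Hypothetical helper: if the op that just received its response is still the
// current write op, remove it so its request bytes are never written out.
static void dropStaleWrite(MemcachedNode node, Operation completedOp) {
  if (node.getCurrentWriteOp() == completedOp) {
    node.removeCurrentWriteOp();
  }
}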
private Operation getNextWritableOp() {
  Operation o = getCurrentWriteOp();
  while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
    synchronized (o) {
      if (o.isCancelled()) {
        logger.debug("Not writing cancelled op.");
        Operation cancelledOp = removeCurrentWriteOp();
        assert o == cancelledOp;
      } else if (o.isTimedOut(defaultOpTimeout)) {
        logger.debug("Not writing timed out op.");
        Operation timedOutOp = removeCurrentWriteOp();
        assert o == timedOutOp;
      } else {
        o.writing();
        if (!(o instanceof TapAckOperationImpl)) {
          readQ.add(o);
        }
        return o;
      }
      o = getCurrentWriteOp();
    }
  }
  return o;
}
private void handleReads(final MemcachedNode node) throws IOException {
  Operation currentOp = node.getCurrentReadOp();
  if (currentOp instanceof TapAckOperationImpl) {
    node.removeCurrentReadOp();
    return;
  }
  ByteBuffer rbuf = node.getRbuf();
  final SocketChannel channel = node.getChannel();
  int read = channel.read(rbuf);
  metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, read);
  if (read < 0) {
    currentOp = handleReadsWhenChannelEndOfStream(currentOp, node, rbuf);
  }
  while (read > 0) {
    rbuf.flip();
    while (rbuf.remaining() > 0) {
      if (currentOp == null) {
        throw new IllegalStateException("No read operation.");
      }
      long timeOnWire =
          System.nanoTime() - currentOp.getWriteCompleteTimestamp();
      metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC,
          (int) (timeOnWire / 1000));
      metrics.markMeter(OVERALL_RESPONSE_METRIC);
      synchronized (currentOp) {
        readBufferAndLogMetrics(currentOp, rbuf, node);
      }
      currentOp = node.getCurrentReadOp();
    }
    rbuf.clear();
    read = channel.read(rbuf);
    node.completedRead();
  }
}
Note: net.spy.memcached.protocol.ascii.OperationImpl#readFromBuffer
public void readFromBuffer(ByteBuffer data) throws IOException {
  // Loop while there's data remaining to get it all drained.
  while (getState() != OperationState.COMPLETE && data.remaining() > 0) {
    if (readType == OperationReadType.DATA) {
      handleRead(data);
    } else {
      int offset = -1;
      for (int i = 0; data.remaining() > 0; i++) {
        byte b = data.get();
        if (b == '\r') {
          foundCr = true;
        } else if (b == '\n') {
          assert foundCr : "got a \\n without a \\r";
          offset = i;
          foundCr = false;
          break;
        } else {
          assert !foundCr : "got a \\r without a \\n";
          byteBuffer.write(b);
        }
      }
      if (offset >= 0) {
        String line = new String(byteBuffer.toByteArray(), CHARSET);
        byteBuffer.reset();
        OperationErrorType eType = classifyError(line);
        if (eType != null) {
          errorMsg = line.getBytes();
          handleError(eType, line);
        } else {
          handleLine(line);
        }
      }
    }
  }
}
net.spy.memcached.protocol.ascii.BaseGetOpImpl#handleLine
public final void handleLine(String line) {
  if (line.equals("END")) {
    if (hasValue) {
      getCallback().receivedStatus(END);
    } else {
      getCallback().receivedStatus(NOT_FOUND);
    }
    transitionState(OperationState.COMPLETE);
    data = null;
  } else if (line.startsWith("VALUE ")) {
    String[] stuff = line.split(" ");
    assert stuff[0].equals("VALUE");
    currentKey = stuff[1];
    currentFlags = Integer.parseInt(stuff[2]);
    data = new byte[Integer.parseInt(stuff[3])];
    if (stuff.length > 4) {
      casValue = Long.parseLong(stuff[4]);
    }
    readOffset = 0;
    hasValue = true;
    setReadType(OperationReadType.DATA);
  } else if (line.equals("LOCK_ERROR")) {
    getCallback().receivedStatus(LOCK_ERROR);
    transitionState(OperationState.COMPLETE);
  } else {
    assert false : "Unknown line type: " + line;
  }
}
When the server returns SERVER_ERROR, the client catches it, wraps it in an OperationException, and rethrows it up the call stack until it is finally caught by net.spy.memcached.MemcachedConnection#handleIO, which calls lostConnection to drop the connection. lostConnection is also reached when a connection exception occurs, when the channel has been closed but the MemcachedConnection has not been shut down, and as a catch-all for other exceptions.
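A simplified sketch of the shape of that per-key dispatch (the real MemcachedConnection#handleIO(SelectionKey) has more branches and catch clauses; this only shows how exceptions funnel into lostConnection):

private void handleIO(SelectionKey sk) {
  MemcachedNode node = (MemcachedNode) sk.attachment();
  try {
    if (sk.isReadable()) {
      handleReads(node);
    }
    if (sk.isWritable()) {
      handleWrites(node);
    }
  } catch (OperationException e) {
    // e.g. SERVER_ERROR wrapped by the protocol layer and rethrown upwards
    lostConnection(node);
  } catch (IOException e) {
    // channel-level failures (closed channel, connect/read/write errors)
    lostConnection(node);
  }
}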
private void lostConnection(final MemcachedNode node) {
  queueReconnect(node);
  for (ConnectionObserver observer : connObservers) {
    observer.connectionLost(node.getSocketAddress());
  }
}
protected void queueReconnect(final MemcachedNode node) {
  if (shutDown) {
    return;
  }
  logger.warn("Closing, and reopening {}, attempt {}.", node, node.getReconnectCount());
  if (node.getSk() != null) {
    node.getSk().cancel();
    assert !node.getSk().isValid() : "Cancelled selection key is valid";
  }
  node.reconnecting();
  try {
    if (node.getChannel() != null && node.getChannel().socket() != null) {
      node.getChannel().socket().close();
    } else {
      logger.info("The channel or socket was null for {}", node);
    }
  } catch (IOException e) {
    logger.warn("IOException trying to close a socket", e);
  }
  node.setChannel(null);
  // Exponential backoff: next reconnect time = now + 2^(reconnect count) seconds, capped at maxDelay.
  long delay = (long) Math.min(maxDelay, Math.pow(2,
      node.getReconnectCount()) * 1000);
  long reconnectTime = System.currentTimeMillis() + delay;
  // If that timestamp is already taken in the queue, bump it by one millisecond.
  while (reconnectQueue.containsKey(reconnectTime)) {
    reconnectTime++;
  }
  reconnectQueue.put(reconnectTime, node);
  metrics.incrementCounter(RECON_QUEUE_METRIC);
  node.setupResend();
  if (failureMode.get() == FailureMode.Redistribute) {
    redistributeOperations(node.destroyInputQueue());
  } else if (failureMode.get() == FailureMode.Cancel) {
    cancelOperations(node.destroyInputQueue());
  }
}
private void handleOperationalTasks() throws IOException {
  checkPotentiallyTimedOutConnection();
  if (!shutDown && !reconnectQueue.isEmpty()) {
    attemptReconnects();
  }
  if (!retryOps.isEmpty()) {
    ArrayList<Operation> operations = new ArrayList<>(retryOps);
    retryOps.clear();
    redistributeOperations(operations);
  }
  handleShutdownQueue();
}
After reading enough source code, you notice that different projects converge on the same solutions. Below is a summary of the classic problem-solving ideas found in spymemcached.
If establishing a connection fails, the client keeps retrying, but the interval between retries grows exponentially, which avoids frequent, useless retries.
gRPC's reconnection logic is built on the same exponential-backoff idea.
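A minimal sketch of the exponential-backoff idea with the jitter that gRPC adds on top (the constants and the helper name are illustrative, not spymemcached's or gRPC's exact values):

// Delay doubles with each attempt, is capped, and gets a random jitter so a
// fleet of clients does not retry in lock-step.
static long nextDelayMillis(int attempt, long maxDelayMillis) {
  long base = (long) Math.min(maxDelayMillis, Math.pow(2, attempt) * 1000);
  long jitter = java.util.concurrent.ThreadLocalRandom.current().nextLong(base / 5 + 1);
  return Math.min(maxDelayMillis, base + jitter);
}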
spymemcached sidesteps the well-known JDK NIO empty-poll problem (select() keeps returning immediately with no ready keys, spinning the CPU) by rebuilding, with a threshold of 256 consecutive empty polls; spymemcached had this workaround in place back in 2013.
Netty also avoids this bug by rebuilding, with the same threshold of 256, but Netty adopted the approach in 2019.
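For comparison, a minimal sketch of the selector-rebuild workaround (the Netty-style fix): once select() has come back with no ready keys too many times in a row, every registration is migrated onto a fresh Selector. The threshold and names here are illustrative:

static final int EMPTY_SELECT_THRESHOLD = 256;

// If select() keeps returning 0 ready keys, assume the JDK epoll empty-poll
// bug and move every channel onto a brand-new selector.
static Selector rebuildIfSpinning(Selector selector, int emptySelects) throws IOException {
  if (emptySelects <= EMPTY_SELECT_THRESHOLD) {
    return selector;
  }
  Selector fresh = Selector.open();
  for (SelectionKey key : selector.keys()) {
    key.channel().register(fresh, key.interestOps(), key.attachment());
  }
  selector.close();   // closing the old selector cancels its keys
  return fresh;
}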
Concurrent code is where development gets most complex: all kinds of situations have to be considered, especially thread-unsafety, heavy resource contention, and reduced efficiency caused by multiple threads operating at the same time. Many components therefore adopt the idea of serializing concurrency: the operations that would otherwise run concurrently are each wrapped as an object and put onto a queue, and the worker thread takes tasks off the queue and executes them one by one, which avoids resource contention and thread-safety problems.
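A minimal sketch of that queue-based serialization pattern (class and method names here are illustrative, not spymemcached's); spymemcached's addedQueue and the per-node input queue (copyInputQueue above) follow the same shape:

// Any thread may enqueue; only the single IO thread ever drains and executes,
// so the state touched by the tasks needs no extra locking.
final class SerializedExecutor {
  private final java.util.Queue<Runnable> tasks =
      new java.util.concurrent.ConcurrentLinkedQueue<>();

  void submit(Runnable task) {      // called from any thread
    tasks.offer(task);
  }

  void drainOnIoThread() {          // called only from the IO thread
    Runnable task;
    while ((task = tasks.poll()) != null) {
      task.run();
    }
  }
}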
The same line of thinking shows up in how state is modelled.
MemcachedOperation uses a state field to represent which stage it is currently in;
Netty's channel lifecycle runs through ChannelRegistered -> ChannelActive -> ChannelRead -> ChannelReadComplete, and so on.
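A simplified sketch of that state-machine idea; only the states that appear in the excerpts above are shown (spymemcached's real OperationState has more), and the class name is illustrative:

enum OpState { WRITE_QUEUED, WRITING, COMPLETE }

final class StatefulOp {
  private OpState state = OpState.WRITE_QUEUED;

  // Mirrors o.writing() in getNextWritableOp(): only a queued op may start writing.
  synchronized void writing() {
    if (state != OpState.WRITE_QUEUED) {
      throw new IllegalStateException("Cannot start writing from " + state);
    }
    state = OpState.WRITING;
  }

  // Mirrors transitionState(OperationState.COMPLETE) in handleLine().
  synchronized void complete() {
    state = OpState.COMPLETE;
  }

  synchronized OpState getState() {
    return state;
  }
}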