Okio是一个IO库,底层基于Java原生的输入输出流实现。但原生的输入输出流并没有提供超时的检测机制。而Okio实现了这个功能。建议读者先阅读 Android | 彻底理解 Okio 之源码篇 ,然后再阅读本篇内容会更好理解。
Timeout 类的设计
探讨超时机制,首先要了解Timeout
这个类。Timeout
实现了Okio的同步超时检测,这里的同步指的是“任务执行”和“超时检测”是同步的,有顺序的。同步超时不会直接中断任务执行,它首先会检查是否发生超时,然后决定是否中断任务执行。throwIfReached
就是一个同步超时检测的方法。
理解 timeout 与 deadline 的区别
timeout
中文意为“超时”,deadline
中文意为“最后期限”,它们是有明显区别的。 Timeout
类中有一系列的timeoutXxx
方法,timeoutXxx
是用来设置**一次操作完成的最大等待时间。若这个操作在等待时间内没有结束,则认为超时。 deadlineXxx
系列方法则是用来设置一项任务完成的最大等待时间。**意味着在未来多长时间内,需要将这项任务完成,否则认为超时。它可能包含一次或多次的操作。
读取文件的例子
回顾下之前Okio读取文件例子。
public void readFile() { try { FileInputStream fis = new FileInputStream("test.txt"); okio.Source source = Okio.source(fis); BufferedSource bs = Okio.buffer(source); source.timeout().deadline(1, TimeUnit.MILLISECONDS); String res = bs.readUtf8(); System.out.println(res); } catch (Exception e){ e.printStackTrace(); } }
在这个例子中,我们使用deadline
设置了超时时间为1ms,这意味着从现在开始,读取文件的这项任务,必须在未来的1ms内完成,否则认为超时。而读取文件的这项任务,就包含了多次的文件读取操作。
摇骰子的例子
我们再来看下面这个摇骰子的程序。Dice
是一个骰子类,roll
方法表示摇骰子,摇出来的点数latestTotal
不会超过12。rollAtFixedRate
会开启一个线程,每隔一段时间调用roll
方法摇一次骰子。awaitTotal
方法会当骰子的点数与我们传递进去的total
值一样或者超时而结束。
private class Dice { Random random = new Random(); int latestTotal; // 摇骰子 public synchronized void roll() { latestTotal = 2 + random.nextInt(6) + random.nextInt(6); System.out.println("Rolled " + latestTotal); notifyAll(); } // 开启一个线程,每隔一段时间执行 roll 方法 public void rollAtFixedRate(int period, TimeUnit timeUnit) { Executors.newScheduledThreadPool(0).scheduleAtFixedRate(new Runnable() { public void run() { roll(); } }, 0, period, timeUnit); } // 超时检测 public synchronized void awaitTotal(Timeout timeout, int total) throws InterruptedIOException { while (latestTotal != total) { timeout.waitUntilNotified(this); } } }
timeout()
是一个测试骰子类的方法,在主线程中运行。该程序设置每隔3s摇一次骰子,主线程设置超时时间为6s,期望摇到的点数是20。因为设置的超时是timeoutXxx
系列的方法,所以这里超时的意思是“只要我摇一次骰子的时间不超过6s,那么我就不会超时,可以一直摇骰子”。因为摇出骰子的最大点数是12,而期望值是20,永远也摇不出来20这个点数,且摇一次骰子的时间是3s多,也不满足超时的时间。所以主线程就会一直处于等待状态。
public void timeout(){ try { Dice dice = new Dice(); dice.rollAtFixedRate(3, TimeUnit.SECONDS); Timeout timeout = new Timeout(); timeout.timeout(6, TimeUnit.SECONDS); dice.awaitTotal(timeout, 20); } catch (Exception e) { e.printStackTrace(); } }
现在将timeout()
方法修改一下,将timeout.timeout(6, TimeUnit.SECONDS)
改为timeout.deadline(6, TimeUnit.SECONDS)
,之前我们说过deadlineXxx
设置的超时**意味着在未来多长时间内,需要将这项任务完成。**在摇骰子这里的意思就是“从现在开始,我只可以摇6s的骰子。超过这个时间你还在摇,则认为超时”。它关注的是可以摇多久的骰子,而不是摇一次骰子不能超过多久的时间。
public void timeout(){ try { Dice dice = new Dice(); dice.rollAtFixedRate(3, TimeUnit.SECONDS); Timeout timeout = new Timeout(); timeout.deadline(6, TimeUnit.SECONDS); dice.awaitTotal(timeout, 20); } catch (Exception e) { e.printStackTrace(); } }
上述程序,主线程会在6s后因超时而停止等待,结束运行。
等待直到唤醒
前面举了两个例子让大家理解Okio中timeout
和deadline
的区别。在摇骰子的例子中用到了waitUntilNotified
这个方法来检测超时,中文意思为“等待直到唤醒”。也就是Java多线程中经典的“等待-唤醒”机制,该机制常常用于多线程之间的通信。调用waitUntilNotified
方法的线程会一直处于等待状态,除非被唤醒或者因超时而抛出异常。下面是该方法的源码。
public final void waitUntilNotified(Object monitor) throws InterruptedIOException { try { boolean hasDeadline = hasDeadline(); long timeoutNanos = timeoutNanos(); // 若没有设置 deadline && timeout,则一直等待直到唤醒 if (!hasDeadline && timeoutNanos == 0L) { monitor.wait(); // There is no timeout: wait forever. return; } // Compute how long we'll wait. // 计算等待的时长,若同时设置了deadline 和 timeout,则 deadline 优先 long waitNanos; long start = System.nanoTime(); if (hasDeadline && timeoutNanos != 0) { long deadlineNanos = deadlineNanoTime() - start; waitNanos = Math.min(timeoutNanos, deadlineNanos); } else if (hasDeadline) { waitNanos = deadlineNanoTime() - start; } else { waitNanos = timeoutNanos; } // Attempt to wait that long. This will break out early if the monitor is notified. long elapsedNanos = 0L; if (waitNanos > 0L) { long waitMillis = waitNanos / 1000000L; // 等待 waitNanos monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L)); // 计算从等待 waitNanos 到唤醒所用时间 elapsedNanos = System.nanoTime() - start; } // Throw if the timeout elapsed before the monitor was notified. // 若等待了 waitNanos 还没唤醒,认为超时 if (elapsedNanos >= waitNanos) { throw new InterruptedIOException("timeout"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // Retain interrupted status. throw new InterruptedIOException("interrupted"); } }
查看waitUntilNotified
的源码,我们发现该方法基于“等待-通知”机制,添加了多线程之间的超时检测功能,一个线程用来执行具体的任务,一个线程调用该方法来检测超时。在Okio中的管道就使用了waitUntilNotified
这个方法。
AsyncTimeout 类的设计
AsyncTimeout
内部维护一个单链表,节点的类型是AsyncTimeout
,以到超时之前的剩余时间升序排序,即超时的剩余时间越大,节点就在链表越后的位置。对链表的操作,使用了synchronized
关键字加类锁,保证在同一时间,只有一个线程可以对链表进行修改访问操作。
AsyncTimeout
实现了Okio的异步超时检测。这里的异步指的是“任务执行”和“超时检测”是异步的,在执行任务的同时,也在进行任务的“超时检测”。你会觉得这和上面摇骰子的例子很像,一个线程执行任务,一个线程检测超时。事实上,AsyncTimeout
也正是这样实现的,它内部的Watchdog
线程就是用来检测超时的。当我们要对一次操作或一项任务设置超时,使用成对的enter()
和exit()
,模板代码如下。
enter(); // do something exit();
若上面do something
的操作超时,timedOut()
方法将会在Watchdog
线程被回调。可以看见,这种包裹性的模板代码,灵活性很大,我们几乎可以在其中放置任何想要检测超时的一个或多个操作。
AsyncTimeout 成员变量
下面是AsyncTimeout
类主要的成员变量。
private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60); static @Nullable AsyncTimeout head; private boolean inQueue; private @Nullable AsyncTimeout next; private long timeoutAt;
IDLE_TIMEOUT_MILLIS
,在单链表中没有节点时,Watchdog
线程等待的时间head
,单链表的头结点,是一个虚假节点。当链表中只存在该节点,认为该链表为空。inQueue
,当前节点是否在链表中。next
,当前节点的下一个节点。timeoutAt
,以当前时间为基准,当前节点在将来何时超时。
AsyncTimeout 成员方法
scheduleTimeout 有序的将超时节点加入到链表中
scheduleTimeout
方法可以将一个超时节点按照超时的剩余时间有序的插入到链表当中。注意该方法使用synchronized
修饰,是一个同步方法,可以保证对链表的操作是线程安全的。
private static synchronized void scheduleTimeout(AsyncTimeout node, long timeoutNanos, boolean hasDeadline) { // Start the watchdog thread and create the head node when the first timeout is scheduled. // 若 head 节点为 null, 初始化 head 并启动 Watchdog 线程 if (head == null) { head = new AsyncTimeout(); new Watchdog().start(); } // 计算 node 节点的 timeoutAt 值 long now = System.nanoTime(); if (timeoutNanos != 0 && hasDeadline) { // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around, // Math.min() is undefined for absolute values, but meaningful for relative ones. node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now); } else if (timeoutNanos != 0) { node.timeoutAt = now + timeoutNanos; } else if (hasDeadline) { node.timeoutAt = node.deadlineNanoTime(); } else { throw new AssertionError(); } // Insert the node in sorted order. // 返回 node 节点的超时剩余时间 long remainingNanos = node.remainingNanos(now); // 从 head 节点开始遍历链表, 将 node 节点插入到合适的位置 for (AsyncTimeout prev = head; true; prev = prev.next) { // 若当前遍历的节点下一个节点为 null 或者 node 节点的超时剩余时间小于下一个节点 if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) { // 将 node 节点插入到链表 node.next = prev.next; prev.next = node; // 若当前遍历的节点是 head, 唤醒 watchdog 线程 if (prev == head) { AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front. } break; } } }
Watchdog 线程
在scheduleTimeout
方法中,若head
为null
,则会初始化head
并启动Watchdog
线程。Watchdog
是一个守护线程,因此它会随着JVM进程的结束而结束。前面我们说过Watchdog
线程是用来检测超时的,它会逐个检查链表中的超时节点是否超时,直到链表中所有节点检查完毕后结束运行。
private static final class Watchdog extends Thread { Watchdog() { super("Okio Watchdog"); setDaemon(true); } public void run() { while (true) { try { // 超时的节点 AsyncTimeout timedOut; // 加锁,同步代码块 synchronized (AsyncTimeout.class) { // 等待节点超时 timedOut = awaitTimeout(); // Didn't find a node to interrupt. Try again. // 当前该节点没有超时,继续检查 if (timedOut == null) continue; // The queue is completely empty. Let this thread exit and let another watchdog thread // get created on the next call to scheduleTimeout(). // 链表中已经没有超时节点,结束运行 if (timedOut == head) { head = null; return; } } // Close the timed out node. // timedOut 节点超时,回调 timedOut() 方法 timedOut.timedOut(); } catch (InterruptedException ignored) { } } } }
awaitTimeout 等待节点超时
在Watchdog
线程中会调用awaitTimeout
方法来等待检测的节点超时,若检测的节点没有超时,该方法返回null
。否则返回超时的节点。
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException { // Get the next eligible node. // 检测的节点 AsyncTimeout node = head.next; // The queue is empty. Wait until either something is enqueued or the idle timeout elapses. // 若链表为空 if (node == null) { long startNanos = System.nanoTime(); // Watchdog 线程等待 60s,期间会释放类锁 AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS); // 等待 60s 后若链表还为空则返回 head,否则返回 null return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head // The idle timeout elapsed. : null; // The situation has changed. } // node 节点超时剩余的时间 long waitNanos = node.remainingNanos(System.nanoTime()); // The head of the queue hasn't timed out yet. Await that. // node 节点超时剩余的时间 > 0,说明 node 还未超时,继续等待 waitNanos 后返回 null if (waitNanos > 0) { // Waiting is made complicated by the fact that we work in nanoseconds, // but the API wants (millis, nanos) in two arguments. long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); AsyncTimeout.class.wait(waitMillis, (int) waitNanos); return null; } // The head of the queue has timed out. Remove it. // node 节点超时了,将 node 从链表中移除并返回 head.next = node.next; node.next = null; return node; }
enter 进入超时检测
分析完上面三个方法后再来看enter
就非常的简单了,enter
内部调用了scheduleTimeout
方法来添加一个超时节点到链表当中,而Watchdog
线程随即会开始检测超时。
public final void enter() { if (inQueue) throw new IllegalStateException("Unbalanced enter/exit"); long timeoutNanos = timeoutNanos(); boolean hasDeadline = hasDeadline(); if (timeoutNanos == 0 && !hasDeadline) { return; // No timeout and no deadline? Don't bother with the queue. } // 更新 inQueue 为 true inQueue = true; scheduleTimeout(this, timeoutNanos, hasDeadline); }
exit 退出超时检测
前面说过,enter
和exit
在检测超时是需要成对出现的。它们之间的代码就是需要检测超时的代码。exit
方法的返回值表示enter
和exit
中间检测的代码是否超时。
public final boolean exit() { if (!inQueue) return false; // 更新 inQueue 为 false inQueue = false; return cancelScheduledTimeout(this); }
cancelScheduledTimeout
方法会将当前的超时节点从链表中移除。为了保证对链表的操作是线程安全的,该方法也是一个同步方法。我们知道在awaitTimeout
方法中,若某个节点超时了会将它从链表中移除。那么当调用cancelScheduledTimeout
发现node
不在链表中,则一定表明node
超时了。
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) { // Remove the node from the linked list. // 若 node 在链表中,将其移除。 for (AsyncTimeout prev = head; prev != null; prev = prev.next) { if (prev.next == node) { prev.next = node.next; node.next = null; return false; } } // The node wasn't found in the linked list: it must have timed out! // node 不在链表中,则 node 一定超时了,返回 true return true; }
总结
本文详细讲解了Okio中超时机制的实现原理,主要是Timeout
和AsyncTimeout
类的源码分析与解读。相信大家已经掌握了这部分知识,现总结一下文中要点。
- Okio 基于等待-唤醒机制,使用
Watchdog
线程来检测超时。 - 当要对某项操作或任务进行超时检测时,将它们放到
enter
和exit
的中间。 - Okio 对链表的使用非常频繁,在文件读写和超时检测都使用到了链表这个结构。