业务场景
某个功能需要实时(同步)返回结果,但其实现逻辑中包含异步环节(消息队列、WebSocket 消息、多线程等)。
异步转同步的方案
循环等待
向对方发起请求后,在拿到结果之前不断循环查询,直到结果出现或等待超时为止。
这个结果可以在内存中,也可以放在 redis 缓存或者 mysql 等数据库中。
特点:比较消耗性能
/**
 * Async-to-sync demo based on polling: the caller starts the remote call on a
 * worker thread and then repeatedly checks a shared field until a result shows
 * up or the timeout elapses.
 */
public class MyQuery {

    private static final Log log = LogFactory.getLog(MyQuery.class);

    /**
     * Result written by the worker thread and polled by the calling thread.
     * It is volatile so that the polling loop is guaranteed to observe the
     * worker's write; a plain field gives no such visibility guarantee.
     */
    protected volatile String result;

    /**
     * Maximum time, in milliseconds, that {@link #asyncToSync()} keeps polling.
     * Note that the simulated remote call below sleeps for 5 seconds, so with
     * this 3 second value the demo ends in the timeout branch.
     */
    private long timeoutMills = 3000;

    /**
     * Starts the remote call asynchronously and blocks until its result is
     * available.
     *
     * @throws RuntimeException if no result arrives within {@code timeoutMills}
     */
    public void asyncToSync() {
        log.info("开始查询...");

        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCall();
            }
        }).start();

        try {
            final long endTimeMills = System.currentTimeMillis() + timeoutMills;
            // StringUtil is a project helper; presumably true for null/empty - verify.
            while (StringUtil.isEmpty(result)) {
                // Give up once the deadline has passed.
                if (System.currentTimeMillis() >= endTimeMills) {
                    throw new RuntimeException("请求超时");
                }
                // Back off briefly between two polls.
                TimeUnit.MILLISECONDS.sleep(10);
            }
            log.info("完成查询,结果为:" + result);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of dropping it.
            Thread.currentThread().interrupt();
            log.error("等待查询结果时被中断", e);
        }
    }

    /**
     * Simulated remote call: sleeps for 5 seconds and then publishes the
     * result. If the sleep is interrupted no result is published.
     */
    protected void remoteCall() {
        try {
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("远程调用失败", e);
        }
    }
}
基于 wait() & notifyAll()
/**
 * Async-to-sync demo based on {@code Object.wait()} / {@code notifyAll()}.
 * The caller waits on a private monitor until the worker thread reports
 * completion or the wait timeout elapses.
 */
public class MyQuery {

    private static final Log log = LogFactory.getLog(MyQuery.class);

    /** Upper bound, in milliseconds, for the caller's wait. */
    private static final long WAIT_TIMEOUT_MILLIS = 10 * 1000;

    /** Result of the remote call; written before {@code done} is set under the lock. */
    protected String result;

    private final Object lock = new Object();

    /** Completion flag, only read and written while holding {@code lock}. */
    private boolean done;

    /**
     * Starts the remote call on a new thread and blocks until it has finished
     * or 10 seconds have passed, then logs whatever result is available.
     */
    public void asyncToSync() {
        log.info("开始查询...");

        synchronized (lock) {
            done = false;
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCall();
            }
        }).start();

        try {
            final long deadline = System.currentTimeMillis() + WAIT_TIMEOUT_MILLIS;
            synchronized (lock) {
                log.info("主线程进入等待");
                long remaining = WAIT_TIMEOUT_MILLIS;
                // Re-check the flag in a loop: wait() may return spuriously, and
                // the worker may already have finished before we got here.
                // remaining > 0 is required because wait(0) means "wait forever".
                while (!done && remaining > 0) {
                    lock.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            }
            log.info("完成查询,结果为:" + result);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of dropping it.
            Thread.currentThread().interrupt();
            log.error("等待查询结果时被中断", e);
        }
    }

    /**
     * Simulated remote call: sleeps for 5 seconds, stores the result and then
     * wakes up every thread waiting on the lock. Waiters are notified even when
     * the sleep is interrupted, in which case no result is stored.
     */
    protected void remoteCall() {
        boolean interrupted = false;
        try {
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("远程调用失败", e);
        }

        synchronized (lock) {
            log.info("远程线程执行完成,唤醒所有等待。");
            done = true;
            lock.notifyAll();
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
基于条件锁的实现
/**
 * Async-to-sync demo based on an explicit {@link Lock} and {@link Condition}.
 * The caller awaits the condition until the worker thread marks the call as
 * finished and signals it.
 */
public class MyQuery {

    private static final Log log = LogFactory.getLog(MyQuery.class);

    /** Result of the remote call; written while holding {@code lock}. */
    protected String result;

    private final Lock lock = new ReentrantLock();

    private final Condition condition = lock.newCondition();

    /** Completion flag, only read and written while holding {@code lock}. */
    private boolean done;

    /**
     * Starts the remote call on a new thread and blocks until the worker has
     * signalled completion, then logs the result.
     */
    public void asyncToSync() {
        log.info("开始查询...");

        lock.lock();
        try {
            done = false;
        } finally {
            lock.unlock();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCall();
            }
        }).start();

        lock.lock();
        try {
            log.info("主线程进入等待");
            // Await in a loop on the flag: the signal may already have been sent
            // before this thread acquired the lock, and await() may also return
            // spuriously. Without the flag an early signal would be lost and
            // this thread would block forever.
            while (!done) {
                condition.await();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of dropping it.
            Thread.currentThread().interrupt();
            log.error("等待查询结果时被中断", e);
        } finally {
            lock.unlock();
        }

        log.info("完成查询,结果为:" + result);
    }

    /**
     * Simulated remote call: sleeps for 5 seconds without holding the lock,
     * then stores the result and signals all waiters. Waiters are signalled
     * even when the sleep is interrupted, in which case no result is stored.
     */
    protected void remoteCall() {
        String outcome = null;
        boolean interrupted = false;
        try {
            log.info("远程调用开始");
            // Sleep outside the lock so the caller is not blocked on lock()
            // for the whole duration of the call.
            TimeUnit.SECONDS.sleep(5);
            outcome = "success";
            log.info("远程调用结束");
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("远程调用失败", e);
        }

        lock.lock();
        try {
            if (outcome != null) {
                result = outcome;
            }
            done = true;
            log.info("远程线程执行完成,唤醒所有等待线程。");
            condition.signalAll();
        } finally {
            lock.unlock();
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
CountDownLatch
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Async-to-sync demo based on {@link CountDownLatch}: the caller blocks on a
 * latch that the worker thread releases once the remote call has ended.
 */
public class MyQuery {

    private static final Log log = LogFactory.getLog(MyQuery.class);

    /** Result of the remote call; written before the latch is counted down. */
    protected String result;

    /**
     * Latch with a count of one: a single {@code countDown()} lets every
     * waiting caller proceed. A latch cannot be reset, so one instance of this
     * class supports one query.
     */
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * Starts the remote call on a new thread and waits up to 10 seconds for
     * it to finish. The result is only logged as completed when the latch was
     * actually released; a timeout is logged separately.
     */
    public void asyncToSync() {
        log.info("开始查询...");

        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCall();
            }
        }).start();

        try {
            // await(timeout) returns false when the time ran out before the
            // latch reached zero; do not report that as a finished query.
            final boolean finished = countDownLatch.await(10, TimeUnit.SECONDS);
            if (finished) {
                log.info("完成查询,结果为:" + result);
            } else {
                log.info("查询超时,未在 10 秒内获得结果");
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of dropping it.
            Thread.currentThread().interrupt();
            log.error("等待查询结果时被中断", e);
        }
    }

    /**
     * Simulated remote call: sleeps for 5 seconds, stores the result and
     * releases the latch. The latch is released on every exit path so the
     * caller never waits longer than necessary.
     */
    protected void remoteCall() {
        try {
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("远程调用失败", e);
        } finally {
            // Release the waiting caller.
            countDownLatch.countDown();
        }
    }
}
CyclicBarrier
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
/**
 * Async-to-sync demo based on {@link CyclicBarrier}: the caller and the
 * worker thread both wait at a two-party barrier, so the caller only
 * continues after the worker has arrived there too.
 */
public class MyQuery {

    private static final Log log = LogFactory.getLog(MyQuery.class);

    /** Result of the remote call; written before the worker reaches the barrier. */
    protected String result;

    /** Barrier for exactly two parties: the calling thread and the worker thread. */
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    /**
     * Starts the remote call on a new thread, waits at the barrier until the
     * worker arrives and then logs the result.
     */
    public void asyncToSync() {
        log.info("开始查询...");

        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCall();
            }
        }).start();

        try {
            // Log before blocking so the message matches what is happening.
            log.info("主线程进入等待");
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of dropping it.
            Thread.currentThread().interrupt();
            log.error("等待查询结果时被中断", e);
        } catch (BrokenBarrierException e) {
            log.error("等待查询结果时屏障被破坏", e);
        }

        log.info("完成查询,结果为:" + result);
    }

    /**
     * Simulated remote call: sleeps for 5 seconds, stores the result and then
     * joins the barrier so the caller is released. The barrier is joined even
     * when the sleep was interrupted, in which case no result is stored.
     */
    protected void remoteCall() {
        // The interrupt is re-asserted only after the barrier, so that the
        // rendezvous with the caller still takes place.
        boolean interrupted = false;
        try {
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("远程调用失败", e);
        }

        try {
            log.info("远程调用进入等待");
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("远程调用在屏障处等待时被中断", e);
        } catch (BrokenBarrierException e) {
            log.error("远程调用在屏障处等待时屏障被破坏", e);
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
Future
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.*;
/**
 * Async-to-sync demo based on {@link Future}: the remote call is submitted to
 * an executor and the caller blocks on {@code Future.get} with a timeout.
 */
public class MyQuery {

    private static final Log log = LogFactory.getLog(MyQuery.class);

    /**
     * Submits the remote call and waits up to 10 seconds for its result, which
     * is then logged. On timeout or interruption the pending task is cancelled.
     */
    public void asyncToSync() {
        // 1. Start of the query. This class declares no superclass, so the
        //    former super.startQuery() call could not compile; the start is
        //    logged directly, as the other variants of this demo do.
        log.info("开始查询...");

        // 2. Kick off the remote call.
        final Future<String> stringFuture = remoteCallFuture();

        // 3. Block for the result.
        try {
            String result = stringFuture.get(10, TimeUnit.SECONDS);
            log.info("调用结果:{}", result);
        } catch (InterruptedException e) {
            stringFuture.cancel(true);
            // Keep the interrupt visible to the caller instead of dropping it.
            Thread.currentThread().interrupt();
            log.error("等待调用结果时被中断", e);
        } catch (TimeoutException e) {
            // Nobody will read the result any more; stop the worker.
            stringFuture.cancel(true);
            log.error("等待调用结果超时", e);
        } catch (ExecutionException e) {
            log.error("异步调用执行失败", e);
        }
    }

    /**
     * Submits the simulated remote call (a 5 second sleep that yields
     * {@code "success"}) to a fresh single-thread executor.
     *
     * <p>The executor is created per call and shut down right after the
     * submit. {@code shutdown()} lets the submitted task finish but rejects
     * new ones, so a shared executor would make every later call fail with
     * {@code RejectedExecutionException}.
     *
     * @return the future of the remote call
     */
    private Future<String> remoteCallFuture() {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    log.info("开始异步调用");
                    TimeUnit.SECONDS.sleep(5);
                    log.info("完成异步调用");
                    return "success";
                }
            });
        } finally {
            // Already submitted work still runs; the worker thread ends afterwards.
            executor.shutdown();
        }
    }
}
Spring EventListener
public class BookingCreatedEvent extends ApplicationEvent {
private static final long serialVersionUID = -13870782222217348344L;
private String info;
public BookingCreatedEvent(Object source) {
super(source);
}
public BookingCreatedEvent(Object source, String info) {
super(source);
this.info = info;
}
public String getInfo() {
return info;
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
 * Async-to-sync demo based on Spring application events: the remote callback
 * publishes a {@link BookingCreatedEvent}, the listener method stores it in a
 * field, and the calling thread polls that field.
 *
 * <p>All calls share one event slot, so concurrent calls to
 * {@link #asyncQuery(String)} on the same instance can pick up each other's
 * result.
 */
@Service
public class BookingService {

    @Autowired
    private ApplicationContext context;

    /** Last event received by the listener; reset to {@code null} once consumed. */
    private volatile BookingCreatedEvent bookingCreatedEvent;

    /**
     * Runs the query, starts the remote callback on a new thread and blocks
     * until the listener has received the resulting event. The wait itself has
     * no time limit.
     *
     * @param info the query input
     * @return the payload of the received event
     * @throws IllegalStateException if the calling thread is interrupted while
     *                               waiting; the interrupt flag is restored
     */
    public String asyncQuery(final String info) {
        query(info);

        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCallback(info);
            }
        }).start();

        // Work on a local snapshot: reading the volatile field a second time
        // after the loop could yield null if it was reset in between.
        BookingCreatedEvent received = bookingCreatedEvent;
        while (received == null) {
            // Short pause between two polls.
            try {
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Swallowing the interrupt here would make this loop impossible
                // to cancel, so stop waiting and report it.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("等待远程回调结果时被中断", e);
            }
            received = bookingCreatedEvent;
        }

        // Clear the slot for the next call.
        bookingCreatedEvent = null;
        return received.getInfo();
    }

    /**
     * Event listener: stores the received event so that the polling caller
     * can pick it up.
     *
     * @param bookingCreatedEvent the event published by the remote callback
     */
    @EventListener
    public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) {
        System.out.println("监听到远程的信息: " + bookingCreatedEvent.getInfo());
        this.bookingCreatedEvent = bookingCreatedEvent;
        // Print from the parameter; the field may already have been reset by the caller.
        System.out.println("监听到远程消息后: " + bookingCreatedEvent.getInfo());
    }

    /**
     * Executes the query; in this demo it only prints the input.
     *
     * @param info the query input
     */
    public void query(final String info) {
        System.out.println("开始查询: " + info);
    }

    /**
     * Simulated remote callback: sleeps for 2 seconds and then publishes a
     * {@link BookingCreatedEvent} whose payload is the input plus
     * {@code "-result"}.
     *
     * @param info the query input
     */
    public void remoteCallback(final String info) {
        System.out.println("远程回调开始: " + info);

        boolean interrupted = false;
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            // Still publish the event below so the waiting caller is released.
            interrupted = true;
        }

        // Publish the result event.
        String result = info + "-result";
        BookingCreatedEvent resultEvent = new BookingCreatedEvent(this, result);
        this.context.publishEvent(resultEvent);

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}