java 异步转同步

时间:2025-08-27 10:06:01来源:互联网

下面小编就为大家分享一篇java 异步转同步,具有很好的参考价值,希望对大家有所帮助。

业务场景

一个需要实时返回的功能,实现逻辑中有异步的逻辑(消息队列,websocket消息,多线程等)

异步转同步的方案

循环等待

我们调用对方一个请求,在没有结果之前一直循环查询即可。
这个结果可以在内存中,也可以放在 redis 缓存或者 mysql 等数据库中。

特点:比较消耗性能

public  class MyQuery {
    private static final Log log = LogFactory.getLog(MyQuery.class);
    protected String result;
	/**
     * 超时时间
     */
    private long timeoutMills = 3000;

    public void asyncToSync() {
        log.info("开始查询...");
        new Thread(new Runnable() {
            public void run() {
                remoteCall();
            }
        }).start();
        try {
            final long endTimeMills = System.currentTimeMillis() + timeoutMills;
            while (StringUtil.isEmpty(result)) {
                // 超时判断
                if(System.currentTimeMillis() >= endTimeMills) {
                    throw new RuntimeException("请求超时");
                }

                //循环等待一下
                TimeUnit.MILLISECONDS.sleep(10);
            }

            //获取结果
            log.info("完成查询,结果为:" + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    
    /**
     * 远程调用
     */
    protected void remoteCall() {
        try {
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
        } catch (InterruptedException e) {
            log.error("远程调用失败", e);
        }
    }

}

 基于 wait() & notifyAll()

public  class MyQuery {
    private static final Log log = LogFactory.getLog(MyQuery.class);
    protected String result;
    private final Object lock = new Object();

    public void asyncToSync() {
        log.info("开始查询...");
        new Thread(new Runnable() {
            public void run() {
                remoteCall();
            }
        }).start();
        try {
            // 等待 10s
            synchronized (lock) {
                log.info("主线程进入等待");
                lock.wait(10 * 1000);
            }
            //获取结果
            log.info("完成查询,结果为:" + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    
    /**
     * 远程调用
     */
    protected void remoteCall() {
        try {
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
        } catch (InterruptedException e) {
            log.error("远程调用失败", e);
        }
		synchronized (lock) {
            log.info("远程线程执行完成,唤醒所有等待。");
            lock.notifyAll();
        }
    }
}

 基于条件锁的实现

public  class MyQuery {
    private static final Log log = LogFactory.getLog(MyQuery.class);
    protected String result;
    private final Lock lock = new ReentrantLock();

    private final Condition condition = lock.newCondition();

    public void asyncToSync() {
        log.info("开始查询...");
        new Thread(new Runnable() {
            public void run() {
                remoteCall();
            }
        }).start();
        lock.lock();
        try{
            // 等待
            log.info("主线程进入等待");
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        log.info("完成查询,结果为:" + result);
    }

    
    /**
     * 远程调用
     */
    protected void remoteCall() {
        lock.lock();
        try{
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
            log.info("远程线程执行完成,唤醒所有等待线程。");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

}

CountDownLatch

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


public class MyQuery{
    private static final Log log = LogFactory.getLog(MyQuery.class);
    protected String result;

    /**
     * 闭锁
     * 调用1次,后续方法即可通行。
     */
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
	
		public void asyncToSync() {
			log.info("开始查询...");
			new Thread(new Runnable() {
				public void run() {
					remoteCall();
				}
			}).start();
			 try {
	//          countDownLatch.await();
				countDownLatch.await(10, TimeUnit.SECONDS);
				log.info("完成查询,结果为:" + result);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		
		}

    protected void remoteCall() {
	    try{
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
        }catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 调用一次闭锁
        countDownLatch.countDown();
    }
}

 

CyclicBarrier

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;


public class MyQuery{

    private static final Log log = LogFactory.getLog(MyQuery.class);
    protected String result;
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

	public void asyncToSync() {
        log.info("开始查询...");
        new Thread(new Runnable() {
            public void run() {
                remoteCall();
            }
        }).start();
		try {
            cyclicBarrier.await();
            log.info("主线程进入等待");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
            //获取结果
            log.info("完成查询,结果为:" + result);
		
		}
    protected void remoteCall() {
		try {
            log.info("远程调用开始");
            TimeUnit.SECONDS.sleep(5);
            result = "success";
            log.info("远程调用结束");
        } catch (InterruptedException e) {
            log.error("远程调用失败", e);
        }

        try {
            cyclicBarrier.await();
            log.info("远程调用进入等待");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

}

 Future

import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;

import java.util.concurrent.*;


public class MyQuery{

    private static final Log log = LogFactory.getLog(MyQuery.class);

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    public void asyncToSync() {
        //1. 开始调用
        super.startQuery();

        //2. 远程调用
        Future<String> stringFuture = remoteCallFuture();

        //3. 完成结果
        try {
            String result = stringFuture.get(10, TimeUnit.SECONDS);
            log.info("调用结果:{}", result);
        } catch (InterruptedException | TimeoutException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * 远程调用
     * @return Future 信息
     */
    private Future<String> remoteCallFuture() {
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("开始异步调用");
                TimeUnit.SECONDS.sleep(5);
                log.info("完成异步调用");
                return "success";
            }
        });

        executorService.submit(futureTask);
        // 关闭线程池
        executorService.shutdown();
        return futureTask;
    }

}

Spring EventListener

public class BookingCreatedEvent extends ApplicationEvent {

    private static final long serialVersionUID = -13870782222217348344L;
    private String info;

    public BookingCreatedEvent(Object source) {
        super(source);
    }

    public BookingCreatedEvent(Object source, String info) {
        super(source);
        this.info = info;
    }

    public String getInfo() {
        return info;
    }
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class BookingService {

    @Autowired
    private ApplicationContext context;

    private volatile BookingCreatedEvent bookingCreatedEvent;

    /**
     * 异步转同步查询
     * @param info
     * @return
     */
    public String asyncQuery(final String info) {
        query(info);

        new Thread(new Runnable() {
            @Override
            public void run() {
                remoteCallback(info);
            }
        }).start();

        while(bookingCreatedEvent == null) {
            //.. 空循环
            // 短暂等待。
            try {
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
                //...
            }
            //2. 使用两个单独的 event...

        }

        final String result = bookingCreatedEvent.getInfo();
        bookingCreatedEvent = null;
        return result;
    }

    @EventListener
    public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) {
        System.out.println("监听到远程的信息: " + bookingCreatedEvent.getInfo());
        this.bookingCreatedEvent = bookingCreatedEvent;
        System.out.println("监听到远程消息后: " + this.bookingCreatedEvent.getInfo());
    }

    /**
     * 执行查询
     * @param info
     */
    public void query(final String info) {
        System.out.println("开始查询: " + info);
    }

    /**
     * 远程回调
     * @param info
     */
    public void remoteCallback(final String info) {
        System.out.println("远程回调开始: " + info);

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 重发结果事件
        String result = info + "-result";
        BookingCreatedEvent bookingCreatedEvent = new BookingCreatedEvent(this, result);
        //触发event
        this.context.publishEvent(bookingCreatedEvent);
    }
}

 

本站部分内容转载自互联网,如果有网站内容侵犯了您的权益,可直接联系我们删除,感谢支持!