首页>>后端>>java->利用CompletableFuture做多线程并发操作

利用CompletableFuture做多线程并发操作

时间:2023-11-29 本站 点击:13

前言

在项目开发中,经常会遇到一个问题:在一个后端接口里,往往会进行多项耗时任务(相互之间独立,没有依赖)的操作,如:

需要从不同的外部接口获取不同的数据,做融合;

请求外部接口数据的同时,还需要读取数据库;

等等

如果在一个请求的主线程里,串行做这些任务操作,会导致响应时间的线性叠加,极有可能导致不符合要求,如图1:

那么,对这些耗时任务进行并行操作,从而使得:响应时间 约等于 耗时最大的任务处理时间,这样可以大大降低系统的响应时间,如图2:

Future 和 CompletableFuture

Future

Future类型,其实就是一个未来任务的返回对象,或者说是子线程的返回对象(通过线程池方式分配子线程)

ExecutorServiceexecutor=Executors.newFixedThreadPool(4);//定义任务:Callable<String>task=newTask();//提交任务并获得Future:Future<String>future=executor.submit(task);//从Future获取异步执行返回的结果:Stringresult=future.get();//可能阻塞

可以看到,通过线程池的方式创建子线程后,executor.submit()返回的是一个Future对象,通过future.get()方法来获得该子任务的运行结果。需要注意的是,这个操作是阻塞的,也就是说,如果这个子任务没有运行结束,主线程会一直block在改行,直到子任务完成。

一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:(refer to Ref.4)

get():获取结果(可能会等待)

get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;

cancel(boolean mayInterruptIfRunning):取消当前任务;

isDone():判断任务是否已完成。

CompletableFuture

当需要判断图2中的所有task是否完成时,如果采用Future,则需要:

调用future.get()获取运行结果,

或者轮询future.isDone()方法直到返回true

无论哪种方法,都是在主线程里调用,且会阻塞主线程。

以上痛点,从Java 8开始引入了CompletableFuture方法。主要新增的功能有:

thenAccept(): 当task正常完成后,回调调用.thenAccept()方法

exceptionally(): 当task出现异常是,回调调用.exceptionally()方法

anyOf(): 当所有的task中,只要有一个task完成,则主线程继续往下走,可以使用.anyOf()方法

allOf(): 所有的task均完成后,则主线程继续往下走

supplyAsync(): 异步执行,有返回值

runAsync(): 异步执行,无返回值

针对图2,需要所有task都完成后,再执行后续操作,就可以用allOf()方法:

CompletableFuture.allOf(task1,task2,...,taskn).join();

注意:CompletableFuture的命名规则:

xxx():表示该方法将继续在已有的线程中执行;

xxxAsync():表示将异步在线程池中执行,即可以异步执行。

基于CompletableFuture+线程池的代码实现

线程池配置类

@Configuration@Slf4j@EnableAsyncpublicclassExecutorConfig{@BeanpublicExecutorasyncExecutor(){log.info("startasyncexecutor");ThreadPoolTaskExecutorthreadPoolTaskExecutor=newThreadPoolTaskExecutor();//配置核心线程数threadPoolTaskExecutor.setCorePoolSize(ThreadPoolConstant.CORE_POOL_SIZE);//配置最大线程数threadPoolTaskExecutor.setMaxPoolSize(ThreadPoolConstant.MAX_POOL_SIZE);//配置队列大小threadPoolTaskExecutor.setQueueCapacity(ThreadPoolConstant.QUEUE_CAPACITY);//配置线程池中线程的名称前缀threadPoolTaskExecutor.setThreadNamePrefix(ThreadPoolConstant.THREAD_NAME_PREFIX);//HelloWorldServiceImplrejection-policy:当pool已经达到maxsize时,如何处理新任务://CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行;//AbortPolicy:拒绝执行新任务,并抛出RejectedExecutionException异常;//DiscardPolicy:丢弃当前将要加入队列的任务;//DiscardOldestPolicy:丢弃任务队列中最旧的任务;threadPoolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());threadPoolTaskExecutor.initialize();returnthreadPoolTaskExecutor;}}

异步服务与服务实现

publicinterfaceAsyncService{@Async("asyncExecutor")CompletableFuture<String>getResponseFromCp(QueryTrainInfoDetailReqDTOWithTypequeryTrainInfoDetailReqDTOWithType,intqueryType);}
@ServicepublicclassAsyncServiceImplimplementsAsyncService{@AutowiredCustomPropscustomProps;@AutowiredRestTemplaterestTemplate;@OverridepublicCompletableFuture<String>getResponseFromCp(QueryTrainInfoDetailReqDTOWithTypequeryTrainInfoDetailReqDTOWithType,intqueryType){returnCompletableFuture.completedFuture(FactoryUtil.createFactory(customProps,null,restTemplate).obtainData(queryTrainInfoDetailReqDTOWithType.setQueryType(queryType),String.class));}}

业务代码中调用异步服务接口

...@AutowiredAsyncServiceasyncService;@OverridepublicReturnDataqTrainInfoDetail(QueryTrainInfoDetailReqDTOqueryTrainInfoDetailReqDTO){QueryTrainInfoDetailReqDTOWithTypequeryTrainInfoDetailReqDTOWithType=newQueryTrainInfoDetailReqDTOWithType().setQueryTrainInfoDetailReqDTO(queryTrainInfoDetailReqDTO);CompletableFuture<String>fromCpFirstReq=asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType,1);CompletableFuture<String>fromCpSecondReq=asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType,2);CompletableFuture.allOf(fromCpFirstReq,fromCpSecondReq).join();//阻塞直到当第一次请求和第二次请求都完成}...

参考文章

Java CompletableFuture 详解

SpringBoot中如何优雅的使用多线程

SpringBoot 使用 Future 实现多任务并行

使用Future

使用CompletableFuture

CompletableFuture 详解


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/84.html