本文介绍: 在上一篇文章中,我们使用多个线程隔离不同的异步任务,这篇文章我们将围绕在@Async线程池的拒绝策略进行完善线程池的使用,在我们例举案例之前,我们先了解一下什么情况下才使用@Async线程拒绝策略@Async拒绝策略用来解决什么问题,使用他有什么好处?

上一篇文章中,我们使用多个线程隔离不同的异步任务,这篇文章我们将围绕在@Async线程池的拒绝策略进行完善线程池的使用,在我们例举案例之前,我们先了解一下:


  1. @Async拒绝策略用来解决什么问题,还有使用他究竟有什么好处?

    使用@Async的拒绝策略可以解决异步任务线程队列已满时的问题。当线程队列已满时,默认的拒绝策略是抛出RejectedExecutionException异常,表示无法接受新的任务。而自定义拒绝策略可以提供一种灵活的方式处理这种情况,从而解决以下问题

    1. 避免任务丢失:当线程池队列已满时,如果没有合适的拒绝策略,新的任务可能会被丢弃,导致任务丢失。通过自定义拒绝策略,你可以选择将任务丢弃、阻塞等待或者采取其他适当的处理方式,以避免任务丢失

    2. 控制任务流量:拒绝策略可以帮助你控制任务的流量。当线程池队列已满时,你可以选择拒绝执行新的任务,从而控制任务的提交速率,避免系统资源被过度消耗。这对于保护系统的稳定性和可靠性非常重要。

    3. 提供反馈机制自定义拒绝策略可以提供一种反馈机制,告知任务提交者任务被拒绝执行的原因。通过捕获拒绝执行异常或其他方式,你可以根据需要记录日志发送通知或采取其他适当的操作以便及时了解任务无法执行的情况。


  1. 在什么情况下,我们才使用@Async的拒绝策略?
    1. 任务队列满载:当异步任务提交的速度超过线程池处理任务的速度时,任务队列可能会被填满。这时,新的任务无法加入队列,就需要使用线程拒绝策略来处理这些被拒绝的任务。

    2. 任务执行资源有限:当系统资源(如线程数)有限,并且无法扩展时,可能会出现无法处理所有任务的情况。这时,使用线程拒绝策略可以控制任务的提交速率,避免资源被过度消耗。

    3. 任务处理能力不足:当异步任务的处理能力不足以满足需求时,可以使用线程拒绝策略来限制任务的提交,以避免任务堆积和系统负载过高。

    4. 任务优先级管理:有时,你可能希望根据任务的优先级来管理任务的执行。通过自定义线程拒绝策略,你可以根据任务的优先级进行选择性的拒绝执行,以确保高优先级任务能够及时得到处理

接下来,我们看一个案例
我们先创建一个Spring Boot应用创建好我们的线程池配置

@EnableAsync
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {
        @Bean
        public Executor taskExecutor1() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(2);
            executor.setQueueCapacity(2);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("executor-1-");
            //后续在这里填写我们需要的拒绝策略
            return executor;
        }
    }
}

如上,我们创建了一个核心线程数为2,最大线程数为2,缓冲队列长度为2,假设我们有五个异步任务同时开始,那么会造成什么情况呢?

接着看吧,我们使用@Async注解实现一个任务

@Slf4j
@Component
public class AsyncTasks {

    public static Random random = new Random();

    @Async("taskExecutor1")
    public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
        log.info("开始任务:{}", taskNo);
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务:{},耗时:{} 毫秒", taskNo, end - start);
        return CompletableFuture.completedFuture("任务完成");
    }
}

我们来编写一个测试用例,来看看会发生什么结果

@Slf4j
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private AsyncTasks asyncTasks;

    @Test
    public void test2() throws Exception {
        long start = System.currentTimeMillis();
        // 线程池1
        CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
        CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
        CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
        CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
        CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");
        // 一起执行
        CompletableFuture.allOf(task1, task2, task3, task4, task5).join();
        long end = System.currentTimeMillis();
        log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
    }
}
2023-11-28 19:03:57.138  INFO 27916 --- [   executor-1-1] com.miaow.demo.AsyncTasks       : 开始任务:1
2023-11-28 19:03:57.138  INFO 27916 --- [   executor-1-2] com.miawo.demo.AsyncTasks       : 开始任务:2

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@5580d62f[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@17b6d426

	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
	at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1618)
	at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1843)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
	...
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
	at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
	at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@17b6d426 rejected from java.util.concurrent.ThreadPoolExecutor@5580d62f[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
	... 76 more

我们来对报错日志进行分析
[java.util.concurrent.ThreadPoolExecutor@5580d62f[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: 这段代码中,我们可以明确的知道,在我们的超过了执行线程 + 缓冲队列长度,也就是 2 + 2 = 4 ,但是我们进来了5个线程,所以我们的第五个线程就被拒绝了。
所以,在默认情况之下,我们的线程池的拒绝策略就是:
当线程池队列满了,那么我们的线程池就会丢弃这个任务,并抛出异常


OK,既然线程池中有默认的线程池拒绝策略,那么我们可以对他配置吗?考虑到实际开发过程中,我们在有些任务场景中,直接拒绝的策略一般都不太适用,有的时候,我们会选择丢掉之前开始执行但是并未完成的任务,也可能会考虑丢掉刚刚开始执行,但是没完成的任务,反正有各种场景,只要你线程没执行完毕,我就可以丢弃你,那么我们具体要怎么实现呢?

线程池的拒绝策略是指当线程池无法接受新的任务时,如何处理这些被拒绝的任务。在Spring框架中,可以通过配置ThreadPoolTaskExecutor设置线程池的拒绝策略。

ThreadPoolTaskExecutor提供了几种常见的拒绝策略:

  • AbortPolicy(默认):当线程池无法接受新的任务时,直接抛出RejectedExecutionException异常

  • CallerRunsPolicy:当线程池无法接受新的任务时,将任务返回调用者执行。也就是说,如果线程池满了,任务会在调用者的线程中执行。

  • DiscardPolicy:当线程池无法接受新的任务时,直接丢弃这个任务,不做任何处理

  • DiscardOldestPolicy:当线程池无法接受新的任务时,先丢弃最早加入队列的任务,然后尝试再次提交新的任务。

来,我们在代码中进行配置

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// AbortPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

// DiscardPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

// DiscardOldestPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

// CallerRunsPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Lamba表达式的线程策略配置:

executor.setRejectedExecutionHandler((r, executor1) -> {
    // 拒绝策略的逻辑
});

总的来说,当异步任务的提交速度超过处理速度、资源有限或任务处理能力不足时,使用@Async的线程拒绝策略可以帮助你控制任务的提交速率,避免任务堆积和系统负载过高。这样可以提高系统的稳定性和可靠性,确保异步任务的顺利执行。

原文地址:https://blog.csdn.net/qq_45922256/article/details/134668302

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_5721.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注