跳到主要内容

项目线程池统一管理

为什么需要线程池?

使用线程池可以有效地提高资源利用率、控制并发数量、简化线程管理,并提高系统的响应速度,这对于需要高并发、高吞吐量的项目非常重要

如何使用

代码

下面是项目线程池的配置

@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {
/**
* 项目共用线程池
*/
public static final String YUNFEICHAT_EXECUTOR = "yunfeichatExecutor";
/**
* websocket通信线程池
*/
public static final String WS_EXECUTOR = "websocketExecutor";

@Override
public Executor getAsyncExecutor() {
return yunfeichatExecutor();
}

@Bean(YUNFEICHAT_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor yunfeichatExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setWaitForTasksToCompleteOnShutdown(true);// 等待任务执行完成后关闭线程池 优雅关闭
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("yunfeichat-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 满了调用线程执行,认为重要任务
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}
}

代码解释:

  1. @EnableAsync 注解会启用对异步方法调用的支持

    使用 @Async 注解的方法将在独立的线程中执行,而不是在当前线程中执行。

    通常情况下,我们会配置一个自定义的 ThreadPoolTaskExecutor 作为异步执行器,以提供更细粒度的控制和配置。 在这种情况下,@EnableAsync 注解会自动检测并使用我们配置的自定义执行器,而不是使用默认的 SimpleAsyncTaskExecutor

  2. 实现了 AsyncConfigurer 接口的 getAsyncExecutor 方法,返回了 yunfeichatExecutor 方法创建的线程池实例,作为项目默认的异步执行线程池。

  3. yunfeichatExecutor 方法创建并配置了一个 ThreadPoolTaskExecutor 线程池实例:

    • setWaitForTasksToCompleteOnShutdown(true): 在关闭线程池时等待任务执行完成,实现优雅关闭。
    • setCorePoolSize(10)setMaxPoolSize(10): 设置线程池的核心线程数和最大线程数为 10。
    • setQueueCapacity(200): 设置任务队列容量为 200。
    • setThreadNamePrefix("yunfeichat-executor-"): 设置线程名称前缀。
    • setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()): 当线程池处于饱和状态时,将任务交给调用线程来执行,认为这些任务是重要的。
    • setThreadFactory(new MyThreadFactory(executor)): 使用自定义的线程工厂创建线程。
    • initialize(): 初始化线程池。
  4. Primary

    @Primary 注解的作用是标记一个 Bean 为主要 Bean,当存在多个相同类型的 Bean 时,@Primary 标记的 Bean 将被优先使用。

    在 Spring 中,当一个接口或抽象类有多个实现时,Spring 需要确定应该使用哪个实现。通常情况下,这是通过@Qualifier注解或 Bean 名称来确定的。但是,如果没有使用@Qualifier注解,并且 Bean 名称也不能明确区分,那么 Spring 会选择标有@Primary注解的 Bean。

    一个常见的例子是:当一个接口有多个实现类时,我们可以使用@Primary注解来标记其中一个实现类为主要实现,这样在自动注入时就会优先使用这个实现类。

  5. 拒绝策略

    ThreadPoolExecutor.CallerRunsPolicyRejectedExecutionHandler 的一种实现,它用于在线程池饱和时的任务拒绝处理策略。

    当线程池中的线程数量达到最大值,并且任务队列也已满时,新的任务将被拒绝执行。CallerRunsPolicy 的行为如下:

    1. 当一个任务被拒绝执行时,它将由调用者的线程来直接执行这个任务。
    2. 这样做的目的是:
      • 降低系统的整体吞吐量,使系统处于稳定状态,不会因为过多的任务进入而崩溃。
      • 给调用者一定的反馈,让它知道自己的任务没有被成功地异步执行。
    3. 这种策略适用于以下场景:
      • 对于关键任务,我们希望它们能够尽快得到执行,即使会降低整体吞吐量。
      • 对于不太重要的任务,我们可以让它们排队等待,而不是被直接拒绝。

使用

@Async
@Override
public void renewalTokenIfNecessary(String token) {
Long uid = getValidUid(token);
String key = getUidKey(uid);
Long expireDays = RedisUtils.getExpire(key, TimeUnit.DAYS);
if (expireDays == -2) {
return;
}
if (expireDays < TOKEN_EXPIRE_DAYS) {
RedisUtils.expire(key, TOKEN_EXPIRE_DAYS, TimeUnit.DAYS);
}
}

我们使用Async注解,就可以让这个方法异步执行了

优雅停机

项目关闭的时候,会调用JVM的shutdownHook回调线程池,等队列里任务执行完再停机。保证任务不丢失。

shutdownHook会回调spring容器,所以我们实现spring的DisposableBeandestroy方法也可以达到一样的效果,在里面调用executor.shutdown()并等待线程池执行完毕。

我们现在使用的线程池是Spring的线程池,而不是JUC包自带的线程池,Spring的线程池自带了优雅停机的功能。

追踪ThreadPoolTaskExecutor源码:

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

然后进入ExecutorConfigurationSupport:

public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
implements BeanNameAware, InitializingBean, DisposableBean {

这里看到了继承Spring的DisposableBean,里面便是destory方法:

public interface DisposableBean {
void destroy() throws Exception;
}

因此我们的ExecutorConfigurationSupport会实现这个方法:

	private boolean waitForTasksToCompleteOnShutdown = false;

public void destroy() {
shutdown();
}
public void shutdown() {
if (logger.isDebugEnabled()) {
logger.debug("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.executor != null) {
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
}
else {
for (Runnable remainingTask : this.executor.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary(this.executor);
}
}

解释:这里线程池会根据waitForTasksToCompleteOnShutdown的值来判断是否要等待所有的任务执行完,默认是false,只有为true的时候,才会调用线程池的shutdown。this.executor.shutdown();

线程池的shutdown:

shutdown() 方法用于有序地关闭执行者服务(ExecutorService)。其主要作用如下:

  1. 有序关闭:当调用 shutdown() 方法时,执行者服务将停止接受新任务,但仍会完成之前提交的任务。

  2. 拒绝新任务:在调用 shutdown() 方法之后,执行者服务将不再接受新的任务。任何尝试提交新任务的操作都会抛出 RejectedExecutionException 异常。

  3. 不等待任务完成:shutdown() 方法并不会等待之前提交的任务全部完成。如果需要等待任务完成,应该使用 awaitTermination() 方法。

  4. 安全异常:如果存在安全管理器,并且调用者没有足够的权限修改执行者服务管理的线程,shutdown() 方法可能会抛出 SecurityException 异常。

异常捕获

线程池默认不会抛异常,而仅仅是打印错误信息

@Test
void main() {
threadPoolTaskExecutor.execute(() -> {
if (1 == 1) {
log.error("出错了");
throw new RuntimeException("test");
}
});
}

运行结果:

image-20240517205519031

我们想要的结果是向上面一样可以有日志,而不是在控制台打印。如果出了问题,却不打印error日志,那问题就被隐藏了,非常危险

原因,在ThreadGroup.java文件中,我们找到了对应的源码,可以看到这里仅仅是控制台打印:

public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}

现在我们想要重写这个方法,能够以日志的方式进行捕获:

重写,使得可以以日志的方式打印错误信息:

@Slf4j
public class GlobalUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {

private static final GlobalUncaughtExceptionHandler instance = new GlobalUncaughtExceptionHandler();

private GlobalUncaughtExceptionHandler() {
}

@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("Exception in thread {} ", t.getName(), e);
}

public static GlobalUncaughtExceptionHandler getInstance() {
return instance;
}

}

接下来就是使用这个了:

@Bean(YUNFEICHAT_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor yunfeichatExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setWaitForTasksToCompleteOnShutdown(true);// 等待任务执行完成后关闭线程池 优雅关闭
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("yunfeichat-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 满了调用线程执行,认为重要任务
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}

我们发现ThreadPoolTaskExecutor这个类已经实现了ThreadFactory接口

image-20240517210435462

如果我们把线程工厂换了,那么里面的设置都要重新写,现在我们想要使用这个线程工厂,并在此基础上可以增加新的功能,也就是加上我们的GlobalUncaughtExceptionHandler,因此,我们很自然的可以想到装饰器模式。

于是便有了我们的MyThreadFactory

@Slf4j
@AllArgsConstructor
public class MyThreadFactory implements ThreadFactory {

private final ThreadFactory factory;

@Override
public Thread newThread(Runnable r) {
Thread thread = factory.newThread(r);
thread.setUncaughtExceptionHandler(GlobalUncaughtExceptionHandler.getInstance());
return thread;
}
}

这里是组合关系,我们保存组合的成员属性,通过构造器传入

如果是继承,那么可以使用super