DynamicTp 是什么
DynamicTp 是一个基于 Java 的动态线程池框架,特性如下:
-
代码零侵入:我们改变了线程池以往的使用姿势,所有配置均放在配置中心,服务启动时会从配置中心拉取配置生成线程池对象放到 Spring 容器中,使用时直接从 Spring 容器中获取,对业务代码零侵入
-
轻量简单:使用起来极其简单,引入相应依赖,接入只需简单 4 步就可完成,顺利 3 分钟搞定,相当丝滑
-
通知告警:提供多种通知告警维度(配置变更通知、活性报警、队列容量阈值报警、拒绝触发报警、任务执行或等待超时报警),触发配置阈值实时推送告警信息,已支持企微、钉钉、飞书、邮件、云之家报警,同时提供 SPI 接口可自定义扩展实现
-
运行监控:定时采集线程池指标数据(20 多种指标,包含线程池维度、队列维度、任务维度、tps、tpxx 等),支持通过 MicroMeter、JsonLog、JMX 三种方式定时获取,也可以通过 SpringBoot Endpoint 端点实时获取最新指标数据,同时提供 SPI 接口可自定义扩展实现
-
任务增强:提供任务包装功能(比 Spring 线程池任务包装更强大),实现 TaskWrapper 接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper、OpenTelemetryWrapper,可以支持线程池上下文信息传递
-
多配置中心支持:支持多种主流配置中心,包括 Nacos、Apollo、Zookeeper、Consul、Etcd、Polaris、ServiceComb,同时也提供 SPI 接口可自定义扩展实现
-
中间件线程池管理:集成管理常用第三方组件的线程池,已集成 Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix、Grpc、Motan、Okhttp3、Brpc、Tars、SofaRpc、RabbitMq、Liteflow 等组件的线程池管理(动态调参、监控、报警)
-
多模式:提供了增强线程池 DtpExecutor,IO 密集型场景使用的线程池 EagerDtpExecutor,调度线程池 ScheduledDtpExecutor,有序线程池 OrderedDtpExecutor,可以根据业务场景选择合适的线程池
-
兼容性:JUC 普通线程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被框架管理,只需@Bean 定义时加 @DynamicTp 注解即可
-
可靠性:依靠 Spring 生命周期管理,可以做到优雅关闭线程池,在 Spring 容器关闭前尽可能多的处理队列中的任务
-
高可扩展:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装、拒绝策略等等)
-
线上大规模应用:参考美团线程池实践,美团内部已经有该理论成熟的应用经验
示例代码
以下是一个使用 DynamicTp 实现动态调整线程池线程数量的完整实例代码,本示例基于 Spring Boot 框架,使用 Nacos 作为配置中心。
1. 添加依赖
在 pom.xml
中添加 DynamicTp 和 Nacos 相关依赖:
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- DynamicTp -->
<dependency>
<groupId>cn.dynamic-tp</groupId>
<artifactId>dynamic-tp-spring-boot-starter</artifactId>
<version>1.1.4</version>
</dependency>
<!-- Nacos Config -->
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
2. 配置 Nacos
在 application.yml
中配置 Nacos 信息:
spring:
application:
name: dynamic-tp-demo
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848
file-extension: yaml
3. 配置线程池
在 Nacos 配置中心创建一个 dynamic-tp-demo.yaml
文件,添加线程池配置:
spring:
dynamic:
tp:
enabled: true
executors:
- threadPoolName: myDynamicTp
corePoolSize: 5
maxPoolSize: 10
queueCapacity: 200
keepAliveTime: 60
timeUnit: SECONDS
4. 创建线程池 Bean
在 Spring Boot 项目中创建线程池 Bean:
import com.dtp.core.annotation.DynamicTp;
import com.dtp.core.support.ThreadPoolBuilder;
import com.dtp.core.thread.DtpExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
@Configuration
public class ThreadPoolConfig {
@Bean
@DynamicTp
public ExecutorService myDynamicTp() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName("myDynamicTp")
.buildDynamic();
}
}
5. 创建测试服务
创建一个服务类,使用线程池执行任务:
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@Service
public class TaskService {
private final ExecutorService executorService;
public TaskService(ExecutorService myDynamicTp) {
this.executorService = myDynamicTp;
}
public void submitTask() {
executorService.submit(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("Task executed by thread: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
6. 创建控制器
创建一个控制器来触发任务提交:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TaskController {
@Autowired
private TaskService taskService;
@GetMapping("/submitTask")
public String submitTask() {
taskService.submitTask();
return "Task submitted.";
}
}
7. 运行项目并测试
启动 Spring Boot 项目,访问 http://localhost:8080/submitTask
来提交任务。
8. 动态调整线程池参数
在 Nacos 配置中心修改 dynamic-tp-demo.yaml
文件中的线程池参数,例如将 corePoolSize
改为 8,maxPoolSize
改为 15,保存配置后,DynamicTp 会自动监听配置变更并动态调整线程池的线程数量。
架构设计
框架功能大体可以分为以下几个模块
配置变更监听模块
线程池管理模块
监控模块
通知告警模块
三方组件线程池管理模块
更新线程池逻辑
具体逻辑在 DtpRegistry
类中的 doRefreshPoolSize
方法,其功能是根据新的线程池属性配置(DtpExecutorProps
)来更新线程池的核心线程数(corePoolSize
)和最大线程数(maximumPoolSize
)。
代码如下:
private static void doRefreshPoolSize(ExecutorAdapter<?> executor, DtpExecutorProps props) {
if (props.getMaximumPoolSize() < executor.getMaximumPoolSize()) {
if (!Objects.equals(executor.getCorePoolSize(), props.getCorePoolSize())) {
executor.setCorePoolSize(props.getCorePoolSize());
}
if (!Objects.equals(executor.getMaximumPoolSize(), props.getMaximumPoolSize())) {
executor.setMaximumPoolSize(props.getMaximumPoolSize());
}
return;
}
if (!Objects.equals(executor.getMaximumPoolSize(), props.getMaximumPoolSize())) {
executor.setMaximumPoolSize(props.getMaximumPoolSize());
}
if (!Objects.equals(executor.getCorePoolSize(), props.getCorePoolSize())) {
executor.setCorePoolSize(props.getCorePoolSize());
}
}
-
方法参数:
ExecutorAdapter<?> executor
:表示要更新配置的线程池执行器适配器,通过这个适配器可以调用线程池的各种设置方法,如设置核心线程数和最大线程数。DtpExecutorProps props
:包含了新的线程池属性配置,其中包括新的核心线程数和最大线程数等信息。
-
主要逻辑:
- 首先判断新的最大线程数(
props.getMaximumPoolSize()
)是否小于当前线程池的最大线程数(executor.getMaximumPoolSize()
):- 如果新的最大线程数小于当前最大线程数,说明需要缩小线程池规模。此时先检查核心线程数是否需要更新,即比较当前核心线程数(
executor.getCorePoolSize()
)和新的核心线程数(props.getCorePoolSize()
)是否相等。如果不相等,则调用executor.setCorePoolSize(props.getCorePoolSize())
方法将核心线程数更新为新的值。 - 接着检查最大线程数是否需要更新,比较当前最大线程数和新的最大线程数是否相等。如果不相等,则调用
executor.setMaximumPoolSize(props.getMaximumPoolSize())
方法将最大线程数更新为新的值。 - 完成上述操作后,通过
return
语句结束方法,不再执行后续逻辑。
- 如果新的最大线程数小于当前最大线程数,说明需要缩小线程池规模。此时先检查核心线程数是否需要更新,即比较当前核心线程数(
- 如果新的最大线程数不小于当前最大线程数,说明不需要缩小线程池规模,或者是要扩大线程池规模。此时先检查最大线程数是否需要更新,即比较当前最大线程数和新的最大线程数是否相等。如果不相等,则调用
executor.setMaximumPoolSize(props.getMaximumPoolSize())
方法将最大线程数更新为新的值。 - 最后检查核心线程数是否需要更新,比较当前核心线程数和新的核心线程数是否相等。如果不相等,则调用
executor.setCorePoolSize(props.getCorePoolSize())
方法将核心线程数更新为新的值。
- 首先判断新的最大线程数(
文章评论